1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-07-02 11:43:40 +02:00

api/client.go: Rework to use a channel instead of callback.

This commit is contained in:
Henrik Hautakoski 2024-02-04 21:55:33 +01:00
parent a4954ab949
commit 133af980a3
4 changed files with 114 additions and 148 deletions

View file

@ -2,15 +2,15 @@ package api
import ( import (
"fmt" "fmt"
"reflect"
"sync" "sync"
"time"
"github.com/eosswedenorg/thalos/api/message" "github.com/eosswedenorg/thalos/api/message"
) )
type handler func([]byte) type handler func([]byte)
// Client reads and decodes messages from a reader and provides callback functions. // Client reads and decodes messages from a reader and posts thems to a go channel
type Client struct { type Client struct {
reader Reader reader Reader
decoder message.Decoder decoder message.Decoder
@ -18,18 +18,26 @@ type Client struct {
// waitgroup for worker threads. // waitgroup for worker threads.
wg sync.WaitGroup wg sync.WaitGroup
OnError func(error) // Channel for messages and errors
OnRollback func(message.RollbackMessage) channel chan any
OnTransaction func(message.TransactionTrace)
OnAction func(message.ActionTrace)
OnHeartbeat func(message.HeartBeat)
OnTableDelta func(message.TableDelta)
} }
func NewClient(reader Reader, decoder message.Decoder) *Client { func NewClient(reader Reader, decoder message.Decoder) *Client {
return &Client{ return &Client{
reader: reader, reader: reader,
decoder: decoder, decoder: decoder,
channel: make(chan any),
}
}
func (c *Client) Channel() <-chan any {
return c.channel
}
func (c *Client) post(msg any) {
select {
case <-time.After(time.Second):
case c.channel <- msg:
} }
} }
@ -37,9 +45,7 @@ func (c *Client) worker(channel Channel, h handler) {
for { for {
payload, err := c.reader.Read(channel) payload, err := c.reader.Read(channel)
if err != nil { if err != nil {
if c.OnError != nil { c.post(err)
c.OnError(err)
}
return return
} }
@ -47,13 +53,11 @@ func (c *Client) worker(channel Channel, h handler) {
} }
} }
// Helper method to decode a message and call OnError on error. // Helper method to decode a message and post and error on the channel if it fails.
// Returns true if successfull. false otherwise // Returns true if successfull. false otherwise
func (c *Client) decode(payload []byte, msg any) bool { func (c *Client) decode(payload []byte, msg any) bool {
if err := c.decoder(payload, msg); err != nil { if err := c.decoder(payload, msg); err != nil {
if c.OnError != nil { c.post(err)
c.OnError(err)
}
return false return false
} }
return true return true
@ -63,7 +67,7 @@ func (c *Client) decode(payload []byte, msg any) bool {
func (c *Client) rollbackHandler(payload []byte) { func (c *Client) rollbackHandler(payload []byte) {
var rb message.RollbackMessage var rb message.RollbackMessage
if ok := c.decode(payload, &rb); ok { if ok := c.decode(payload, &rb); ok {
c.OnRollback(rb) c.post(rb)
} }
} }
@ -71,7 +75,7 @@ func (c *Client) rollbackHandler(payload []byte) {
func (c *Client) transactionHandler(payload []byte) { func (c *Client) transactionHandler(payload []byte) {
var trans message.TransactionTrace var trans message.TransactionTrace
if ok := c.decode(payload, &trans); ok { if ok := c.decode(payload, &trans); ok {
c.OnTransaction(trans) c.post(trans)
} }
} }
@ -79,7 +83,7 @@ func (c *Client) transactionHandler(payload []byte) {
func (c *Client) actHandler(payload []byte) { func (c *Client) actHandler(payload []byte) {
var act message.ActionTrace var act message.ActionTrace
if ok := c.decode(payload, &act); ok { if ok := c.decode(payload, &act); ok {
c.OnAction(act) c.post(act)
} }
} }
@ -87,7 +91,7 @@ func (c *Client) actHandler(payload []byte) {
func (c *Client) tableDeltaHandler(payload []byte) { func (c *Client) tableDeltaHandler(payload []byte) {
td := message.TableDelta{} td := message.TableDelta{}
if ok := c.decode(payload, &td); ok { if ok := c.decode(payload, &td); ok {
c.OnTableDelta(td) c.post(td)
} }
} }
@ -95,37 +99,33 @@ func (c *Client) tableDeltaHandler(payload []byte) {
func (c *Client) hbHandler(payload []byte) { func (c *Client) hbHandler(payload []byte) {
var hb message.HeartBeat var hb message.HeartBeat
if ok := c.decode(payload, &hb); ok { if ok := c.decode(payload, &hb); ok {
c.OnHeartbeat(hb) c.post(hb)
} }
} }
func (c *Client) Subscribe(channel Channel) error { func (c *Client) Subscribe(channel Channel) error {
handlers := map[string]struct { var handler handler
handler handler
callback any
}{
RollbackChannel.Type(): {c.rollbackHandler, c.OnRollback},
TransactionChannel.Type(): {c.transactionHandler, c.OnTransaction},
HeartbeatChannel.Type(): {c.hbHandler, c.OnHeartbeat},
ActionChannel{}.Channel().Type(): {c.actHandler, c.OnAction},
TableDeltaChannel{}.Channel().Type(): {c.tableDeltaHandler, c.OnTableDelta},
}
h, ok := handlers[channel.Type()] switch channel.Type() {
case RollbackChannel.Type():
if !ok { handler = c.rollbackHandler
case TransactionChannel.Type():
handler = c.transactionHandler
case HeartbeatChannel.Type():
handler = c.hbHandler
case ActionChannel{}.Channel().Type():
handler = c.actHandler
case TableDeltaChannel{}.Channel().Type():
handler = c.tableDeltaHandler
default:
return fmt.Errorf("invalid channel type. %s", channel.Type()) return fmt.Errorf("invalid channel type. %s", channel.Type())
} }
if h.callback == nil || reflect.ValueOf(h.callback).IsNil() {
return fmt.Errorf("please set an handler before calling Subscribe")
}
// Start a worker for this channel. // Start a worker for this channel.
c.wg.Add(1) c.wg.Add(1)
go func() { go func() {
defer c.wg.Done() defer c.wg.Done()
c.worker(channel, h.handler) c.worker(channel, handler)
}() }()
return nil return nil
@ -137,5 +137,9 @@ func (c *Client) Run() {
} }
func (c *Client) Close() error { func (c *Client) Close() error {
return c.reader.Close() err := c.reader.Close()
// Wait for all goroutines before closing channel.
c.wg.Wait()
close(c.channel)
return err
} }

View file

@ -33,12 +33,6 @@ func mockDecoder([]byte, any) error {
return nil return nil
} }
func mockHbHandler(message.HeartBeat) {
}
func mockActionHandler(message.ActionTrace) {
}
func TestClient_Subscribe(t *testing.T) { func TestClient_Subscribe(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
@ -48,13 +42,12 @@ func TestClient_Subscribe(t *testing.T) {
{"Channel", Channel{}, true}, {"Channel", Channel{}, true},
{"ActionChannel", ActionChannel{}.Channel(), false}, {"ActionChannel", ActionChannel{}.Channel(), false},
{"HeartbeatChannel", HeartbeatChannel, false}, {"HeartbeatChannel", HeartbeatChannel, false},
{"TransactionChannel", TransactionChannel, true}, {"TransactionChannel", TransactionChannel, false},
{"InvalidChannel", Channel{"random_type"}, true},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
c := NewClient(&mockReader{}, mockDecoder) c := NewClient(&mockReader{}, mockDecoder)
c.OnHeartbeat = mockHbHandler
c.OnAction = mockActionHandler
if err := c.Subscribe(tt.channel); (err != nil) != tt.wantErr { if err := c.Subscribe(tt.channel); (err != nil) != tt.wantErr {
t.Errorf("Client.Subscribe() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("Client.Subscribe() error = %v, wantErr %v", err, tt.wantErr)
} }
@ -62,18 +55,7 @@ func TestClient_Subscribe(t *testing.T) {
} }
} }
func TestClient_SubscribeWithNilHandler(t *testing.T) {
client := NewClient(nil, nil)
client.OnAction = mockActionHandler
client.OnHeartbeat = mockHbHandler
err := client.Subscribe(TableDeltaChannel{Name: "name"}.Channel())
assert.Error(t, err)
}
func TestClient_ReadRollback(t *testing.T) { func TestClient_ReadRollback(t *testing.T) {
called := false
expected := message.RollbackMessage{ expected := message.RollbackMessage{
OldBlockNum: 1000, OldBlockNum: 1000,
NewBlockNum: 50, NewBlockNum: 50,
@ -86,15 +68,10 @@ func TestClient_ReadRollback(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
client := NewClient(mockReader{bytes.NewReader(payload)}, codec.Decoder) client := NewClient(mockReader{bytes.NewReader(payload)}, codec.Decoder)
client.OnRollback = func(rb message.RollbackMessage) {
assert.Equal(t, rb, expected)
called = true
}
err = client.Subscribe(RollbackChannel) err = client.Subscribe(RollbackChannel)
assert.NoError(t, err) assert.NoError(t, err)
client.Run() actual := <-client.Channel()
assert.Equal(t, expected, actual)
assert.True(t, called, "Rollback callback not called when it should have been")
} }

View file

@ -76,46 +76,47 @@ var benchCmd = &cli.Command{
client := api.NewClient(sub, codec.Decoder) client := api.NewClient(sub, codec.Decoder)
client.OnAction = func(act message.ActionTrace) {
counter++
}
// Subscribe to all actions // Subscribe to all actions
if err = client.Subscribe(api.ActionChannel{}.Channel()); err != nil { if err = client.Subscribe(api.ActionChannel{}.Channel()); err != nil {
return err return err
} }
go func() { go func() {
t := time.Now() for t := range client.Channel() {
sig := make(chan os.Signal, 1) switch err := t.(type) {
signal.Notify(sig, os.Interrupt) case message.ActionTrace:
counter++
for { case error:
select { log.WithError(err).Error("Error when reading stream")
case <-sig:
fmt.Println("Got interrupt")
client.Close()
return
case now := <-time.After(interval):
elapsed := now.Sub(t)
t = now
log.WithFields(log.Fields{
"num_messages": counter,
"elapsed": elapsed,
"msg_per_sec": float64(counter) / elapsed.Seconds(),
"msg_per_ms": float64(counter) / float64(elapsed.Milliseconds()),
"msg_per_min": float64(counter) / elapsed.Minutes(),
}).Info("Benchmark results")
counter = 0
} }
} }
}() }()
// Read stuff. t := time.Now()
client.Run() sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt)
return nil // Read stuff.
for {
select {
case <-sig:
fmt.Println("Got interrupt")
client.Close()
return nil
case now := <-time.After(interval):
elapsed := now.Sub(t)
t = now
log.WithFields(log.Fields{
"num_messages": counter,
"elapsed": elapsed,
"msg_per_sec": float64(counter) / elapsed.Seconds(),
"msg_per_ms": float64(counter) / float64(elapsed.Milliseconds()),
"msg_per_min": float64(counter) / elapsed.Minutes(),
}).Info("Benchmark results")
counter = 0
}
}
}, },
} }

View file

@ -18,37 +18,6 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
type Tester struct {
block_num uint32
timeout time.Duration
timer *time.Ticker
}
func NewTester(timeout time.Duration) *Tester {
return &Tester{
block_num: 0,
timeout: timeout,
timer: time.NewTicker(timeout),
}
}
func (t *Tester) OnAction(act message.ActionTrace) {
if t.block_num > 0 {
var diff int32 = int32(act.BlockNum - t.block_num)
if diff < 0 || diff > 1 {
log.WithFields(log.Fields{
"current_block": t.block_num,
"block": act.BlockNum,
"diff": diff,
}).Warn("Invalid")
}
}
t.block_num = act.BlockNum
t.timer.Reset(t.timeout)
}
var validateCmd = &cli.Command{ var validateCmd = &cli.Command{
Name: "validate", Name: "validate",
Usage: "Validate a thalos server by following action traces and makes sure that blocks arrive in order.", Usage: "Validate a thalos server by following action traces and makes sure that blocks arrive in order.",
@ -59,7 +28,6 @@ var validateCmd = &cli.Command{
chainIdFlag, chainIdFlag,
}, },
Action: func(ctx *cli.Context) error { Action: func(ctx *cli.Context) error {
tester := NewTester(time.Second * 5)
status_duration := time.Second * 10 status_duration := time.Second * 10
log.WithFields(log.Fields{ log.WithFields(log.Fields{
@ -94,37 +62,53 @@ var validateCmd = &cli.Command{
} }
client := api.NewClient(sub, codec.Decoder) client := api.NewClient(sub, codec.Decoder)
client.OnAction = tester.OnAction
// Subscribe to all actions // Subscribe to all actions
if err = client.Subscribe(api.ActionChannel{}.Channel()); err != nil { if err = client.Subscribe(api.ActionChannel{}.Channel()); err != nil {
return err return err
} }
go func() { block_num := uint32(0)
sig := make(chan os.Signal, 1) timeout := time.Second * 5
signal.Notify(sig, os.Interrupt) timer := time.NewTicker(timeout)
for { go func() {
select { for t := range client.Channel() {
case <-sig: switch msg := t.(type) {
fmt.Println("Got interrupt") case message.ActionTrace:
client.Close() if block_num > 0 {
return var diff int32 = int32(msg.BlockNum - block_num)
case <-tester.timer.C: if diff < 0 || diff > 1 {
log.WithField("duration", tester.timeout). log.WithFields(log.Fields{
Warn("Did not get any messages during the defined duration") "current_block": block_num,
case <-time.After(status_duration): "block": msg.BlockNum,
log.WithFields(log.Fields{ "diff": diff,
"current_block": tester.block_num, }).Warn("Invalid")
}).Info("Status") }
}
block_num = msg.BlockNum
timer.Reset(timeout)
} }
} }
}() }()
// Read stuff. sig := make(chan os.Signal, 1)
client.Run() signal.Notify(sig, os.Interrupt)
return nil for {
select {
case <-sig:
fmt.Println("Got interrupt")
client.Close()
return nil
case <-timer.C:
log.WithField("duration", timeout).
Warn("Did not get any messages during the defined duration")
case <-time.After(status_duration):
log.WithFields(log.Fields{
"current_block": block_num,
}).Info("Status")
}
}
}, },
} }