mirror of
https://github.com/eosswedenorg/thalos
synced 2026-06-16 04:24:56 +02:00
160 lines
3.1 KiB
Go
160 lines
3.1 KiB
Go
package api
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/eosswedenorg/thalos/api/message"
|
|
)
|
|
|
|
type handler func([]byte)
|
|
|
|
// Client reads and decodes messages from a reader and posts them to a go channel
|
|
type Client struct {
|
|
reader Reader
|
|
decoder message.Decoder
|
|
|
|
// waitgroup for worker threads.
|
|
wg sync.WaitGroup
|
|
|
|
// Channel for messages and errors
|
|
channel chan any
|
|
}
|
|
|
|
func NewClient(reader Reader, decoder message.Decoder) *Client {
|
|
return &Client{
|
|
reader: reader,
|
|
decoder: decoder,
|
|
channel: make(chan any),
|
|
}
|
|
}
|
|
|
|
func (c *Client) Channel() <-chan any {
|
|
return c.channel
|
|
}
|
|
|
|
// Helper method to post a message to a channel with timeout.
|
|
func (c *Client) post(msg any) {
|
|
select {
|
|
case <-time.After(time.Second):
|
|
case c.channel <- msg:
|
|
}
|
|
}
|
|
|
|
func (c *Client) worker(channel Channel, h handler) {
|
|
for {
|
|
payload, err := c.reader.Read(channel)
|
|
if err != nil {
|
|
// Don't report EOF as an error because it is used
|
|
// by readers to signal an graceful end of input.
|
|
if err != io.EOF {
|
|
c.post(err)
|
|
}
|
|
return
|
|
}
|
|
|
|
h(payload)
|
|
}
|
|
}
|
|
|
|
// Helper method to decode a message and post and error on the channel if it fails.
|
|
// Returns true if successful. False otherwise
|
|
func (c *Client) decode(payload []byte, msg any) bool {
|
|
if err := c.decoder(payload, msg); err != nil {
|
|
c.post(err)
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// Rollback handler
|
|
func (c *Client) rollbackHandler(payload []byte) {
|
|
var rb message.RollbackMessage
|
|
if ok := c.decode(payload, &rb); ok {
|
|
c.post(rb)
|
|
}
|
|
}
|
|
|
|
// Transaction handler
|
|
func (c *Client) transactionHandler(payload []byte) {
|
|
var trans message.TransactionTrace
|
|
if ok := c.decode(payload, &trans); ok {
|
|
c.post(trans)
|
|
}
|
|
}
|
|
|
|
// Action handler
|
|
func (c *Client) actHandler(payload []byte) {
|
|
var act message.ActionTrace
|
|
if ok := c.decode(payload, &act); ok {
|
|
c.post(act)
|
|
}
|
|
}
|
|
|
|
// TableDelta handler
|
|
func (c *Client) tableDeltaHandler(payload []byte) {
|
|
td := message.TableDelta{}
|
|
if ok := c.decode(payload, &td); ok {
|
|
c.post(td)
|
|
}
|
|
}
|
|
|
|
// HeartBeat handler
|
|
func (c *Client) hbHandler(payload []byte) {
|
|
var hb message.HeartBeat
|
|
if ok := c.decode(payload, &hb); ok {
|
|
c.post(hb)
|
|
}
|
|
}
|
|
|
|
func (c *Client) sub(channel Channel) error {
|
|
var h handler
|
|
|
|
switch channel.Type() {
|
|
case RollbackChannel.Type():
|
|
h = c.rollbackHandler
|
|
case TransactionChannel.Type():
|
|
h = c.transactionHandler
|
|
case HeartbeatChannel.Type():
|
|
h = c.hbHandler
|
|
case ActionChannel{}.Channel().Type():
|
|
h = c.actHandler
|
|
case TableDeltaChannel{}.Channel().Type():
|
|
h = c.tableDeltaHandler
|
|
default:
|
|
return fmt.Errorf("invalid channel type. %s", channel.Type())
|
|
}
|
|
|
|
// Start a worker for this channel.
|
|
c.wg.Add(1)
|
|
go func() {
|
|
defer c.wg.Done()
|
|
c.worker(channel, h)
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) Subscribe(channels ...Channel) error {
|
|
for _, ch := range channels {
|
|
if err := c.sub(ch); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) Run() {
|
|
// Just wait for workers to complete.
|
|
c.wg.Wait()
|
|
}
|
|
|
|
func (c *Client) Close() error {
|
|
err := c.reader.Close()
|
|
// Wait for all goroutines to finish before closing channel.
|
|
c.wg.Wait()
|
|
close(c.channel)
|
|
return err
|
|
}
|