diff --git a/api/client.go b/api/client.go new file mode 100644 index 0000000..de21dc5 --- /dev/null +++ b/api/client.go @@ -0,0 +1,89 @@ +package api + +import ( + "sync" + + "github.com/eosswedenorg/thalos/api/message" +) + +type handler func([]byte) + +// Client reads and decodes messages from a reader and provides callback functions. +type Client struct { + reader Reader + decoder message.Decoder + + wg sync.WaitGroup + + OnError func(error) + OnAction func(message.ActionTrace) + OnHeartbeat func(message.HeartBeat) +} + +func NewClient(reader Reader, decoder message.Decoder) *Client { + return &Client{ + reader: reader, + decoder: decoder, + } +} + +func (c *Client) worker(channel Channel, h handler) { + for { + payload, err := c.reader.Read(channel) + if err != nil { + if c.OnError != nil { + c.OnError(err) + } + return + } + + h(payload) + } +} + +func (c *Client) actHandler(payload []byte) { + var act message.ActionTrace + if err := c.decoder(payload, &act); err != nil { + if c.OnError != nil { + c.OnError(err) + } + return + } + c.OnAction(act) +} + +func (c *Client) hbHandler(payload []byte) { + var hb message.HeartBeat + if err := c.decoder(payload, &hb); err != nil { + if c.OnError != nil { + c.OnError(err) + } + return + } + c.OnHeartbeat(hb) +} + +func (c *Client) Subscribe(channel Channel) { + var handler handler + + if HeartbeatChannel.Is(channel) { + handler = c.hbHandler + } else { + handler = c.actHandler + } + + // Start a worker for this channel. + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.worker(channel, handler) + }() +} + +func (c *Client) Run() { + c.wg.Wait() +} + +func (c *Client) Close() error { + return c.reader.Close() +}