diff --git a/api/client.go b/api/client.go index a52e6c5..e76b2cc 100644 --- a/api/client.go +++ b/api/client.go @@ -17,9 +17,10 @@ type Client struct { // waitgroup for worker threads. wg sync.WaitGroup - OnError func(error) - OnAction func(message.ActionTrace) - OnHeartbeat func(message.HeartBeat) + OnError func(error) + OnAction func(message.ActionTrace) + OnHeartbeat func(message.HeartBeat) + OnTableDelta func(message.TableDelta) } func NewClient(reader Reader, decoder message.Decoder) *Client { @@ -55,6 +56,18 @@ func (c *Client) actHandler(payload []byte) { c.OnAction(act) } +// TableDelta handler +func (c *Client) tableDeltaHandler(payload []byte) { + td := message.TableDelta{} + if err := c.decoder(payload, &td); err != nil { + if c.OnError != nil { + c.OnError(err) + } + return + } + c.OnTableDelta(td) +} + // HeartBeat handler func (c *Client) hbHandler(payload []byte) { var hb message.HeartBeat @@ -75,6 +88,8 @@ func (c *Client) Subscribe(channel Channel) error { handler = c.hbHandler case ActionChannel{}.Channel().Type(): handler = c.actHandler + case TableDeltaChannel{}.Channel().Type(): + handler = c.tableDeltaHandler default: return fmt.Errorf("invalid channel type. %s", t) }