mirror of
https://github.com/eosswedenorg/thalos
synced 2026-06-16 04:24:56 +02:00
api/client.go: in Subscribe() handle nil handler correctly.
This commit is contained in:
parent
bcd4a93c46
commit
35a9706954
1 changed files with 18 additions and 11 deletions
|
|
@ -2,6 +2,7 @@ package api
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
|
||||
"github.com/eosswedenorg/thalos/api/message"
|
||||
|
|
@ -81,24 +82,30 @@ func (c *Client) hbHandler(payload []byte) {
|
|||
}
|
||||
|
||||
func (c *Client) Subscribe(channel Channel) error {
|
||||
var handler handler
|
||||
handlers := map[string]struct {
|
||||
handler handler
|
||||
callback any
|
||||
}{
|
||||
HeartbeatChannel.Type(): {c.hbHandler, c.OnHeartbeat},
|
||||
ActionChannel{}.Channel().Type(): {c.actHandler, c.OnAction},
|
||||
TableDeltaChannel{}.Channel().Type(): {c.tableDeltaHandler, c.OnTableDelta},
|
||||
}
|
||||
|
||||
switch t := channel.Type(); t {
|
||||
case HeartbeatChannel.Type():
|
||||
handler = c.hbHandler
|
||||
case ActionChannel{}.Channel().Type():
|
||||
handler = c.actHandler
|
||||
case TableDeltaChannel{}.Channel().Type():
|
||||
handler = c.tableDeltaHandler
|
||||
default:
|
||||
return fmt.Errorf("invalid channel type. %s", t)
|
||||
h, ok := handlers[channel.Type()]
|
||||
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid channel type. %s", channel.Type())
|
||||
}
|
||||
|
||||
if h.callback == nil || reflect.ValueOf(h.callback).IsNil() {
|
||||
return fmt.Errorf("please set an handler before calling Subscribe")
|
||||
}
|
||||
|
||||
// Start a worker for this channel.
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
c.worker(channel, handler)
|
||||
c.worker(channel, h.handler)
|
||||
}()
|
||||
|
||||
return nil
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue