mirror of
https://github.com/eosswedenorg/thalos
synced 2026-06-19 04:50:02 +02:00
api/client.go: implement Client.Subscribe correctly by returning a error on unsupported channel type.
This commit is contained in:
parent
9e586d879d
commit
8154bee1c2
2 changed files with 35 additions and 3 deletions
|
|
@ -1,6 +1,7 @@
|
||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/eosswedenorg/thalos/api/message"
|
"github.com/eosswedenorg/thalos/api/message"
|
||||||
|
|
@ -63,13 +64,16 @@ func (c *Client) hbHandler(payload []byte) {
|
||||||
c.OnHeartbeat(hb)
|
c.OnHeartbeat(hb)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Subscribe(channel Channel) {
|
func (c *Client) Subscribe(channel Channel) error {
|
||||||
var handler handler
|
var handler handler
|
||||||
|
|
||||||
if HeartbeatChannel.Is(channel) {
|
switch t := channel.Type(); t {
|
||||||
|
case HeartbeatChannel.Type():
|
||||||
handler = c.hbHandler
|
handler = c.hbHandler
|
||||||
} else {
|
case ActionChannel{}.Channel().Type():
|
||||||
handler = c.actHandler
|
handler = c.actHandler
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("invalid channel type. %s", t)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a worker for this channel.
|
// Start a worker for this channel.
|
||||||
|
|
@ -78,6 +82,8 @@ func (c *Client) Subscribe(channel Channel) {
|
||||||
defer c.wg.Done()
|
defer c.wg.Done()
|
||||||
c.worker(channel, handler)
|
c.worker(channel, handler)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Run() {
|
func (c *Client) Run() {
|
||||||
|
|
|
||||||
26
api/client_test.go
Normal file
26
api/client_test.go
Normal file
|
|
@ -0,0 +1,26 @@
|
||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestClient_Subscribe(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
channel Channel
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{"Channel", Channel{}, true},
|
||||||
|
{"ActionChannel", ActionChannel{}.Channel(), false},
|
||||||
|
{"HeartbeatChannel", HeartbeatChannel, false},
|
||||||
|
{"TransactionChannel", TransactionChannel, true},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
c := Client{}
|
||||||
|
if err := c.Subscribe(tt.channel); (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("Client.Subscribe() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue