1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-06-16 04:24:56 +02:00
thalos/transport/client.go
2023-03-08 15:52:10 +01:00

77 lines
1.5 KiB
Go

package transport
import (
"encoding/json"
"eosio-ship-trace-reader/transport/message"
)
type Client struct {
reader Reader
decoder message.Decoder
actChan chan message.ActionTrace
hbChan chan message.HearthBeat
errChan chan error
}
func NewClient(reader Reader) *Client {
return &Client{
reader: reader,
decoder: json.Unmarshal,
actChan: make(chan message.ActionTrace, 16),
hbChan: make(chan message.HearthBeat, 16),
errChan: make(chan error, 16),
}
}
func actWorker(decoder message.Decoder, out chan<- message.ActionTrace, reader Reader, channel Channel) {
for {
payload, err := reader.Read(channel)
if err != nil {
return
}
var act message.ActionTrace
if err := decoder(payload, &act); err != nil {
continue
}
out <- act
}
}
func hbWorker(decoder message.Decoder, out chan<- message.HearthBeat, reader Reader, channel Channel) {
for {
payload, err := reader.Read(channel)
if err != nil {
return
}
var hb message.HearthBeat
if err := decoder(payload, &hb); err != nil {
continue
}
out <- hb
}
}
func (c Client) Subscribe(channel Channel) {
if HeartbeatChannel.Is(channel) {
go hbWorker(c.decoder, c.hbChan, c.reader, channel)
}
go actWorker(c.decoder, c.actChan, c.reader, channel)
}
func (c Client) ActionTrace() <-chan message.ActionTrace {
return c.actChan
}
func (c Client) Heartbeat() <-chan message.HearthBeat {
return c.hbChan
}
func (c Client) Close() {
close(c.actChan)
close(c.hbChan)
}