1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-06-16 04:24:56 +02:00
thalos/internal/server/message_queue.go
Henrik Hautakoski c876875a6e internal/server/message_queue.go: adding MessageQueue struct
This struct will handle the routing to channels as well as the encoding of
the messages.
2024-05-12 17:14:00 +02:00

74 lines
1.8 KiB
Go

package server
import (
"errors"
"github.com/eosswedenorg/thalos/api"
"github.com/eosswedenorg/thalos/api/message"
"github.com/eosswedenorg/thalos/internal/driver"
)
// MessageQueue takes care of message routing and encoding.
//
// Each posted message is encoded with encode and then written to writer
// on one or more channels.
type MessageQueue struct {
// writer is the destination that encoded messages are written to.
writer driver.Writer
// encode is the encoder used to turn a message into its payload.
encode message.Encoder
}
// NewMessageQueue returns a MessageQueue that encodes messages with encoder
// and writes the resulting payloads to writer.
func NewMessageQueue(writer driver.Writer, encoder message.Encoder) MessageQueue {
	var mq MessageQueue
	mq.writer = writer
	mq.encode = encoder
	return mq
}
// PostHeartbeat posts hb on api.HeartbeatChannel and returns the
// result of the post.
func (mq MessageQueue) PostHeartbeat(hb message.HeartBeat) error {
	err := mq.post(hb, api.HeartbeatChannel)
	return err
}
// PostRollback posts rb on api.RollbackChannel and returns the
// result of the post.
func (mq MessageQueue) PostRollback(rb message.RollbackMessage) error {
	err := mq.post(rb, api.RollbackChannel)
	return err
}
// PostTransactionTrace posts trace on api.TransactionChannel and returns
// the result of the post.
func (mq MessageQueue) PostTransactionTrace(trace message.TransactionTrace) error {
	err := mq.post(trace, api.TransactionChannel)
	return err
}
// PostAction posts an ActionTrace message to the queue.
//
// The message is posted on four action channels: the unfiltered one, one
// keyed by the action name, one keyed by the contract, and one keyed by
// both name and contract.
func (mq MessageQueue) PostAction(act message.ActionTrace) error {
	targets := []api.Channel{
		api.ActionChannel{}.Channel(),
		api.ActionChannel{Name: act.Name}.Channel(),
		api.ActionChannel{Contract: act.Contract}.Channel(),
		api.ActionChannel{Name: act.Name, Contract: act.Contract}.Channel(),
	}
	return mq.post(act, targets...)
}
// PostTableDelta posts a TableDelta message to the queue.
//
// The message is posted on two table delta channels: the unfiltered one
// and one keyed by the delta's name.
func (mq MessageQueue) PostTableDelta(delta message.TableDelta) error {
	targets := []api.Channel{
		api.TableDeltaChannel{}.Channel(),
		api.TableDeltaChannel{Name: delta.Name}.Channel(),
	}
	return mq.post(delta, targets...)
}
// Flush calls Flush on the underlying writer and returns its result.
func (mq MessageQueue) Flush() error {
	w := mq.writer
	return w.Flush()
}
// Close calls Close on the underlying writer and returns its result.
func (mq MessageQueue) Close() error {
	w := mq.writer
	return w.Close()
}
// post encodes v once and writes the resulting payload to every channel in
// channels.
//
// If encoding fails, nothing is written and the encode error is returned.
// Otherwise a write is attempted on every channel even if an earlier one
// failed; all write errors are combined with errors.Join and returned.
// The result is nil when every write succeeds (or channels is empty).
func (mq MessageQueue) post(v interface{}, channels ...api.Channel) error {
	payload, err := mq.encode(v)
	if err != nil {
		return err
	}

	// Collect every write failure instead of stopping at the first one, so
	// a single bad channel does not prevent delivery to the others.
	var errs []error
	for _, channel := range channels {
		if werr := mq.writer.Write(channel, payload); werr != nil {
			errs = append(errs, werr)
		}
	}

	// errors.Join returns nil when errs is empty.
	return errors.Join(errs...)
}