From c876875a6ed888f5ba0be53f1c231b174e8dd0e7 Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Sun, 12 May 2024 17:14:00 +0200 Subject: [PATCH] internal/server/message_queue.go: adding MessageQueue struct This struct will handle the routing to channels aswell as encoding of the messages. --- internal/server/message_queue.go | 74 ++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 internal/server/message_queue.go diff --git a/internal/server/message_queue.go b/internal/server/message_queue.go new file mode 100644 index 0000000..50386ec --- /dev/null +++ b/internal/server/message_queue.go @@ -0,0 +1,74 @@ +package server + +import ( + "errors" + + "github.com/eosswedenorg/thalos/api" + "github.com/eosswedenorg/thalos/api/message" + "github.com/eosswedenorg/thalos/internal/driver" +) + +// MessageQueue takes care of message routing and encoding +type MessageQueue struct { + // Writer to write messages to + writer driver.Writer + + // encoder to encode messages with + encode message.Encoder +} + +func NewMessageQueue(writer driver.Writer, encoder message.Encoder) MessageQueue { + return MessageQueue{ + writer: writer, + encode: encoder, + } +} + +func (mq MessageQueue) PostHeartbeat(hb message.HeartBeat) error { + return mq.post(hb, api.HeartbeatChannel) +} + +func (mq MessageQueue) PostRollback(rb message.RollbackMessage) error { + return mq.post(rb, api.RollbackChannel) +} + +func (mq MessageQueue) PostTransactionTrace(trace message.TransactionTrace) error { + return mq.post(trace, api.TransactionChannel) +} + +// Post a ActionTrace message to the queue +func (mq MessageQueue) PostAction(act message.ActionTrace) error { + return mq.post(act, + api.ActionChannel{}.Channel(), + api.ActionChannel{Name: act.Name}.Channel(), + api.ActionChannel{Contract: act.Contract}.Channel(), + api.ActionChannel{Name: act.Name, Contract: act.Contract}.Channel(), + ) +} + +func (mq MessageQueue) PostTableDelta(delta message.TableDelta) error { + return mq.post(delta, + api.TableDeltaChannel{}.Channel(), + api.TableDeltaChannel{Name: delta.Name}.Channel(), + ) +} + +func (mq MessageQueue) Flush() error { + return mq.writer.Flush() +} + +func (mq MessageQueue) Close() error { + return mq.writer.Close() +} + +func (mq MessageQueue) post(v interface{}, channels ...api.Channel) error { + payload, err := mq.encode(v) + if err == nil { + for _, channel := range channels { + if w_err := mq.writer.Write(channel, payload); err != nil { + err = errors.Join(w_err) + } + } + } + return err +}