mirror of
https://github.com/eosswedenorg/thalos
synced 2026-06-17 04:30:03 +02:00
internal/server/message_queue.go: adding MessageQueue struct
This struct will handle the routing to channels aswell as encoding of the messages.
This commit is contained in:
parent
871f2ae04d
commit
c876875a6e
1 changed files with 74 additions and 0 deletions
74
internal/server/message_queue.go
Normal file
74
internal/server/message_queue.go
Normal file
|
|
@ -0,0 +1,74 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/eosswedenorg/thalos/api"
|
||||
"github.com/eosswedenorg/thalos/api/message"
|
||||
"github.com/eosswedenorg/thalos/internal/driver"
|
||||
)
|
||||
|
||||
// MessageQueue takes care of message routing and encoding
|
||||
type MessageQueue struct {
|
||||
// Writer to write messages to
|
||||
writer driver.Writer
|
||||
|
||||
// encoder to encode messages with
|
||||
encode message.Encoder
|
||||
}
|
||||
|
||||
func NewMessageQueue(writer driver.Writer, encoder message.Encoder) MessageQueue {
|
||||
return MessageQueue{
|
||||
writer: writer,
|
||||
encode: encoder,
|
||||
}
|
||||
}
|
||||
|
||||
func (mq MessageQueue) PostHeartbeat(hb message.HeartBeat) error {
|
||||
return mq.post(hb, api.HeartbeatChannel)
|
||||
}
|
||||
|
||||
func (mq MessageQueue) PostRollback(rb message.RollbackMessage) error {
|
||||
return mq.post(rb, api.RollbackChannel)
|
||||
}
|
||||
|
||||
func (mq MessageQueue) PostTransactionTrace(trace message.TransactionTrace) error {
|
||||
return mq.post(trace, api.TransactionChannel)
|
||||
}
|
||||
|
||||
// Post a ActionTrace message to the queue
|
||||
func (mq MessageQueue) PostAction(act message.ActionTrace) error {
|
||||
return mq.post(act,
|
||||
api.ActionChannel{}.Channel(),
|
||||
api.ActionChannel{Name: act.Name}.Channel(),
|
||||
api.ActionChannel{Contract: act.Contract}.Channel(),
|
||||
api.ActionChannel{Name: act.Name, Contract: act.Contract}.Channel(),
|
||||
)
|
||||
}
|
||||
|
||||
func (mq MessageQueue) PostTableDelta(delta message.TableDelta) error {
|
||||
return mq.post(delta,
|
||||
api.TableDeltaChannel{}.Channel(),
|
||||
api.TableDeltaChannel{Name: delta.Name}.Channel(),
|
||||
)
|
||||
}
|
||||
|
||||
func (mq MessageQueue) Flush() error {
|
||||
return mq.writer.Flush()
|
||||
}
|
||||
|
||||
func (mq MessageQueue) Close() error {
|
||||
return mq.writer.Close()
|
||||
}
|
||||
|
||||
func (mq MessageQueue) post(v interface{}, channels ...api.Channel) error {
|
||||
payload, err := mq.encode(v)
|
||||
if err == nil {
|
||||
for _, channel := range channels {
|
||||
if w_err := mq.writer.Write(channel, payload); err != nil {
|
||||
err = errors.Join(w_err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue