mirror of
https://github.com/eosswedenorg/thalos
synced 2026-06-16 04:24:56 +02:00
74 lines
1.8 KiB
Go
74 lines
1.8 KiB
Go
package server
|
|
|
|
import (
|
|
"errors"
|
|
|
|
"github.com/eosswedenorg/thalos/api"
|
|
"github.com/eosswedenorg/thalos/api/message"
|
|
"github.com/eosswedenorg/thalos/internal/driver"
|
|
)
|
|
|
|
// MessageQueue takes care of message routing and encoding
|
|
type MessageQueue struct {
|
|
// Writer to write messages to
|
|
writer driver.Writer
|
|
|
|
// Encoder to encode messages with
|
|
encode message.Encoder
|
|
}
|
|
|
|
func NewMessageQueue(writer driver.Writer, encoder message.Encoder) MessageQueue {
|
|
return MessageQueue{
|
|
writer: writer,
|
|
encode: encoder,
|
|
}
|
|
}
|
|
|
|
func (mq MessageQueue) PostHeartbeat(hb message.HeartBeat) error {
|
|
return mq.post(hb, api.HeartbeatChannel)
|
|
}
|
|
|
|
func (mq MessageQueue) PostRollback(rb message.RollbackMessage) error {
|
|
return mq.post(rb, api.RollbackChannel)
|
|
}
|
|
|
|
func (mq MessageQueue) PostTransactionTrace(trace message.TransactionTrace) error {
|
|
return mq.post(trace, api.TransactionChannel)
|
|
}
|
|
|
|
// Post a ActionTrace message to the queue
|
|
func (mq MessageQueue) PostAction(act message.ActionTrace) error {
|
|
return mq.post(act,
|
|
api.ActionChannel{}.Channel(),
|
|
api.ActionChannel{Name: act.Name}.Channel(),
|
|
api.ActionChannel{Contract: act.Contract}.Channel(),
|
|
api.ActionChannel{Name: act.Name, Contract: act.Contract}.Channel(),
|
|
)
|
|
}
|
|
|
|
func (mq MessageQueue) PostTableDelta(delta message.TableDelta) error {
|
|
return mq.post(delta,
|
|
api.TableDeltaChannel{}.Channel(),
|
|
api.TableDeltaChannel{Name: delta.Name}.Channel(),
|
|
)
|
|
}
|
|
|
|
func (mq MessageQueue) Flush() error {
|
|
return mq.writer.Flush()
|
|
}
|
|
|
|
func (mq MessageQueue) Close() error {
|
|
return mq.writer.Close()
|
|
}
|
|
|
|
func (mq MessageQueue) post(v interface{}, channels ...api.Channel) error {
|
|
payload, err := mq.encode(v)
|
|
if err == nil {
|
|
for _, channel := range channels {
|
|
if w_err := mq.writer.Write(channel, payload); err != nil {
|
|
err = errors.Join(w_err)
|
|
}
|
|
}
|
|
}
|
|
return err
|
|
}
|