1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-06-16 04:24:56 +02:00

internal/server/ship_processor.go: refactor to use MessageQueue struct.

This commit is contained in:
Henrik Hautakoski 2024-05-12 17:15:03 +02:00
parent c876875a6e
commit 95365d0c26

View file

@ -4,7 +4,6 @@ import (
"bytes"
"encoding/hex"
"github.com/eosswedenorg/thalos/api"
"github.com/eosswedenorg/thalos/api/message"
"github.com/eosswedenorg/thalos/internal/abi"
"github.com/eosswedenorg/thalos/internal/driver"
@ -16,19 +15,6 @@ import (
"github.com/pnx/antelope-go/ship"
)
// logDecoratedEncoder decorates a message.Encoder and logs any error.
func logDecoratedEncoder(encoder message.Encoder) message.Encoder {
return func(v interface{}) ([]byte, error) {
payload, err := encoder(v)
if err != nil {
log.WithError(err).
WithField("v", v).
Warn("Failed to encode message")
}
return payload, err
}
}
// A ShipProcessor will consume messages from a ship stream, convert the messages into
// thalos specific ones, encode them and finally post them to an api.Writer
type ShipProcessor struct {
@ -38,11 +24,7 @@ type ShipProcessor struct {
// Abi manager used for cacheing
abi *abi.AbiManager
// Writer to send messages to.
writer driver.Writer
// Encoder used to encode messages
encode message.Encoder
queue MessageQueue
// Function for saving state.
saver StateSaver
@ -62,10 +44,9 @@ func SpawnProccessor(shipStream *shipclient.Stream, loader StateLoader, saver St
processor := &ShipProcessor{
saver: saver,
abi: abi,
writer: writer,
shipStream: shipStream,
encode: logDecoratedEncoder(codec.Encoder),
syscontract: chain.N("eosio"),
queue: NewMessageQueue(writer, codec.Encoder),
}
loader(&processor.state)
@ -85,22 +66,6 @@ func (processor *ShipProcessor) initHandler(abi *chain.Abi) {
processor.shipABI = abi
}
func (processor *ShipProcessor) queueMessage(channel api.Channel, payload []byte) bool {
err := processor.writer.Write(channel, payload)
if err != nil {
log.WithError(err).Errorf("Failed to post to channel '%s'", channel)
return false
}
return true
}
func (processor *ShipProcessor) encodeQueue(channel api.Channel, v interface{}) bool {
if payload, err := processor.encode(v); err == nil {
return processor.queueMessage(channel, payload)
}
return false
}
// updateAbiFromAction updates the contract abi based on the ship.Action passed.
func (processor *ShipProcessor) updateAbiFromAction(act *chain.Action) error {
set_abi := struct {
@ -130,25 +95,6 @@ func (processor *ShipProcessor) GetCurrentBlock() uint32 {
return processor.state.CurrentBlock
}
func (processor *ShipProcessor) broadcastAction(act *message.ActionTrace) {
payload, err := processor.encode(*act)
if err != nil {
log.WithField("act", act).Warn("failed to encode action")
return
}
channels := []api.Channel{
api.ActionChannel{}.Channel(),
api.ActionChannel{Name: act.Name}.Channel(),
api.ActionChannel{Contract: act.Contract}.Channel(),
api.ActionChannel{Name: act.Name, Contract: act.Contract}.Channel(),
}
for _, channel := range channels {
processor.queueMessage(channel, payload)
}
}
func (processor *ShipProcessor) processTransactionTrace(log *log.Entry, blockNumber uint32, block *ship.SignedBlock, trace *ship.TransactionTraceV0) {
logger := log.WithField("type", "trace").WithField("tx_id", trace.ID.String()).Dup()
@ -178,13 +124,15 @@ func (processor *ShipProcessor) processTransactionTrace(log *log.Entry, blockNum
actMsg.BlockNum = blockNumber
actMsg.Timestamp = timestamp
processor.broadcastAction(actMsg)
processor.queue.PostAction(*actMsg)
transaction.ActionTraces = append(transaction.ActionTraces, *actMsg)
}
}
processor.encodeQueue(api.TransactionChannel, transaction)
if err := processor.queue.PostTransactionTrace(transaction); err != nil {
logger.WithError(err).Error("Failed to post transaction trace")
}
}
func (processor *ShipProcessor) proccessActionTrace(logger *log.Entry, trace *ship.ActionTraceV1) *message.ActionTrace {
@ -255,14 +203,18 @@ func (processor *ShipProcessor) processBlock(blockResult *ship.GetBlocksResultV0
// Check to see if we have a microfork and post a message to
// the rollback channel in that case.
if processor.state.CurrentBlock > 0 && blockNumber < processor.state.CurrentBlock {
log.WithField("old_block", processor.state.CurrentBlock).
WithField("new_block", blockResult.ThisBlock.BlockNum).
Warn("Fork detected, old_block is greater than new_block")
processor.encodeQueue(api.RollbackChannel, message.RollbackMessage{
msg := message.RollbackMessage{
OldBlockNum: processor.state.CurrentBlock,
NewBlockNum: blockResult.ThisBlock.BlockNum,
})
}
log.WithField("old_block", msg.OldBlockNum).
WithField("new_block", msg.NewBlockNum).
Warn("Fork detected, old_block is greater than new_block")
if err := processor.queue.PostRollback(msg); err != nil {
log.WithError(err).Error("Failed to write rollback message")
}
}
processor.state.CurrentBlock = blockNumber
@ -277,8 +229,9 @@ func (processor *ShipProcessor) processBlock(blockResult *ship.GetBlocksResultV0
LastIrreversibleBlockNum: blockResult.LastIrreversible.BlockNum,
HeadBlockNum: blockResult.Head.BlockNum,
}
processor.encodeQueue(api.HeartbeatChannel, hb)
if err := processor.queue.PostHeartbeat(hb); err != nil {
log.WithError(err).Error("Failed to write heartbeat message")
}
}
mainLogger := log.WithField("block", blockNumber).Dup()
@ -329,25 +282,20 @@ func (processor *ShipProcessor) processBlock(blockResult *ship.GetBlocksResultV0
rows = append(rows, msg)
}
message := message.TableDelta{
msg := message.TableDelta{
BlockNum: blockNumber,
Timestamp: timestamp,
Name: delta.V0.Name,
Rows: rows,
}
channels := []api.Channel{
api.TableDeltaChannel{}.Channel(),
api.TableDeltaChannel{Name: delta.V0.Name}.Channel(),
}
for _, channel := range channels {
processor.encodeQueue(channel, message)
if err := processor.queue.PostTableDelta(msg); err != nil {
logger.WithError(err).Error("Failed to post table delta message")
}
}
}
err := processor.writer.Flush()
err := processor.queue.Flush()
if err != nil {
log.WithError(err).Error("Failed to send messages")
}
@ -360,5 +308,5 @@ func (processor *ShipProcessor) processBlock(blockResult *ship.GetBlocksResultV0
// Close closes the writer associated with the processor.
func (processor *ShipProcessor) Close() error {
return processor.writer.Close()
return processor.queue.Close()
}