diff --git a/internal/server/ship_processor.go b/internal/server/ship_processor.go index f348fa4..8724d22 100644 --- a/internal/server/ship_processor.go +++ b/internal/server/ship_processor.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/hex" - "github.com/eosswedenorg/thalos/api" "github.com/eosswedenorg/thalos/api/message" "github.com/eosswedenorg/thalos/internal/abi" "github.com/eosswedenorg/thalos/internal/driver" @@ -16,19 +15,6 @@ import ( "github.com/pnx/antelope-go/ship" ) -// logDecoratedEncoder decorates a message.Encoder and logs any error. -func logDecoratedEncoder(encoder message.Encoder) message.Encoder { - return func(v interface{}) ([]byte, error) { - payload, err := encoder(v) - if err != nil { - log.WithError(err). - WithField("v", v). - Warn("Failed to encode message") - } - return payload, err - } -} - // A ShipProcessor will consume messages from a ship stream, convert the messages into // thalos specific ones, encode them and finally post them to an api.Writer type ShipProcessor struct { @@ -38,11 +24,7 @@ type ShipProcessor struct { // Abi manager used for cacheing abi *abi.AbiManager - // Writer to send messages to. - writer driver.Writer - - // Encoder used to encode messages - encode message.Encoder + queue MessageQueue // Function for saving state. saver StateSaver @@ -62,10 +44,9 @@ func SpawnProccessor(shipStream *shipclient.Stream, loader StateLoader, saver St processor := &ShipProcessor{ saver: saver, abi: abi, - writer: writer, shipStream: shipStream, - encode: logDecoratedEncoder(codec.Encoder), syscontract: chain.N("eosio"), + queue: NewMessageQueue(writer, codec.Encoder), } loader(&processor.state) @@ -85,22 +66,6 @@ func (processor *ShipProcessor) initHandler(abi *chain.Abi) { processor.shipABI = abi } -func (processor *ShipProcessor) queueMessage(channel api.Channel, payload []byte) bool { - err := processor.writer.Write(channel, payload) - if err != nil { - log.WithError(err).Errorf("Failed to post to channel '%s'", channel) - return false - } - return true -} - -func (processor *ShipProcessor) encodeQueue(channel api.Channel, v interface{}) bool { - if payload, err := processor.encode(v); err == nil { - return processor.queueMessage(channel, payload) - } - return false -} - // updateAbiFromAction updates the contract abi based on the ship.Action passed. func (processor *ShipProcessor) updateAbiFromAction(act *chain.Action) error { set_abi := struct { @@ -130,25 +95,6 @@ func (processor *ShipProcessor) GetCurrentBlock() uint32 { return processor.state.CurrentBlock } -func (processor *ShipProcessor) broadcastAction(act *message.ActionTrace) { - payload, err := processor.encode(*act) - if err != nil { - log.WithField("act", act).Warn("failed to encode action") - return - } - - channels := []api.Channel{ - api.ActionChannel{}.Channel(), - api.ActionChannel{Name: act.Name}.Channel(), - api.ActionChannel{Contract: act.Contract}.Channel(), - api.ActionChannel{Name: act.Name, Contract: act.Contract}.Channel(), - } - - for _, channel := range channels { - processor.queueMessage(channel, payload) - } -} - func (processor *ShipProcessor) processTransactionTrace(log *log.Entry, blockNumber uint32, block *ship.SignedBlock, trace *ship.TransactionTraceV0) { logger := log.WithField("type", "trace").WithField("tx_id", trace.ID.String()).Dup() @@ -178,13 +124,15 @@ func (processor *ShipProcessor) processTransactionTrace(log *log.Entry, blockNum actMsg.BlockNum = blockNumber actMsg.Timestamp = timestamp - processor.broadcastAction(actMsg) + processor.queue.PostAction(*actMsg) transaction.ActionTraces = append(transaction.ActionTraces, *actMsg) } } - processor.encodeQueue(api.TransactionChannel, transaction) + if err := processor.queue.PostTransactionTrace(transaction); err != nil { + logger.WithError(err).Error("Failed to post transaction trace") + } } func (processor *ShipProcessor) proccessActionTrace(logger *log.Entry, trace *ship.ActionTraceV1) *message.ActionTrace { @@ -255,14 +203,18 @@ func (processor *ShipProcessor) processBlock(blockResult *ship.GetBlocksResultV0 // Check to see if we have a microfork and post a message to // the rollback channel in that case. if processor.state.CurrentBlock > 0 && blockNumber < processor.state.CurrentBlock { - log.WithField("old_block", processor.state.CurrentBlock). - WithField("new_block", blockResult.ThisBlock.BlockNum). - Warn("Fork detected, old_block is greater than new_block") - processor.encodeQueue(api.RollbackChannel, message.RollbackMessage{ + msg := message.RollbackMessage{ OldBlockNum: processor.state.CurrentBlock, NewBlockNum: blockResult.ThisBlock.BlockNum, - }) + } + log.WithField("old_block", msg.OldBlockNum). + WithField("new_block", msg.NewBlockNum). + Warn("Fork detected, old_block is greater than new_block") + + if err := processor.queue.PostRollback(msg); err != nil { + log.WithError(err).Error("Failed to write rollback message") + } } processor.state.CurrentBlock = blockNumber @@ -277,8 +229,9 @@ func (processor *ShipProcessor) processBlock(blockResult *ship.GetBlocksResultV0 LastIrreversibleBlockNum: blockResult.LastIrreversible.BlockNum, HeadBlockNum: blockResult.Head.BlockNum, } - - processor.encodeQueue(api.HeartbeatChannel, hb) + if err := processor.queue.PostHeartbeat(hb); err != nil { + log.WithError(err).Error("Failed to write heartbeat message") + } } mainLogger := log.WithField("block", blockNumber).Dup() @@ -329,25 +282,20 @@ func (processor *ShipProcessor) processBlock(blockResult *ship.GetBlocksResultV0 rows = append(rows, msg) } - message := message.TableDelta{ + msg := message.TableDelta{ BlockNum: blockNumber, Timestamp: timestamp, Name: delta.V0.Name, Rows: rows, } - channels := []api.Channel{ - api.TableDeltaChannel{}.Channel(), - api.TableDeltaChannel{Name: delta.V0.Name}.Channel(), - } - - for _, channel := range channels { - processor.encodeQueue(channel, message) + if err := processor.queue.PostTableDelta(msg); err != nil { + logger.WithError(err).Error("Failed to post table delta message") } } } - err := processor.writer.Flush() + err := processor.queue.Flush() if err != nil { log.WithError(err).Error("Failed to send messages") } @@ -360,5 +308,5 @@ func (processor *ShipProcessor) processBlock(blockResult *ship.GetBlocksResultV0 // Close closes the writer associated with the processor. func (processor *ShipProcessor) Close() error { - return processor.writer.Close() + return processor.queue.Close() }