diff --git a/api/channel.go b/api/channel.go index a843f54..ffe2fba 100644 --- a/api/channel.go +++ b/api/channel.go @@ -45,6 +45,7 @@ func (c Channel) Is(other Channel) bool { var ( TransactionChannel = Channel{"transactions"} HeartbeatChannel = Channel{"heartbeat"} + RollbackChannel = Channel{"rollback"} ) // Action Channel diff --git a/api/message/types.go b/api/message/types.go index 5a2111e..081a517 100644 --- a/api/message/types.go +++ b/api/message/types.go @@ -80,3 +80,8 @@ func (act ActionTrace) GetData() (map[string]any, error) { } return nil, errors.New("failed to convert data to map") } + +type RollbackMessage struct { + OldBlockNum uint32 `json:"old_block" msgpack:"old_block"` + NewBlockNum uint32 `json:"new_block" msgpack:"new_block"` +} diff --git a/app/ship_processor.go b/app/ship_processor.go index 3fe0901..2aae7b4 100644 --- a/app/ship_processor.go +++ b/app/ship_processor.go @@ -135,6 +135,19 @@ func (processor *ShipProcessor) GetCurrentBlock() uint32 { // Callback function called by shipclient.Stream when a new block arrives. func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) { + // Check to see if we have a microfork and post a message to + // the rollback channel in that case. + if processor.state.CurrentBlock > 0 && block.ThisBlock.BlockNum < processor.state.CurrentBlock { + log.WithField("old_block", processor.state.CurrentBlock). + WithField("new_block", block.ThisBlock.BlockNum). + Warn("Fork detected, old_block is greater than new_block") + + processor.encodeQueue(api.RollbackChannel, message.RollbackMessage{ + OldBlockNum: processor.state.CurrentBlock, + NewBlockNum: block.ThisBlock.BlockNum, + }) + } + processor.state.CurrentBlock = block.ThisBlock.BlockNum if block.ThisBlock.BlockNum%100 == 0 {