From 8321c1633a9751e87a8b4757b52272311c60235f Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Sun, 7 Jan 2024 19:25:57 +0100 Subject: [PATCH 1/3] api/channel.go: Adding rollback channel. --- api/channel.go | 1 + 1 file changed, 1 insertion(+) diff --git a/api/channel.go b/api/channel.go index a843f54..ffe2fba 100644 --- a/api/channel.go +++ b/api/channel.go @@ -45,6 +45,7 @@ func (c Channel) Is(other Channel) bool { var ( TransactionChannel = Channel{"transactions"} HeartbeatChannel = Channel{"heartbeat"} + RollbackChannel = Channel{"rollback"} ) // Action Channel From 5b5b28669a4474a29761b226ed1df1d6d7f276fc Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Sun, 7 Jan 2024 19:26:30 +0100 Subject: [PATCH 2/3] api/message/types.go: Adding RollbackMessage struct --- api/message/types.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/api/message/types.go b/api/message/types.go index 5a2111e..081a517 100644 --- a/api/message/types.go +++ b/api/message/types.go @@ -80,3 +80,8 @@ func (act ActionTrace) GetData() (map[string]any, error) { } return nil, errors.New("failed to convert data to map") } + +type RollbackMessage struct { + OldBlockNum uint32 `json:"old_block" msgpack:"old_block"` + NewBlockNum uint32 `json:"new_block" msgpack:"new_block"` +} From a8490f85c4fd11d54d2fa2c402151ba8caba0900 Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Sun, 7 Jan 2024 19:28:59 +0100 Subject: [PATCH 3/3] app/ship_processor.go: Implement support to send rollback messages on forks. --- app/ship_processor.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/app/ship_processor.go b/app/ship_processor.go index 3fe0901..2aae7b4 100644 --- a/app/ship_processor.go +++ b/app/ship_processor.go @@ -135,6 +135,19 @@ func (processor *ShipProcessor) GetCurrentBlock() uint32 { // Callback function called by shipclient.Stream when a new block arrives. func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) { + // Check to see if we have a microfork and post a message to + // the rollback channel in that case. + if processor.state.CurrentBlock > 0 && block.ThisBlock.BlockNum < processor.state.CurrentBlock { + log.WithField("old_block", processor.state.CurrentBlock). + WithField("new_block", block.ThisBlock.BlockNum). + Warn("Fork detected, old_block is greater than new_block") + + processor.encodeQueue(api.RollbackChannel, message.RollbackMessage{ + OldBlockNum: processor.state.CurrentBlock, + NewBlockNum: block.ThisBlock.BlockNum, + }) + } + processor.state.CurrentBlock = block.ThisBlock.BlockNum if block.ThisBlock.BlockNum%100 == 0 {