diff --git a/api/channel.go b/api/channel.go index ffe2fba..baf33e8 100644 --- a/api/channel.go +++ b/api/channel.go @@ -67,3 +67,18 @@ func (a ActionChannel) Channel() Channel { return ch } + +// Table deltas +type TableDeltaChannel struct { + Name string +} + +func (td TableDeltaChannel) Channel() Channel { + ch := Channel{"tabledeltas"} + + if len(td.Name) > 0 { + ch.Append("name", td.Name) + } + + return ch +} diff --git a/api/channel_test.go b/api/channel_test.go index 58bc597..caaf4e7 100644 --- a/api/channel_test.go +++ b/api/channel_test.go @@ -129,3 +129,21 @@ func TestAction_Channel(t *testing.T) { }) } } + +func TestTableDelta_Channel(t *testing.T) { + tests := []struct { + name string + action TableDeltaChannel + want Channel + }{ + {"Empty", TableDeltaChannel{}, Channel{"tabledeltas"}}, + {"Contract", TableDeltaChannel{Name: "delta_name"}, Channel{"tabledeltas", "name", "delta_name"}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.action.Channel(); !got.Is(tt.want) { + t.Errorf("TableDeltaChannel.String() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/api/message/types.go b/api/message/types.go index 081a517..d8948e3 100644 --- a/api/message/types.go +++ b/api/message/types.go @@ -85,3 +85,16 @@ type RollbackMessage struct { OldBlockNum uint32 `json:"old_block" msgpack:"old_block"` NewBlockNum uint32 `json:"new_block" msgpack:"new_block"` } + +type TableDeltaRow struct { + Present bool `json:"present" msgpack:"present"` + Data map[string]any `json:"data" msgpack:"data"` + RawData []byte `json:"raw_data" msgpack:"raw_data"` +} + +type TableDelta struct { + BlockNum uint32 `json:"blocknum" msgpack:"blocknum"` + Timestamp time.Time `json:"blocktimestamp" msgpack:"blocktimestamp"` + Name string `json:"name" msgpack:"name"` + Rows []TableDeltaRow `json:"rows" msgpack:"rows"` +} diff --git a/app/ship_processor.go b/app/ship_processor.go index 3e85648..96d9ee1 100644 --- a/app/ship_processor.go +++ b/app/ship_processor.go @@ -52,6 +52,9 @@ type ShipProcessor struct { // System contract ("eosio" per default) syscontract eos.AccountName + + // ABI Returned from SHIP + shipABI *eos.ABI } // SpawnProcessor creates a new ShipProccessor that consumes the shipclient.Stream passed to it. @@ -69,13 +72,19 @@ func SpawnProccessor(shipStream *shipclient.Stream, loader StateLoader, saver St // Attach handlers shipStream.BlockHandler = processor.processBlock + shipStream.InitHandler = processor.initHandler // Needed because if nil, traces will not be included in the response from ship. shipStream.TraceHandler = func([]*ship.TransactionTraceV0) {} + shipStream.TableDeltaHandler = func([]*ship.TableDeltaV0) {} return processor } +func (processor *ShipProcessor) initHandler(abi *eos.ABI) { + processor.shipABI = abi +} + func (processor *ShipProcessor) queueMessage(channel api.Channel, payload []byte) bool { err := processor.writer.Write(channel, payload) if err != nil { @@ -287,6 +296,53 @@ func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) { } } + // Process deltas + for _, delta := range block.Deltas.AsTableDeltasV0() { + + logger := log.WithField("type", "table_delta").WithField("table", delta.Name).Dup() + + rows := []message.TableDeltaRow{} + for _, row := range delta.Rows { + + msg := message.TableDeltaRow{ + Present: row.Present, + RawData: row.Data, + } + + if processor.shipABI != nil { + v, err := processor.shipABI.DecodeTableRowTyped(delta.Name, row.Data) + if err == nil { + err = json.Unmarshal(v, &msg.Data) + if err != nil { + logger.WithError(err).Error("Failed to decode json") + } + } else { + logger.Error("Failed to decode table delta") + } + } else { + logger.Warn("No SHIP ABI present") + } + + rows = append(rows, msg) + } + + message := message.TableDelta{ + BlockNum: block.Block.BlockNumber(), + Timestamp: block.Block.Timestamp.Time.UTC(), + Name: delta.Name, + Rows: rows, + } + + channels := []api.Channel{ + api.TableDeltaChannel{}.Channel(), + api.TableDeltaChannel{Name: delta.Name}.Channel(), + } + + for _, channel := range channels { + processor.encodeQueue(channel, message) + } + } + err := processor.writer.Flush() if err != nil { log.WithError(err).Error("Failed to send messages")