From 8dbf411b36098dc0c8f7c713b4c8ab9837ee9960 Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Sun, 7 Jan 2024 19:11:47 +0100 Subject: [PATCH 1/5] api/channel.go: adding TableDelta channel --- api/channel.go | 15 +++++++++++++++ api/channel_test.go | 18 ++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/api/channel.go b/api/channel.go index a843f54..b722037 100644 --- a/api/channel.go +++ b/api/channel.go @@ -66,3 +66,18 @@ func (a ActionChannel) Channel() Channel { return ch } + +// Table deltas +type TableDeltaChannel struct { + Name string +} + +func (td TableDeltaChannel) Channel() Channel { + ch := Channel{"tabledeltas"} + + if len(td.Name) > 0 { + ch.Append("name", td.Name) + } + + return ch +} diff --git a/api/channel_test.go b/api/channel_test.go index 58bc597..caaf4e7 100644 --- a/api/channel_test.go +++ b/api/channel_test.go @@ -129,3 +129,21 @@ func TestAction_Channel(t *testing.T) { }) } } + +func TestTableDelta_Channel(t *testing.T) { + tests := []struct { + name string + action TableDeltaChannel + want Channel + }{ + {"Empty", TableDeltaChannel{}, Channel{"tabledeltas"}}, + {"Contract", TableDeltaChannel{Name: "delta_name"}, Channel{"tabledeltas", "name", "delta_name"}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.action.Channel(); !got.Is(tt.want) { + t.Errorf("TableDeltaChannel.String() = %v, want %v", got, tt.want) + } + }) + } +} From dc6dd6ae701d097c83aad128eae44d2b768d70e5 Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Sun, 7 Jan 2024 19:12:39 +0100 Subject: [PATCH 2/5] api/message/types.go: Adding TableDelta and TableDeltaRow structs --- api/message/types.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/api/message/types.go b/api/message/types.go index 5a2111e..f9442e0 100644 --- a/api/message/types.go +++ b/api/message/types.go @@ -80,3 +80,16 @@ func (act ActionTrace) GetData() (map[string]any, error) { } return nil, errors.New("failed to convert data to map") } + +type TableDeltaRow struct { + Present bool `json:"present" msgpack:"present"` + Data []byte `json:"data" msgpack:"data"` + RawData []byte `json:"raw_data" msgpack:"raw_data"` +} + +type TableDelta struct { + BlockNum uint32 `json:"blocknum" msgpack:"blocknum"` + Timestamp time.Time `json:"blocktimestamp" msgpack:"blocktimestamp"` + Name string `json:"name" msgpack:"name"` + Rows []TableDeltaRow `json:"rows" msgpack:"rows"` +} From b3961f4eecf1bdea5bd3171f16cb3cf9f594c6d6 Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Sun, 21 Jan 2024 12:52:17 +0100 Subject: [PATCH 3/5] api/message/types.go: Update TableDeltaRow --- api/message/types.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/api/message/types.go b/api/message/types.go index f9442e0..6d75bd6 100644 --- a/api/message/types.go +++ b/api/message/types.go @@ -82,9 +82,9 @@ func (act ActionTrace) GetData() (map[string]any, error) { } type TableDeltaRow struct { - Present bool `json:"present" msgpack:"present"` - Data []byte `json:"data" msgpack:"data"` - RawData []byte `json:"raw_data" msgpack:"raw_data"` + Present bool `json:"present" msgpack:"present"` + Data map[string]any `json:"data" msgpack:"data"` + RawData []byte `json:"raw_data" msgpack:"raw_data"` } type TableDelta struct { From 2268d19b08e57f5f27ceaeaf02c2f98f234b2537 Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Sun, 21 Jan 2024 13:13:03 +0100 Subject: [PATCH 4/5] app/ship_processor.go: Store abi from ship in processor. --- app/ship_processor.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/app/ship_processor.go b/app/ship_processor.go index 3fe0901..65711cb 100644 --- a/app/ship_processor.go +++ b/app/ship_processor.go @@ -52,6 +52,9 @@ type ShipProcessor struct { // System contract ("eosio" per default) syscontract eos.AccountName + + // ABI Returned from SHIP + shipABI *eos.ABI } // SpawnProcessor creates a new ShipProccessor that consumes the shipclient.Stream passed to it. @@ -69,6 +72,7 @@ func SpawnProccessor(shipStream *shipclient.Stream, loader StateLoader, saver St // Attach handlers shipStream.BlockHandler = processor.processBlock + shipStream.InitHandler = processor.initHandler // Needed because if nil, traces will not be included in the response from ship. shipStream.TraceHandler = func([]*ship.TransactionTraceV0) {} @@ -76,6 +80,10 @@ func SpawnProccessor(shipStream *shipclient.Stream, loader StateLoader, saver St return processor } +func (processor *ShipProcessor) initHandler(abi *eos.ABI) { + processor.shipABI = abi +} + func (processor *ShipProcessor) queueMessage(channel api.Channel, payload []byte) bool { err := processor.writer.Write(channel, payload) if err != nil { From 5249dc1f50503c7ab35c83c7f4e40e0cbc6b0fb5 Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Sun, 21 Jan 2024 13:14:39 +0100 Subject: [PATCH 5/5] app/ship_processor.go: process table deltas. --- app/ship_processor.go | 48 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/app/ship_processor.go b/app/ship_processor.go index 65711cb..6e37513 100644 --- a/app/ship_processor.go +++ b/app/ship_processor.go @@ -76,6 +76,7 @@ func SpawnProccessor(shipStream *shipclient.Stream, loader StateLoader, saver St // Needed because if nil, traces will not be included in the response from ship. shipStream.TraceHandler = func([]*ship.TransactionTraceV0) {} + shipStream.TableDeltaHandler = func([]*ship.TableDeltaV0) {} return processor } @@ -282,6 +283,53 @@ func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) { } } + // Process deltas + for _, delta := range block.Deltas.AsTableDeltasV0() { + + logger := log.WithField("type", "table_delta").WithField("table", delta.Name).Dup() + + rows := []message.TableDeltaRow{} + for _, row := range delta.Rows { + + msg := message.TableDeltaRow{ + Present: row.Present, + RawData: row.Data, + } + + if processor.shipABI != nil { + v, err := processor.shipABI.DecodeTableRowTyped(delta.Name, row.Data) + if err == nil { + err = json.Unmarshal(v, &msg.Data) + if err != nil { + logger.WithError(err).Error("Failed to decode json") + } + } else { + logger.Error("Failed to decode table delta") + } + } else { + logger.Warn("No SHIP ABI present") + } + + rows = append(rows, msg) + } + + message := message.TableDelta{ + BlockNum: block.Block.BlockNumber(), + Timestamp: block.Block.Timestamp.Time.UTC(), + Name: delta.Name, + Rows: rows, + } + + channels := []api.Channel{ + api.TableDeltaChannel{}.Channel(), + api.TableDeltaChannel{Name: delta.Name}.Channel(), + } + + for _, channel := range channels { + processor.encodeQueue(channel, message) + } + } + err := processor.writer.Flush() if err != nil { log.WithError(err).Error("Failed to send messages")