1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-06-16 04:24:56 +02:00

Merge branch 'table-deltas'

This commit is contained in:
Henrik Hautakoski 2024-01-21 14:01:08 +01:00
commit 0b3b383977
4 changed files with 102 additions and 0 deletions

View file

@ -67,3 +67,18 @@ func (a ActionChannel) Channel() Channel {
return ch
}
// Table deltas
type TableDeltaChannel struct {
Name string
}
func (td TableDeltaChannel) Channel() Channel {
ch := Channel{"tabledeltas"}
if len(td.Name) > 0 {
ch.Append("name", td.Name)
}
return ch
}

View file

@ -129,3 +129,21 @@ func TestAction_Channel(t *testing.T) {
})
}
}
func TestTableDelta_Channel(t *testing.T) {
tests := []struct {
name string
action TableDeltaChannel
want Channel
}{
{"Empty", TableDeltaChannel{}, Channel{"tabledeltas"}},
{"Contract", TableDeltaChannel{Name: "delta_name"}, Channel{"tabledeltas", "name", "delta_name"}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := tt.action.Channel(); !got.Is(tt.want) {
t.Errorf("TableDeltaChannel.String() = %v, want %v", got, tt.want)
}
})
}
}

View file

@ -85,3 +85,16 @@ type RollbackMessage struct {
OldBlockNum uint32 `json:"old_block" msgpack:"old_block"`
NewBlockNum uint32 `json:"new_block" msgpack:"new_block"`
}
type TableDeltaRow struct {
Present bool `json:"present" msgpack:"present"`
Data map[string]any `json:"data" msgpack:"data"`
RawData []byte `json:"raw_data" msgpack:"raw_data"`
}
type TableDelta struct {
BlockNum uint32 `json:"blocknum" msgpack:"blocknum"`
Timestamp time.Time `json:"blocktimestamp" msgpack:"blocktimestamp"`
Name string `json:"name" msgpack:"name"`
Rows []TableDeltaRow `json:"rows" msgpack:"rows"`
}

View file

@ -52,6 +52,9 @@ type ShipProcessor struct {
// System contract ("eosio" per default)
syscontract eos.AccountName
// ABI Returned from SHIP
shipABI *eos.ABI
}
// SpawnProcessor creates a new ShipProccessor that consumes the shipclient.Stream passed to it.
@ -69,13 +72,19 @@ func SpawnProccessor(shipStream *shipclient.Stream, loader StateLoader, saver St
// Attach handlers
shipStream.BlockHandler = processor.processBlock
shipStream.InitHandler = processor.initHandler
// Needed because if nil, traces will not be included in the response from ship.
shipStream.TraceHandler = func([]*ship.TransactionTraceV0) {}
shipStream.TableDeltaHandler = func([]*ship.TableDeltaV0) {}
return processor
}
func (processor *ShipProcessor) initHandler(abi *eos.ABI) {
processor.shipABI = abi
}
func (processor *ShipProcessor) queueMessage(channel api.Channel, payload []byte) bool {
err := processor.writer.Write(channel, payload)
if err != nil {
@ -287,6 +296,53 @@ func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) {
}
}
// Process deltas
for _, delta := range block.Deltas.AsTableDeltasV0() {
logger := log.WithField("type", "table_delta").WithField("table", delta.Name).Dup()
rows := []message.TableDeltaRow{}
for _, row := range delta.Rows {
msg := message.TableDeltaRow{
Present: row.Present,
RawData: row.Data,
}
if processor.shipABI != nil {
v, err := processor.shipABI.DecodeTableRowTyped(delta.Name, row.Data)
if err == nil {
err = json.Unmarshal(v, &msg.Data)
if err != nil {
logger.WithError(err).Error("Failed to decode json")
}
} else {
logger.Error("Failed to decode table delta")
}
} else {
logger.Warn("No SHIP ABI present")
}
rows = append(rows, msg)
}
message := message.TableDelta{
BlockNum: block.Block.BlockNumber(),
Timestamp: block.Block.Timestamp.Time.UTC(),
Name: delta.Name,
Rows: rows,
}
channels := []api.Channel{
api.TableDeltaChannel{}.Channel(),
api.TableDeltaChannel{Name: delta.Name}.Channel(),
}
for _, channel := range channels {
processor.encodeQueue(channel, message)
}
}
err := processor.writer.Flush()
if err != nil {
log.WithError(err).Error("Failed to send messages")