1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-06-16 04:24:56 +02:00

app/ship_processor.go: process table deltas.

This commit is contained in:
Henrik Hautakoski 2024-01-21 13:14:39 +01:00
parent 2268d19b08
commit 5249dc1f50

View file

@ -76,6 +76,7 @@ func SpawnProccessor(shipStream *shipclient.Stream, loader StateLoader, saver St
// Needed because if nil, traces will not be included in the response from ship.
shipStream.TraceHandler = func([]*ship.TransactionTraceV0) {}
shipStream.TableDeltaHandler = func([]*ship.TableDeltaV0) {}
return processor
}
@ -282,6 +283,53 @@ func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) {
}
}
// Process deltas
for _, delta := range block.Deltas.AsTableDeltasV0() {
logger := log.WithField("type", "table_delta").WithField("table", delta.Name).Dup()
rows := []message.TableDeltaRow{}
for _, row := range delta.Rows {
msg := message.TableDeltaRow{
Present: row.Present,
RawData: row.Data,
}
if processor.shipABI != nil {
v, err := processor.shipABI.DecodeTableRowTyped(delta.Name, row.Data)
if err == nil {
err = json.Unmarshal(v, &msg.Data)
if err != nil {
logger.WithError(err).Error("Failed to decode json")
}
} else {
logger.Error("Failed to decode table delta")
}
} else {
logger.Warn("No SHIP ABI present")
}
rows = append(rows, msg)
}
message := message.TableDelta{
BlockNum: block.Block.BlockNumber(),
Timestamp: block.Block.Timestamp.Time.UTC(),
Name: delta.Name,
Rows: rows,
}
channels := []api.Channel{
api.TableDeltaChannel{}.Channel(),
api.TableDeltaChannel{Name: delta.Name}.Channel(),
}
for _, channel := range channels {
processor.encodeQueue(channel, message)
}
}
err := processor.writer.Flush()
if err != nil {
log.WithError(err).Error("Failed to send messages")