diff --git a/app/ship_processor.go b/app/ship_processor.go index 65711cb..6e37513 100644 --- a/app/ship_processor.go +++ b/app/ship_processor.go @@ -76,6 +76,7 @@ func SpawnProccessor(shipStream *shipclient.Stream, loader StateLoader, saver St // Needed because if nil, traces will not be included in the response from ship. shipStream.TraceHandler = func([]*ship.TransactionTraceV0) {} + shipStream.TableDeltaHandler = func([]*ship.TableDeltaV0) {} return processor } @@ -282,6 +283,53 @@ func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) { } } + // Process deltas + for _, delta := range block.Deltas.AsTableDeltasV0() { + + logger := log.WithField("type", "table_delta").WithField("table", delta.Name).Dup() + + rows := []message.TableDeltaRow{} + for _, row := range delta.Rows { + + msg := message.TableDeltaRow{ + Present: row.Present, + RawData: row.Data, + } + + if processor.shipABI != nil { + v, err := processor.shipABI.DecodeTableRowTyped(delta.Name, row.Data) + if err == nil { + err = json.Unmarshal(v, &msg.Data) + if err != nil { + logger.WithError(err).Error("Failed to decode json") + } + } else { + logger.Error("Failed to decode table delta") + } + } else { + logger.Warn("No SHIP ABI present") + } + + rows = append(rows, msg) + } + + message := message.TableDelta{ + BlockNum: block.Block.BlockNumber(), + Timestamp: block.Block.Timestamp.Time.UTC(), + Name: delta.Name, + Rows: rows, + } + + channels := []api.Channel{ + api.TableDeltaChannel{}.Channel(), + api.TableDeltaChannel{Name: delta.Name}.Channel(), + } + + for _, channel := range channels { + processor.encodeQueue(channel, message) + } + } + err := processor.writer.Flush() if err != nil { log.WithError(err).Error("Failed to send messages")