diff --git a/internal/server/ship_processor.go b/internal/server/ship_processor.go index 649828d..6b05262 100644 --- a/internal/server/ship_processor.go +++ b/internal/server/ship_processor.go @@ -193,6 +193,36 @@ func (processor *ShipProcessor) proccessActionTrace(logger *log.Entry, trace *sh return act } +func (processor *ShipProcessor) proccessDeltaRows(logger *log.Entry, table_name string, rows []ship.Row) []message.TableDeltaRow { + out := []message.TableDeltaRow{} + for _, row := range rows { + + msg := message.TableDeltaRow{ + Present: row.Present, + RawData: row.Data, + } + + if processor.shipABI != nil { + + v, err := processor.shipABI.Decode(bytes.NewReader(row.Data), table_name) + if err == nil { + v, err := parseTableDeltaData(v) + if err == nil { + msg.Data = v + } else { + logger.WithError(err).Error("Failed to parse table delta data") + } + } else { + logger.Error("Failed to decode table delta") + } + } else { + logger.Warn("No SHIP ABI present") + } + out = append(out, msg) + } + return out +} + // Callback function called by shipclient.Stream when a new block arrives. func (processor *ShipProcessor) processBlock(blockResult *ship.GetBlocksResultV0) { block := ship.SignedBlock{} @@ -254,42 +284,14 @@ func (processor *ShipProcessor) processBlock(blockResult *ship.GetBlocksResultV0 if err := blockResult.Deltas.Unpack(&deltas); err != nil { mainLogger.WithError(err).Error("Failed to unpack table deltas") } else { + logger := mainLogger.WithField("type", "table_delta").Dup() for _, delta := range deltas { - logger := mainLogger.WithField("type", "table_delta").Dup() - - rows := []message.TableDeltaRow{} - for _, row := range delta.V0.Rows { - - msg := message.TableDeltaRow{ - Present: row.Present, - RawData: row.Data, - } - - if processor.shipABI != nil { - - v, err := processor.shipABI.Decode(bytes.NewReader(row.Data), delta.V0.Name) - if err == nil { - v, err := parseTableDeltaData(v) - if err == nil { - msg.Data = v - } else { - logger.WithError(err).Error("Failed to parse table delta data") - } - } else { - logger.Error("Failed to decode table delta") - } - } else { - logger.Warn("No SHIP ABI present") - } - rows = append(rows, msg) - } - msg := message.TableDelta{ BlockNum: blockNumber, Timestamp: timestamp, Name: delta.V0.Name, - Rows: rows, + Rows: processor.proccessDeltaRows(logger, delta.V0.Name, delta.V0.Rows), } if err := processor.queue.PostTableDelta(msg); err != nil {