1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-06-17 04:30:03 +02:00

internal/server/ship_processor.go: refactor out table delta row proccessing to its own function

This commit is contained in:
Henrik Hautakoski 2024-05-17 17:08:29 +02:00
parent cc754ee976
commit 8bd3736684

View file

@ -193,6 +193,36 @@ func (processor *ShipProcessor) proccessActionTrace(logger *log.Entry, trace *sh
return act
}
func (processor *ShipProcessor) proccessDeltaRows(logger *log.Entry, table_name string, rows []ship.Row) []message.TableDeltaRow {
out := []message.TableDeltaRow{}
for _, row := range rows {
msg := message.TableDeltaRow{
Present: row.Present,
RawData: row.Data,
}
if processor.shipABI != nil {
v, err := processor.shipABI.Decode(bytes.NewReader(row.Data), table_name)
if err == nil {
v, err := parseTableDeltaData(v)
if err == nil {
msg.Data = v
} else {
logger.WithError(err).Error("Failed to parse table delta data")
}
} else {
logger.Error("Failed to decode table delta")
}
} else {
logger.Warn("No SHIP ABI present")
}
out = append(out, msg)
}
return out
}
// Callback function called by shipclient.Stream when a new block arrives.
func (processor *ShipProcessor) processBlock(blockResult *ship.GetBlocksResultV0) {
block := ship.SignedBlock{}
@ -254,42 +284,14 @@ func (processor *ShipProcessor) processBlock(blockResult *ship.GetBlocksResultV0
if err := blockResult.Deltas.Unpack(&deltas); err != nil {
mainLogger.WithError(err).Error("Failed to unpack table deltas")
} else {
logger := mainLogger.WithField("type", "table_delta").Dup()
for _, delta := range deltas {
logger := mainLogger.WithField("type", "table_delta").Dup()
rows := []message.TableDeltaRow{}
for _, row := range delta.V0.Rows {
msg := message.TableDeltaRow{
Present: row.Present,
RawData: row.Data,
}
if processor.shipABI != nil {
v, err := processor.shipABI.Decode(bytes.NewReader(row.Data), delta.V0.Name)
if err == nil {
v, err := parseTableDeltaData(v)
if err == nil {
msg.Data = v
} else {
logger.WithError(err).Error("Failed to parse table delta data")
}
} else {
logger.Error("Failed to decode table delta")
}
} else {
logger.Warn("No SHIP ABI present")
}
rows = append(rows, msg)
}
msg := message.TableDelta{
BlockNum: blockNumber,
Timestamp: timestamp,
Name: delta.V0.Name,
Rows: rows,
Rows: processor.proccessDeltaRows(logger, delta.V0.Name, delta.V0.Rows),
}
if err := processor.queue.PostTableDelta(msg); err != nil {