mirror of
https://github.com/eosswedenorg/thalos
synced 2026-07-03 11:53:41 +02:00
internal/server/ship_processor.go: refactor processDeltaRows() to reduce nested blocks
This commit is contained in:
parent
d5bc23a63e
commit
120c2acdc6
1 changed files with 36 additions and 29 deletions
|
|
@ -2,6 +2,7 @@ package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"errors"
|
||||||
|
|
||||||
"github.com/eosswedenorg/thalos/api/message"
|
"github.com/eosswedenorg/thalos/api/message"
|
||||||
"github.com/eosswedenorg/thalos/internal/abi"
|
"github.com/eosswedenorg/thalos/internal/abi"
|
||||||
|
|
@ -228,39 +229,45 @@ func (processor *ShipProcessor) proccessActionTrace(logger *log.Entry, trace *sh
|
||||||
func (processor *ShipProcessor) proccessDeltaRows(logger *log.Entry, table_name string, rows []ship.Row) []message.TableDeltaRow {
|
func (processor *ShipProcessor) proccessDeltaRows(logger *log.Entry, table_name string, rows []ship.Row) []message.TableDeltaRow {
|
||||||
out := []message.TableDeltaRow{}
|
out := []message.TableDeltaRow{}
|
||||||
for _, row := range rows {
|
for _, row := range rows {
|
||||||
|
msg, err := processor.proccessDeltaRow(row, table_name)
|
||||||
|
if err != nil {
|
||||||
|
logger.WithError(err).Warn("Failed to processs table delta row")
|
||||||
|
}
|
||||||
|
out = append(out, msg)
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func (processor *ShipProcessor) proccessDeltaRow(row ship.Row, table_name string) (message.TableDeltaRow, error) {
|
||||||
msg := message.TableDeltaRow{
|
msg := message.TableDeltaRow{
|
||||||
Present: row.Present,
|
Present: row.Present,
|
||||||
RawData: row.Data,
|
RawData: row.Data,
|
||||||
}
|
}
|
||||||
|
|
||||||
if processor.shipABI != nil {
|
if processor.shipABI == nil {
|
||||||
|
return msg, errors.New("No SHIP ABI present")
|
||||||
|
}
|
||||||
|
|
||||||
v, err := processor.shipABI.Decode(bytes.NewReader(row.Data), table_name)
|
v, err := processor.shipABI.Decode(bytes.NewReader(row.Data), table_name)
|
||||||
if err == nil {
|
if err != nil {
|
||||||
|
return msg, errors.New("Failed to decode table delta")
|
||||||
|
}
|
||||||
data, err := ship_helper.ParseTableDeltaData(v)
|
data, err := ship_helper.ParseTableDeltaData(v)
|
||||||
if err == nil {
|
if err != nil {
|
||||||
|
return msg, errors.New("Failed to parse table delta data")
|
||||||
|
}
|
||||||
|
|
||||||
|
msg.Data = data
|
||||||
|
|
||||||
// Decode contract row data
|
// Decode contract row data
|
||||||
if table_name == "contract_row" {
|
if table_name == "contract_row" {
|
||||||
dec, err := ship_helper.DecodeContractRow(processor.abi, data)
|
dec, err := ship_helper.DecodeContractRow(processor.abi, data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.WithError(err).Warn("Failed to decode contract row")
|
return msg, errors.New("Failed to decode contract row")
|
||||||
} else {
|
|
||||||
data["value"] = dec
|
|
||||||
}
|
}
|
||||||
|
msg.Data["value"] = dec
|
||||||
}
|
}
|
||||||
msg.Data = data
|
return msg, nil
|
||||||
} else {
|
|
||||||
logger.WithError(err).Error("Failed to parse table delta data")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logger.WithError(err).Error("Failed to decode table delta")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logger.Warn("No SHIP ABI present")
|
|
||||||
}
|
|
||||||
out = append(out, msg)
|
|
||||||
}
|
|
||||||
return out
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Callback function called by shipclient.Stream when a new block arrives.
|
// Callback function called by shipclient.Stream when a new block arrives.
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue