1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-06-18 04:40:03 +02:00

Update to use antelope-go instead of eos-go library

This commit is contained in:
Henrik Hautakoski 2024-04-29 21:00:45 +02:00
parent 8aa93dd078
commit 63da39f03b
7 changed files with 181 additions and 211 deletions

View file

@ -1,25 +1,27 @@
package server
import "github.com/eoscanada/eos-go/ship"
import (
"github.com/pnx/antelope-go/ship"
)
// convert a ActionTrace to ActionTraceV1
func toActionTraceV1(trace *ship.ActionTrace) *ship.ActionTraceV1 {
if trace_v0, ok := trace.Impl.(*ship.ActionTraceV0); ok {
if trace.V0 != nil {
// convert to v1
return &ship.ActionTraceV1{
ActionOrdinal: trace_v0.ActionOrdinal,
CreatorActionOrdinal: trace_v0.CreatorActionOrdinal,
Receipt: trace_v0.Receipt,
Receiver: trace_v0.Receiver,
Act: trace_v0.Act,
ContextFree: trace_v0.ContextFree,
Elapsed: trace_v0.Elapsed,
Console: trace_v0.Console,
AccountRamDeltas: trace_v0.AccountRamDeltas,
Except: trace_v0.Except,
ErrorCode: trace_v0.ErrorCode,
ActionOrdinal: trace.V0.ActionOrdinal,
CreatorActionOrdinal: trace.V0.CreatorActionOrdinal,
Receipt: trace.V0.Receipt,
Receiver: trace.V0.Receiver,
Act: trace.V0.Act,
ContextFree: trace.V0.ContextFree,
Elapsed: trace.V0.Elapsed,
Console: trace.V0.Console,
AccountRamDeltas: trace.V0.AccountRamDeltas,
Except: trace.V0.Except,
ErrorCode: trace.V0.ErrorCode,
ReturnValue: []byte{},
}
}
return trace.Impl.(*ship.ActionTraceV1)
return trace.V1
}

View file

@ -1,8 +1,8 @@
package server
import (
"bytes"
"encoding/hex"
"encoding/json"
"github.com/eosswedenorg/thalos/api"
"github.com/eosswedenorg/thalos/api/message"
@ -11,9 +11,9 @@ import (
log "github.com/sirupsen/logrus"
"github.com/eoscanada/eos-go"
"github.com/eoscanada/eos-go/ship"
shipclient "github.com/eosswedenorg-go/antelope-ship-client"
"github.com/pnx/antelope-go/chain"
"github.com/pnx/antelope-go/ship"
)
// logDecoratedEncoder decorates a message.Encoder and logs any error.
@ -51,10 +51,10 @@ type ShipProcessor struct {
state State
// System contract ("eosio" per default)
syscontract eos.AccountName
syscontract chain.Name
// ABI Returned from SHIP
shipABI *eos.ABI
shipABI *chain.Abi
}
// SpawnProcessor creates a new ShipProccessor that consumes the shipclient.Stream passed to it.
@ -65,7 +65,7 @@ func SpawnProccessor(shipStream *shipclient.Stream, loader StateLoader, saver St
writer: writer,
shipStream: shipStream,
encode: logDecoratedEncoder(codec.Encoder),
syscontract: eos.AccountName("eosio"),
syscontract: chain.N("eosio"),
}
loader(&processor.state)
@ -75,13 +75,13 @@ func SpawnProccessor(shipStream *shipclient.Stream, loader StateLoader, saver St
shipStream.InitHandler = processor.initHandler
// Needed because if nil, traces/table deltas will not be included in the response from ship.
shipStream.TraceHandler = func([]*ship.TransactionTraceV0) {}
shipStream.TableDeltaHandler = func([]*ship.TableDeltaV0) {}
shipStream.TraceHandler = func(*ship.TransactionTraceArray) {}
shipStream.TableDeltaHandler = func(*ship.TableDeltaArray) {}
return processor
}
func (processor *ShipProcessor) initHandler(abi *eos.ABI) {
func (processor *ShipProcessor) initHandler(abi *chain.Abi) {
processor.shipABI = abi
}
@ -101,26 +101,14 @@ func (processor *ShipProcessor) encodeQueue(channel api.Channel, v interface{})
return false
}
func decode(abi *eos.ABI, act *ship.Action, v any) error {
jsondata, err := abi.DecodeAction(act.Data, act.Name)
if err != nil {
return err
}
return json.Unmarshal(jsondata, v)
}
// updateAbiFromAction updates the contract abi based on the ship.Action passed.
func (processor *ShipProcessor) updateAbiFromAction(act *ship.Action) error {
ABI, err := processor.abi.GetAbi(processor.syscontract)
if err != nil {
return err
}
func (processor *ShipProcessor) updateAbiFromAction(act *chain.Action) error {
set_abi := struct {
Abi string
Account eos.AccountName
Account chain.Name
}{}
if err = decode(ABI, act, &set_abi); err != nil {
if err := act.DecodeInto(&set_abi); err != nil {
return err
}
@ -129,11 +117,11 @@ func (processor *ShipProcessor) updateAbiFromAction(act *ship.Action) error {
return err
}
contract_abi := eos.ABI{}
if err = eos.UnmarshalBinary(binary_abi, &contract_abi); err != nil {
contract_abi := chain.Abi{}
err = chain.NewDecoder(bytes.NewReader(binary_abi)).Decode(&contract_abi)
if err != nil {
return err
}
return processor.abi.SetAbi(set_abi.Account, &contract_abi)
}
@ -161,14 +149,16 @@ func (processor *ShipProcessor) broadcastAction(act *message.ActionTrace) {
}
}
func (processor *ShipProcessor) processTransactionTrace(log *log.Entry, block *ship.SignedBlockBytes, trace *ship.TransactionTraceV0) {
func (processor *ShipProcessor) processTransactionTrace(log *log.Entry, blockNumber uint32, block *ship.SignedBlock, trace *ship.TransactionTraceV0) {
logger := log.WithField("type", "trace").WithField("tx_id", trace.ID.String()).Dup()
timestamp := block.BlockHeader.Timestamp.Time().UTC()
transaction := message.TransactionTrace{
ID: trace.ID.String(),
BlockNum: block.BlockNumber(),
Timestamp: block.Timestamp.UTC(),
Status: trace.Status.String(),
ID: trace.ID.String(),
BlockNum: blockNumber,
Timestamp: timestamp,
// Status: trace.Status,
CPUUsageUS: trace.CPUUsageUS,
NetUsage: trace.NetUsage,
NetUsageWords: uint32(trace.NetUsageWords),
@ -185,8 +175,8 @@ func (processor *ShipProcessor) processTransactionTrace(log *log.Entry, block *s
actMsg := processor.proccessActionTrace(logger, actionTrace)
if actMsg != nil {
actMsg.TxID = trace.ID.String()
actMsg.BlockNum = block.BlockNumber()
actMsg.Timestamp = block.Timestamp.UTC()
actMsg.BlockNum = blockNumber
actMsg.Timestamp = timestamp
processor.broadcastAction(actMsg)
@ -199,8 +189,8 @@ func (processor *ShipProcessor) processTransactionTrace(log *log.Entry, block *s
func (processor *ShipProcessor) proccessActionTrace(logger *log.Entry, trace *ship.ActionTraceV1) *message.ActionTrace {
// Check if actions updates an abi.
if trace.Act.Account == processor.syscontract && trace.Act.Name == eos.ActionName("setabi") {
err := processor.updateAbiFromAction(trace.Act)
if trace.Act.Account == processor.syscontract && trace.Act.Name == chain.N("setabi") {
err := processor.updateAbiFromAction(&trace.Act)
if err != nil {
logger.WithError(err).Warn("Failed to update abi")
}
@ -214,7 +204,7 @@ func (processor *ShipProcessor) proccessActionTrace(logger *log.Entry, trace *sh
}
if trace.Receipt != nil {
receipt := trace.Receipt.Impl.(*ship.ActionReceiptV0)
receipt := trace.Receipt.V0
act.Receipt = &message.ActionReceipt{
Receiver: receipt.Receiver.String(),
ActDigest: receipt.ActDigest.String(),
@ -241,7 +231,7 @@ func (processor *ShipProcessor) proccessActionTrace(logger *log.Entry, trace *sh
ABI, err := processor.abi.GetAbi(trace.Act.Account)
if err == nil {
if err = decode(ABI, trace.Act, &act.Data); err != nil {
if act.Data, err = trace.Act.Decode(ABI); err != nil {
logger.WithFields(log.Fields{
"contract": trace.Act.Account,
"action": trace.Act.Name,
@ -256,89 +246,104 @@ func (processor *ShipProcessor) proccessActionTrace(logger *log.Entry, trace *sh
}
// Callback function called by shipclient.Stream when a new block arrives.
func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) {
func (processor *ShipProcessor) processBlock(blockResult *ship.GetBlocksResultV0) {
block := ship.SignedBlock{}
blockResult.Block.Unpack(&block)
timestamp := block.BlockHeader.Timestamp.Time().UTC()
blockNumber := blockResult.ThisBlock.BlockNum
// Check to see if we have a microfork and post a message to
// the rollback channel in that case.
if processor.state.CurrentBlock > 0 && block.ThisBlock.BlockNum < processor.state.CurrentBlock {
if processor.state.CurrentBlock > 0 && blockNumber < processor.state.CurrentBlock {
log.WithField("old_block", processor.state.CurrentBlock).
WithField("new_block", block.ThisBlock.BlockNum).
WithField("new_block", blockResult.ThisBlock.BlockNum).
Warn("Fork detected, old_block is greater than new_block")
processor.encodeQueue(api.RollbackChannel, message.RollbackMessage{
OldBlockNum: processor.state.CurrentBlock,
NewBlockNum: block.ThisBlock.BlockNum,
NewBlockNum: blockResult.ThisBlock.BlockNum,
})
}
processor.state.CurrentBlock = block.ThisBlock.BlockNum
processor.state.CurrentBlock = blockNumber
if block.ThisBlock.BlockNum%100 == 0 {
log.Infof("Current: %d, Head: %d", processor.state.CurrentBlock, block.Head.BlockNum)
if blockResult.ThisBlock.BlockNum%100 == 0 {
log.Infof("Current: %d, Head: %d", processor.state.CurrentBlock, blockResult.Head.BlockNum)
}
if block.ThisBlock.BlockNum%10 == 0 {
if blockResult.ThisBlock.BlockNum%10 == 0 {
hb := message.HeartBeat{
BlockNum: block.ThisBlock.BlockNum,
LastIrreversibleBlockNum: block.LastIrreversible.BlockNum,
HeadBlockNum: block.Head.BlockNum,
BlockNum: blockNumber,
LastIrreversibleBlockNum: blockResult.LastIrreversible.BlockNum,
HeadBlockNum: blockResult.Head.BlockNum,
}
processor.encodeQueue(api.HeartbeatChannel, hb)
}
mainLogger := log.WithField("block", block.ThisBlock.BlockNum).Dup()
mainLogger := log.WithField("block", blockNumber).Dup()
// Process traces
if block.Traces != nil && len(block.Traces.Elem) > 0 {
for _, trace := range block.Traces.AsTransactionTracesV0() {
processor.processTransactionTrace(mainLogger, block.Block, trace)
if blockResult.Traces != nil {
unpacked := []ship.TransactionTrace{}
if err := blockResult.Traces.Unpack(&unpacked); err != nil {
mainLogger.WithError(err).Error("Failed to unpack transaction traces")
} else {
for _, trace := range unpacked {
processor.processTransactionTrace(mainLogger, blockNumber, &block, trace.V0)
}
}
}
// Process deltas
for _, delta := range block.Deltas.AsTableDeltasV0() {
deltas := []ship.TableDelta{}
if err := blockResult.Deltas.Unpack(&deltas); err != nil {
mainLogger.WithError(err).Error("Failed to unpack table deltas")
} else {
for _, delta := range deltas {
logger := mainLogger.WithField("type", "table_delta").WithField("table", delta.Name).Dup()
logger := mainLogger.WithField("type", "table_delta").Dup()
rows := []message.TableDeltaRow{}
for _, row := range delta.Rows {
rows := []message.TableDeltaRow{}
for _, row := range delta.V0.Rows {
msg := message.TableDeltaRow{
Present: row.Present,
RawData: row.Data,
}
msg := message.TableDeltaRow{
Present: row.Present,
RawData: row.Data,
}
if processor.shipABI != nil {
v, err := processor.shipABI.DecodeTableRowTyped(delta.Name, row.Data)
if err == nil {
err = json.Unmarshal(v, &msg.Data)
if err != nil {
logger.WithError(err).Error("Failed to decode json")
if processor.shipABI != nil {
v, err := processor.shipABI.Decode(bytes.NewReader(row.Data), delta.V0.Name)
if err == nil {
var ok bool
if msg.Data, ok = v.(map[string]any); !ok {
// logger.Error("Failed to cast table data")
}
} else {
logger.Error("Failed to decode table delta")
}
} else {
logger.Error("Failed to decode table delta")
logger.Warn("No SHIP ABI present")
}
} else {
logger.Warn("No SHIP ABI present")
rows = append(rows, msg)
}
rows = append(rows, msg)
}
message := message.TableDelta{
BlockNum: blockNumber,
Timestamp: timestamp,
Name: delta.V0.Name,
Rows: rows,
}
message := message.TableDelta{
BlockNum: block.Block.BlockNumber(),
Timestamp: block.Block.Timestamp.UTC(),
Name: delta.Name,
Rows: rows,
}
channels := []api.Channel{
api.TableDeltaChannel{}.Channel(),
api.TableDeltaChannel{Name: delta.V0.Name}.Channel(),
}
channels := []api.Channel{
api.TableDeltaChannel{}.Channel(),
api.TableDeltaChannel{Name: delta.Name}.Channel(),
}
for _, channel := range channels {
processor.encodeQueue(channel, message)
for _, channel := range channels {
processor.encodeQueue(channel, message)
}
}
}