1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-07-04 12:03:41 +02:00

app/ship_processor.go: Use an message.Encoder function instead of directly calling massage.Encode()

This commit is contained in:
Henrik Hautakoski 2023-02-10 16:44:32 +01:00
parent 2c3af6e4c5
commit 55f013a386

View file

@ -2,6 +2,7 @@ package app
import ( import (
"encoding/hex" "encoding/hex"
"encoding/json"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -12,10 +13,24 @@ import (
shipclient "github.com/eosswedenorg-go/antelope-ship-client" shipclient "github.com/eosswedenorg-go/antelope-ship-client"
) )
// logDecoratedEncoder decorates a message.Encoder and logs any error.
func logDecoratedEncoder(encoder message.Encoder) message.Encoder {
return func(v interface{}) ([]byte, error) {
payload, err := encoder(v)
if err != nil {
log.WithError(err).
WithField("v", v).
Warn("Failed to encode message")
}
return payload, err
}
}
type ShipProcessor struct { type ShipProcessor struct {
abi *abi.AbiManager abi *abi.AbiManager
publisher transport.Publisher publisher transport.Publisher
shClient *shipclient.Client shClient *shipclient.Client
encode message.Encoder
} }
func SpawnProccessor(shClient *shipclient.Client, publisher transport.Publisher, abi *abi.AbiManager) { func SpawnProccessor(shClient *shipclient.Client, publisher transport.Publisher, abi *abi.AbiManager) {
@ -23,6 +38,7 @@ func SpawnProccessor(shClient *shipclient.Client, publisher transport.Publisher,
abi: abi, abi: abi,
publisher: publisher, publisher: publisher,
shClient: shClient, shClient: shClient,
encode: logDecoratedEncoder(json.Marshal),
} }
// Attach handlers // Attach handlers
@ -40,10 +56,8 @@ func (processor *ShipProcessor) queueMessage(channel transport.ChannelInterface,
} }
func (processor *ShipProcessor) encodeQueue(channel transport.ChannelInterface, v interface{}) bool { func (processor *ShipProcessor) encodeQueue(channel transport.ChannelInterface, v interface{}) bool {
if payload, ok := message.Encode(v); ok { if payload, err := processor.encode(v); err != nil {
if processor.queueMessage(channel, payload) { return processor.queueMessage(channel, payload)
return true
}
} }
return false return false
} }
@ -97,8 +111,8 @@ func (processor *ShipProcessor) processTraces(traces []*ship.TransactionTraceV0)
log.WithError(err).Errorf("Failed to get abi for contract %s", act_trace.Act.Account) log.WithError(err).Errorf("Failed to get abi for contract %s", act_trace.Act.Account)
} }
payload, ok := message.Encode(act) payload, err := processor.encode(act)
if !ok { if err != nil {
continue continue
} }