1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-06-16 04:24:56 +02:00

ship_processor.go: refactor to a struct to get rid of global state.

This commit is contained in:
Henrik Hautakoski 2023-01-17 20:57:09 +01:00
parent 30e3d0e012
commit 2d369adabf
2 changed files with 31 additions and 32 deletions

30
main.go
View file

@ -33,12 +33,6 @@ var conf config.Config
var shClient *shipclient.ShipClient
var abi_mgr *abi.AbiManager
var publisher transport.Publisher
var redisNs transport.Namespace
// Reader states
const (
RS_CONNECT = 1
@ -225,9 +219,6 @@ func main() {
return
}
// Setup publisher
publisher = redis_pubsub.New(rdb)
// Connect client and get chain info.
log.Printf("Get chain info from api at: %s", conf.Api)
eosClient := eos.New(conf.Api)
@ -237,14 +228,6 @@ func main() {
return
}
// Init Abi cache
abi_mgr = abi.NewAbiManager(rdb, eosClient, conf.Redis.CacheID)
redisNs = transport.Namespace{
Prefix: conf.Redis.Prefix,
ChainID: chainInfo.ChainID.String(),
}
if conf.StartBlockNum == config.NULL_BLOCK_NUMBER {
if conf.IrreversibleOnly {
conf.StartBlockNum = uint32(chainInfo.LastIrreversibleBlockNum)
@ -253,10 +236,19 @@ func main() {
}
}
reader := ShipReader{
ns: transport.Namespace{
Prefix: conf.Redis.Prefix,
ChainID: chainInfo.ChainID.String(),
},
publisher: redis_pubsub.New(rdb),
abi: abi.NewAbiManager(rdb, eosClient, conf.Redis.CacheID),
}
// Construct ship client
shClient = shipclient.NewClient(conf.StartBlockNum, conf.EndBlockNum, conf.IrreversibleOnly)
shClient.BlockHandler = processBlock
shClient.TraceHandler = processTraces
shClient.BlockHandler = reader.processBlock
shClient.TraceHandler = reader.processTraces
// Run the application
run()

View file

@ -6,11 +6,18 @@ import (
log "github.com/sirupsen/logrus"
"eosio-ship-trace-reader/abi"
"eosio-ship-trace-reader/transport"
"github.com/eoscanada/eos-go"
"github.com/eoscanada/eos-go/ship"
)
type ShipReader struct {
ns transport.Namespace
abi *abi.AbiManager
publisher transport.Publisher
}
func decodeAction(abi *eos.ABI, data []byte, actionName eos.ActionName) (interface{}, error) {
var v interface{}
@ -34,9 +41,9 @@ func encodeMessage(v interface{}) ([]byte, bool) {
return payload, true
}
func queueMessage(channel transport.ChannelInterface, payload []byte) bool {
key := redisNs.NewKey(channel)
err := publisher.Publish(key.String(), payload)
func (reader *ShipReader) queueMessage(channel transport.ChannelInterface, payload []byte) bool {
key := reader.ns.NewKey(channel)
err := reader.publisher.Publish(key.String(), payload)
if err != nil {
log.WithError(err).Errorf("Failed to post to channel '%s'", key)
return false
@ -44,16 +51,16 @@ func queueMessage(channel transport.ChannelInterface, payload []byte) bool {
return true
}
func encodeQueue(channel transport.ChannelInterface, v interface{}) bool {
func (reader *ShipReader) encodeQueue(channel transport.ChannelInterface, v interface{}) bool {
if payload, ok := encodeMessage(v); ok {
if queueMessage(channel, payload) {
if reader.queueMessage(channel, payload) {
return true
}
}
return false
}
func processBlock(block *ship.GetBlocksResultV0) {
func (reader *ShipReader) processBlock(block *ship.GetBlocksResultV0) {
if block.ThisBlock.BlockNum%100 == 0 {
log.Infof("Current: %d, Head: %d\n", block.ThisBlock.BlockNum, block.Head.BlockNum)
}
@ -65,19 +72,19 @@ func processBlock(block *ship.GetBlocksResultV0) {
HeadBlockNum: block.Head.BlockNum,
}
encodeQueue(transport.HeartbeatChannel, hb)
reader.encodeQueue(transport.HeartbeatChannel, hb)
err := publisher.Flush()
err := reader.publisher.Flush()
if err != nil {
log.WithError(err).Error("Failed to send messages")
}
}
}
func processTraces(traces []*ship.TransactionTraceV0) {
func (reader *ShipReader) processTraces(traces []*ship.TransactionTraceV0) {
for _, trace := range traces {
encodeQueue(transport.TransactionChannel, trace)
reader.encodeQueue(transport.TransactionChannel, trace)
// Actions
for _, actionTraceVar := range trace.ActionTraces {
@ -91,7 +98,7 @@ func processTraces(traces []*ship.TransactionTraceV0) {
HexData: hex.EncodeToString(act_trace.Act.Data),
}
abi, err := abi_mgr.GetAbi(act_trace.Act.Account)
abi, err := reader.abi.GetAbi(act_trace.Act.Account)
if err == nil {
v, err := decodeAction(abi, act_trace.Act.Data, act_trace.Act.Name)
if err != nil {
@ -115,12 +122,12 @@ func processTraces(traces []*ship.TransactionTraceV0) {
}
for _, channel := range channels {
queueMessage(channel, payload)
reader.queueMessage(channel, payload)
}
}
}
err := publisher.Flush()
err := reader.publisher.Flush()
if err != nil {
log.WithError(err).Error("Failed to send messages")
}