diff --git a/main.go b/main.go index 494c62c..74bc4f4 100644 --- a/main.go +++ b/main.go @@ -33,12 +33,6 @@ var conf config.Config var shClient *shipclient.ShipClient -var abi_mgr *abi.AbiManager - -var publisher transport.Publisher - -var redisNs transport.Namespace - // Reader states const ( RS_CONNECT = 1 @@ -225,9 +219,6 @@ func main() { return } - // Setup publisher - publisher = redis_pubsub.New(rdb) - // Connect client and get chain info. log.Printf("Get chain info from api at: %s", conf.Api) eosClient := eos.New(conf.Api) @@ -237,14 +228,6 @@ func main() { return } - // Init Abi cache - abi_mgr = abi.NewAbiManager(rdb, eosClient, conf.Redis.CacheID) - - redisNs = transport.Namespace{ - Prefix: conf.Redis.Prefix, - ChainID: chainInfo.ChainID.String(), - } - if conf.StartBlockNum == config.NULL_BLOCK_NUMBER { if conf.IrreversibleOnly { conf.StartBlockNum = uint32(chainInfo.LastIrreversibleBlockNum) @@ -253,10 +236,19 @@ func main() { } } + reader := ShipReader{ + ns: transport.Namespace{ + Prefix: conf.Redis.Prefix, + ChainID: chainInfo.ChainID.String(), + }, + publisher: redis_pubsub.New(rdb), + abi: abi.NewAbiManager(rdb, eosClient, conf.Redis.CacheID), + } + // Construct ship client shClient = shipclient.NewClient(conf.StartBlockNum, conf.EndBlockNum, conf.IrreversibleOnly) - shClient.BlockHandler = processBlock - shClient.TraceHandler = processTraces + shClient.BlockHandler = reader.processBlock + shClient.TraceHandler = reader.processTraces // Run the application run() diff --git a/ship_processor.go b/ship_processor.go index 5073868..bc19651 100644 --- a/ship_processor.go +++ b/ship_processor.go @@ -6,11 +6,18 @@ import ( log "github.com/sirupsen/logrus" + "eosio-ship-trace-reader/abi" "eosio-ship-trace-reader/transport" "github.com/eoscanada/eos-go" "github.com/eoscanada/eos-go/ship" ) +type ShipReader struct { + ns transport.Namespace + abi *abi.AbiManager + publisher transport.Publisher +} + func decodeAction(abi *eos.ABI, data []byte, actionName eos.ActionName) (interface{}, error) { var v interface{} @@ -34,9 +41,9 @@ func encodeMessage(v interface{}) ([]byte, bool) { return payload, true } -func queueMessage(channel transport.ChannelInterface, payload []byte) bool { - key := redisNs.NewKey(channel) - err := publisher.Publish(key.String(), payload) +func (reader *ShipReader) queueMessage(channel transport.ChannelInterface, payload []byte) bool { + key := reader.ns.NewKey(channel) + err := reader.publisher.Publish(key.String(), payload) if err != nil { log.WithError(err).Errorf("Failed to post to channel '%s'", key) return false @@ -44,16 +51,16 @@ func queueMessage(channel transport.ChannelInterface, payload []byte) bool { return true } -func encodeQueue(channel transport.ChannelInterface, v interface{}) bool { +func (reader *ShipReader) encodeQueue(channel transport.ChannelInterface, v interface{}) bool { if payload, ok := encodeMessage(v); ok { - if queueMessage(channel, payload) { + if reader.queueMessage(channel, payload) { return true } } return false } -func processBlock(block *ship.GetBlocksResultV0) { +func (reader *ShipReader) processBlock(block *ship.GetBlocksResultV0) { if block.ThisBlock.BlockNum%100 == 0 { log.Infof("Current: %d, Head: %d\n", block.ThisBlock.BlockNum, block.Head.BlockNum) } @@ -65,19 +72,19 @@ func processBlock(block *ship.GetBlocksResultV0) { HeadBlockNum: block.Head.BlockNum, } - encodeQueue(transport.HeartbeatChannel, hb) + reader.encodeQueue(transport.HeartbeatChannel, hb) - err := publisher.Flush() + err := reader.publisher.Flush() if err != nil { log.WithError(err).Error("Failed to send messages") } } } -func processTraces(traces []*ship.TransactionTraceV0) { +func (reader *ShipReader) processTraces(traces []*ship.TransactionTraceV0) { for _, trace := range traces { - encodeQueue(transport.TransactionChannel, trace) + reader.encodeQueue(transport.TransactionChannel, trace) // Actions for _, actionTraceVar := range trace.ActionTraces { @@ -91,7 +98,7 @@ func processTraces(traces []*ship.TransactionTraceV0) { HexData: hex.EncodeToString(act_trace.Act.Data), } - abi, err := abi_mgr.GetAbi(act_trace.Act.Account) + abi, err := reader.abi.GetAbi(act_trace.Act.Account) if err == nil { v, err := decodeAction(abi, act_trace.Act.Data, act_trace.Act.Name) if err != nil { @@ -115,12 +122,12 @@ func processTraces(traces []*ship.TransactionTraceV0) { } for _, channel := range channels { - queueMessage(channel, payload) + reader.queueMessage(channel, payload) } } } - err := publisher.Flush() + err := reader.publisher.Flush() if err != nil { log.WithError(err).Error("Failed to send messages") }