diff --git a/main.go b/main.go index 74bc4f4..c3628c9 100644 --- a/main.go +++ b/main.go @@ -236,7 +236,7 @@ func main() { } } - reader := ShipReader{ + processor := ShipProcessor{ ns: transport.Namespace{ Prefix: conf.Redis.Prefix, ChainID: chainInfo.ChainID.String(), @@ -247,8 +247,8 @@ func main() { // Construct ship client shClient = shipclient.NewClient(conf.StartBlockNum, conf.EndBlockNum, conf.IrreversibleOnly) - shClient.BlockHandler = reader.processBlock - shClient.TraceHandler = reader.processTraces + shClient.BlockHandler = processor.processBlock + shClient.TraceHandler = processor.processTraces // Run the application run() diff --git a/ship_processor.go b/ship_processor.go index bd4c138..062667e 100644 --- a/ship_processor.go +++ b/ship_processor.go @@ -11,15 +11,15 @@ import ( "github.com/eoscanada/eos-go/ship" ) -type ShipReader struct { +type ShipProcessor struct { ns transport.Namespace abi *abi.AbiManager publisher transport.Publisher } -func (reader *ShipReader) queueMessage(channel transport.ChannelInterface, payload []byte) bool { - key := reader.ns.NewKey(channel) - err := reader.publisher.Publish(key.String(), payload) +func (processor *ShipProcessor) queueMessage(channel transport.ChannelInterface, payload []byte) bool { + key := processor.ns.NewKey(channel) + err := processor.publisher.Publish(key.String(), payload) if err != nil { log.WithError(err).Errorf("Failed to post to channel '%s'", key) return false @@ -27,16 +27,16 @@ func (reader *ShipReader) queueMessage(channel transport.ChannelInterface, paylo return true } -func (reader *ShipReader) encodeQueue(channel transport.ChannelInterface, v interface{}) bool { +func (processor *ShipProcessor) encodeQueue(channel transport.ChannelInterface, v interface{}) bool { if payload, ok := message.Encode(v); ok { - if reader.queueMessage(channel, payload) { + if processor.queueMessage(channel, payload) { return true } } return false } -func (reader *ShipReader) processBlock(block *ship.GetBlocksResultV0) { +func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) { if block.ThisBlock.BlockNum%100 == 0 { log.Infof("Current: %d, Head: %d\n", block.ThisBlock.BlockNum, block.Head.BlockNum) } @@ -48,19 +48,19 @@ func (reader *ShipReader) processBlock(block *ship.GetBlocksResultV0) { HeadBlockNum: block.Head.BlockNum, } - reader.encodeQueue(transport.HeartbeatChannel, hb) + processor.encodeQueue(transport.HeartbeatChannel, hb) - err := reader.publisher.Flush() + err := processor.publisher.Flush() if err != nil { log.WithError(err).Error("Failed to send messages") } } } -func (reader *ShipReader) processTraces(traces []*ship.TransactionTraceV0) { +func (processor *ShipProcessor) processTraces(traces []*ship.TransactionTraceV0) { for _, trace := range traces { - reader.encodeQueue(transport.TransactionChannel, trace) + processor.encodeQueue(transport.TransactionChannel, trace) // Actions for _, actionTraceVar := range trace.ActionTraces { @@ -74,7 +74,7 @@ func (reader *ShipReader) processTraces(traces []*ship.TransactionTraceV0) { HexData: hex.EncodeToString(act_trace.Act.Data), } - ABI, err := reader.abi.GetAbi(act_trace.Act.Account) + ABI, err := processor.abi.GetAbi(act_trace.Act.Account) if err == nil { v, err := abi.DecodeAction(ABI, act_trace.Act.Data, act_trace.Act.Name) if err != nil { @@ -98,12 +98,12 @@ func (reader *ShipReader) processTraces(traces []*ship.TransactionTraceV0) { } for _, channel := range channels { - reader.queueMessage(channel, payload) + processor.queueMessage(channel, payload) } } } - err := reader.publisher.Flush() + err := processor.publisher.Flush() if err != nil { log.WithError(err).Error("Failed to send messages") }