From 3ecf4c7c421a0ff7571cec466fada059b3fa7fbd Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Tue, 29 Nov 2022 14:51:51 +0100 Subject: [PATCH] ship_processor.go: move some repetitive code into helper functions. --- ship_processor.go | 49 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/ship_processor.go b/ship_processor.go index e613b38..78331d3 100644 --- a/ship_processor.go +++ b/ship_processor.go @@ -10,6 +10,36 @@ import ( "github.com/eoscanada/eos-go/ship" ) +func encodeMessage(v interface{}) ([]byte, bool) { + payload, err := json.Marshal(v) + if err != nil { + log.WithError(err). + WithField("v", v). + Warn("Failed to encode message to json") + return nil, false + } + return payload, true +} + +func queueMessage(channel string, payload []byte) bool { + err := redis.RegisterPublish(channel, payload).Err() + if err != nil { + log.WithError(err).Errorf("Failed to post to channel '%s'", channel) + return false + } + return true +} + +func encodeQueue(key string, v interface{}) bool { + if payload, ok := encodeMessage(v); ok { + channel := redis.Key(key) + if queueMessage(channel, payload) { + return true + } + } + return false +} + func processBlock(block *ship.GetBlocksResultV0) { if block.ThisBlock.BlockNum%100 == 0 { log.Infof("Current: %d, Head: %d\n", block.ThisBlock.BlockNum, block.Head.BlockNum) @@ -19,15 +49,7 @@ func processBlock(block *ship.GetBlocksResultV0) { func processTraces(traces []*ship.TransactionTraceV0) { for _, trace := range traces { - payload, err := json.Marshal(trace) - if err == nil { - channel := redis.Key("transactions") - if err := redis.Publish(channel, payload).Err(); err != nil { - log.WithError(err).Errorf("Failed to post to channel '%s'", channel) - } - } else { - log.WithError(err).Warn("Failed to encode transaction") - } + encodeQueue("transactions", trace) // Actions for _, actionTraceVar := range trace.ActionTraces { @@ -52,9 +74,8 @@ func processTraces(traces []*ship.TransactionTraceV0) { log.WithError(err).Errorf("Failed to get abi for contract %s", act_trace.Act.Account) } - payload, err := json.Marshal(act) - if err != nil { - log.WithError(err).Error("Failed to encode action") + payload, ok := encodeMessage(act) + if !ok { continue } @@ -65,9 +86,7 @@ func processTraces(traces []*ship.TransactionTraceV0) { } for _, channel := range channels { - if err := redis.RegisterPublish(channel, payload).Err(); err != nil { - log.WithError(err).Errorf("Failed to post to channel '%s'", channel) - } + queueMessage(channel, payload) } } }