diff --git a/internal/redis/wrapper.go b/internal/redis/wrapper.go index 3e0f7c1..d109fb1 100644 --- a/internal/redis/wrapper.go +++ b/internal/redis/wrapper.go @@ -2,7 +2,6 @@ package redis import ( "context" - "strings" "time" _redis "github.com/go-redis/redis/v8" @@ -14,8 +13,6 @@ var redis_pipe _redis.Pipeliner var redisCtx = context.Background() -var prefix []string - func Connect(addr string, password string, db int) error { rdb = _redis.NewClient(&_redis.Options{ Addr: addr, @@ -32,16 +29,6 @@ func Client() *_redis.Client { return rdb } -func SetPrefix(components ...string) { - prefix = components -} - -func Key(components ...string) string { - namespace := strings.Join(prefix, ".") - channel := strings.Join(components, ".") - return namespace + "::" + channel -} - func Get(key string) *_redis.StringCmd { return rdb.Get(redisCtx, key) } diff --git a/main.go b/main.go index 1ad7e39..9e1a2bf 100644 --- a/main.go +++ b/main.go @@ -35,6 +35,8 @@ var ( eosClientCtx = context.Background() ) +var redisNs redis.Namespace + // Reader states const ( RS_CONNECT = 1 @@ -226,7 +228,10 @@ func main() { return } - redis.SetPrefix(conf.Redis.Prefix, chainInfo.ChainID.String()) + redisNs = redis.Namespace{ + Prefix: conf.Redis.Prefix, + ChainID: chainInfo.ChainID.String(), + } if conf.StartBlockNum == config.NULL_BLOCK_NUMBER { if conf.IrreversibleOnly { diff --git a/ship_processor.go b/ship_processor.go index 8158260..27093d2 100644 --- a/ship_processor.go +++ b/ship_processor.go @@ -21,18 +21,18 @@ func encodeMessage(v interface{}) ([]byte, bool) { return payload, true } -func queueMessage(channel string, payload []byte) bool { - err := redis.RegisterPublish(channel, payload).Err() +func queueMessage(key redis.Key, payload []byte) bool { + err := redis.RegisterPublish(key.String(), payload).Err() if err != nil { - log.WithError(err).Errorf("Failed to post to channel '%s'", channel) + log.WithError(err).Errorf("Failed to post to channel '%s'", key) return false } return true } -func encodeQueue(key string, v interface{}) bool { +func encodeQueue(channel redis.Channel, v interface{}) bool { if payload, ok := encodeMessage(v); ok { - channel := redis.Key(key) + channel := redis.Key{NS: redisNs, Channel: channel} if queueMessage(channel, payload) { return true } @@ -52,7 +52,7 @@ func processBlock(block *ship.GetBlocksResultV0) { HeadBlockNum: block.Head.BlockNum, } - encodeQueue("heartbeat", hb) + encodeQueue(redis.HeartbeatChannel, hb) _, err := redis.Send() if err != nil { @@ -64,7 +64,7 @@ func processBlock(block *ship.GetBlocksResultV0) { func processTraces(traces []*ship.TransactionTraceV0) { for _, trace := range traces { - encodeQueue("transactions", trace) + encodeQueue(redis.TransactionChannel, trace) // Actions for _, actionTraceVar := range trace.ActionTraces { @@ -94,11 +94,11 @@ func processTraces(traces []*ship.TransactionTraceV0) { continue } - channels := []string{ - redis.Key("actions"), - redis.Key("actions", "action:"+string(act.Action)), - redis.Key("actions", "contract:"+string(act.Contract)), - redis.Key("actions", "contract:"+string(act.Contract), "action:"+string(act.Action)), + channels := []redis.Key{ + {NS: redisNs, Channel: redis.ActionChannel{}}, + {NS: redisNs, Channel: redis.ActionChannel{Action: string(act.Action)}}, + {NS: redisNs, Channel: redis.ActionChannel{Contract: string(act.Contract)}}, + {NS: redisNs, Channel: redis.ActionChannel{Action: string(act.Action), Contract: string(act.Contract)}}, } for _, channel := range channels {