diff --git a/abi.go b/abi.go index f89892c..ff46037 100644 --- a/abi.go +++ b/abi.go @@ -9,6 +9,7 @@ import ( eos "github.com/eoscanada/eos-go" redis_cache "github.com/go-redis/cache/v8" "eosio-ship-trace-reader/abi_cache" + "eosio-ship-trace-reader/redis" ) var abiCache *abi_cache.Cache @@ -16,7 +17,7 @@ var abiCache *abi_cache.Cache func InitAbiCache(id string) { // Init abi cache abiCache = abi_cache.New("ship.cache." + id + ".abi", &redis_cache.Options{ - Redis: rdb, + Redis: redis.Client(), // Cache 10k keys for 10 minutes. LocalCache: redis_cache.NewTinyLFU(10000, 10 * time.Minute), }) diff --git a/main.go b/main.go index 14506f7..e16c7d3 100644 --- a/main.go +++ b/main.go @@ -11,6 +11,7 @@ import ( "github.com/pborman/getopt/v2" "github.com/eosswedenorg-go/pid" "eosio-ship-trace-reader/config" + "eosio-ship-trace-reader/redis" eos "github.com/eoscanada/eos-go" shipclient "github.com/eosswedenorg-go/eos-ship-client" ) @@ -157,7 +158,7 @@ func main() { } // Connect to redis - err = RedisConnect(conf.Redis.Addr, conf.Redis.Password, conf.Redis.DB) + err = redis.Connect(conf.Redis.Addr, conf.Redis.Password, conf.Redis.DB) if err != nil { log.Println("Failed to connect to redis:", err) return @@ -175,7 +176,7 @@ func main() { return } - redisPrefix += chainInfo.ChainID.String() + "." + redis.Prefix += chainInfo.ChainID.String() + "." if conf.StartBlockNum == config.NULL_BLOCK_NUMBER { diff --git a/redis.go b/redis.go deleted file mode 100644 index 2145415..0000000 --- a/redis.go +++ /dev/null @@ -1,53 +0,0 @@ - -package main - -import ( - "strings" - "context" - "time" - "github.com/go-redis/redis/v8" -) - -var rdb *redis.Client - -var redis_pipe redis.Pipeliner - -var redisCtx = context.Background() - -var redisPrefix = "ship." - -func RedisConnect(addr string, password string, db int) error { - rdb = redis.NewClient(&redis.Options{ - Addr: addr, - Password: password, - DB: db, - }) - - redis_pipe = rdb.Pipeline() - - return rdb.Ping(redisCtx).Err() -} - -func RedisKey(components ...string) (string) { - return redisPrefix + strings.Join(components, ".") -} - -func RedisGet(key string) (*redis.StringCmd) { - return rdb.Get(redisCtx, key) -} - -func RedisSet(key string, value interface{}, expiration time.Duration) (*redis.StatusCmd) { - return rdb.Set(redisCtx, key, value, expiration) -} - -func RedisPublish(channel string, message interface{}) (*redis.IntCmd) { - return rdb.Publish(redisCtx, channel, message) -} - -func RedisRegisterPublish(channel string, message interface{}) (*redis.IntCmd) { - return redis_pipe.Publish(redisCtx, channel, message) -} - -func RedisSend() ([]redis.Cmder, error) { - return redis_pipe.Exec(redisCtx) -} diff --git a/redis/wrapper.go b/redis/wrapper.go new file mode 100644 index 0000000..46c8ed3 --- /dev/null +++ b/redis/wrapper.go @@ -0,0 +1,57 @@ + +package redis + +import ( + "strings" + "context" + "time" + _redis "github.com/go-redis/redis/v8" +) + +var rdb *_redis.Client + +var redis_pipe _redis.Pipeliner + +var redisCtx = context.Background() + +var Prefix = "ship." + +func Connect(addr string, password string, db int) error { + rdb = _redis.NewClient(&_redis.Options{ + Addr: addr, + Password: password, + DB: db, + }) + + redis_pipe = rdb.Pipeline() + + return rdb.Ping(redisCtx).Err() +} + +func Client() *_redis.Client { + return rdb +} + +func Key(components ...string) (string) { + return Prefix + strings.Join(components, ".") +} + +func Get(key string) (*_redis.StringCmd) { + return rdb.Get(redisCtx, key) +} + +func Set(key string, value interface{}, expiration time.Duration) (*_redis.StatusCmd) { + return rdb.Set(redisCtx, key, value, expiration) +} + +func Publish(channel string, message interface{}) (*_redis.IntCmd) { + return rdb.Publish(redisCtx, channel, message) +} + +func RegisterPublish(channel string, message interface{}) (*_redis.IntCmd) { + return redis_pipe.Publish(redisCtx, channel, message) +} + +func Send() ([]_redis.Cmder, error) { + return redis_pipe.Exec(redisCtx) +} diff --git a/ship_processor.go b/ship_processor.go index 83b205c..ade11e7 100644 --- a/ship_processor.go +++ b/ship_processor.go @@ -5,6 +5,7 @@ import ( "log" "encoding/json" "github.com/eoscanada/eos-go/ship" + "eosio-ship-trace-reader/redis" ) func processBlock(block *ship.GetBlocksResultV0) { @@ -20,8 +21,8 @@ func processTraces(traces []*ship.TransactionTraceV0) { payload, err := json.Marshal(trace) if err == nil { - channel := RedisKey("transactions") - if err := RedisPublish(channel, payload).Err(); err != nil { + channel := redis.Key("transactions") + if err := redis.Publish(channel, payload).Err(); err != nil { log.Printf("Failed to post to channel '%s': %s", channel, err) } } else { @@ -57,21 +58,21 @@ func processTraces(traces []*ship.TransactionTraceV0) { } channels := []string{ - RedisKey("actions"), - RedisKey(string(act.Contract), "actions"), - RedisKey(string(act.Contract), "actions", string(act.Action)), + redis.Key("actions"), + redis.Key(string(act.Contract), "actions"), + redis.Key(string(act.Contract), "actions", string(act.Action)), } for _, channel := range channels { - if err := RedisRegisterPublish(channel, payload).Err(); err != nil { + if err := redis.RegisterPublish(channel, payload).Err(); err != nil { log.Printf("Failed to post to channel '%s': %s", channel, err) } } } } - _, err := RedisSend() + _, err := redis.Send() if err != nil { - log.Println("Failed to send redis command:", err) + log.Println("Failed to send redis. command:", err) } }