mirror of
https://github.com/eosswedenorg/thalos
synced 2026-07-03 11:53:41 +02:00
Refactor to use the new redis.Key,redis.Namespace and redis.Channel types.
This commit is contained in:
parent
61beca435c
commit
13c8ed7692
3 changed files with 18 additions and 26 deletions
|
|
@ -2,7 +2,6 @@ package redis
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
_redis "github.com/go-redis/redis/v8"
|
_redis "github.com/go-redis/redis/v8"
|
||||||
|
|
@ -14,8 +13,6 @@ var redis_pipe _redis.Pipeliner
|
||||||
|
|
||||||
var redisCtx = context.Background()
|
var redisCtx = context.Background()
|
||||||
|
|
||||||
var prefix []string
|
|
||||||
|
|
||||||
func Connect(addr string, password string, db int) error {
|
func Connect(addr string, password string, db int) error {
|
||||||
rdb = _redis.NewClient(&_redis.Options{
|
rdb = _redis.NewClient(&_redis.Options{
|
||||||
Addr: addr,
|
Addr: addr,
|
||||||
|
|
@ -32,16 +29,6 @@ func Client() *_redis.Client {
|
||||||
return rdb
|
return rdb
|
||||||
}
|
}
|
||||||
|
|
||||||
func SetPrefix(components ...string) {
|
|
||||||
prefix = components
|
|
||||||
}
|
|
||||||
|
|
||||||
func Key(components ...string) string {
|
|
||||||
namespace := strings.Join(prefix, ".")
|
|
||||||
channel := strings.Join(components, ".")
|
|
||||||
return namespace + "::" + channel
|
|
||||||
}
|
|
||||||
|
|
||||||
func Get(key string) *_redis.StringCmd {
|
func Get(key string) *_redis.StringCmd {
|
||||||
return rdb.Get(redisCtx, key)
|
return rdb.Get(redisCtx, key)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
7
main.go
7
main.go
|
|
@ -35,6 +35,8 @@ var (
|
||||||
eosClientCtx = context.Background()
|
eosClientCtx = context.Background()
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var redisNs redis.Namespace
|
||||||
|
|
||||||
// Reader states
|
// Reader states
|
||||||
const (
|
const (
|
||||||
RS_CONNECT = 1
|
RS_CONNECT = 1
|
||||||
|
|
@ -226,7 +228,10 @@ func main() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
redis.SetPrefix(conf.Redis.Prefix, chainInfo.ChainID.String())
|
redisNs = redis.Namespace{
|
||||||
|
Prefix: conf.Redis.Prefix,
|
||||||
|
ChainID: chainInfo.ChainID.String(),
|
||||||
|
}
|
||||||
|
|
||||||
if conf.StartBlockNum == config.NULL_BLOCK_NUMBER {
|
if conf.StartBlockNum == config.NULL_BLOCK_NUMBER {
|
||||||
if conf.IrreversibleOnly {
|
if conf.IrreversibleOnly {
|
||||||
|
|
|
||||||
|
|
@ -21,18 +21,18 @@ func encodeMessage(v interface{}) ([]byte, bool) {
|
||||||
return payload, true
|
return payload, true
|
||||||
}
|
}
|
||||||
|
|
||||||
func queueMessage(channel string, payload []byte) bool {
|
func queueMessage(key redis.Key, payload []byte) bool {
|
||||||
err := redis.RegisterPublish(channel, payload).Err()
|
err := redis.RegisterPublish(key.String(), payload).Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Errorf("Failed to post to channel '%s'", channel)
|
log.WithError(err).Errorf("Failed to post to channel '%s'", key)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func encodeQueue(key string, v interface{}) bool {
|
func encodeQueue(channel redis.Channel, v interface{}) bool {
|
||||||
if payload, ok := encodeMessage(v); ok {
|
if payload, ok := encodeMessage(v); ok {
|
||||||
channel := redis.Key(key)
|
channel := redis.Key{NS: redisNs, Channel: channel}
|
||||||
if queueMessage(channel, payload) {
|
if queueMessage(channel, payload) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
@ -52,7 +52,7 @@ func processBlock(block *ship.GetBlocksResultV0) {
|
||||||
HeadBlockNum: block.Head.BlockNum,
|
HeadBlockNum: block.Head.BlockNum,
|
||||||
}
|
}
|
||||||
|
|
||||||
encodeQueue("heartbeat", hb)
|
encodeQueue(redis.HeartbeatChannel, hb)
|
||||||
|
|
||||||
_, err := redis.Send()
|
_, err := redis.Send()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -64,7 +64,7 @@ func processBlock(block *ship.GetBlocksResultV0) {
|
||||||
func processTraces(traces []*ship.TransactionTraceV0) {
|
func processTraces(traces []*ship.TransactionTraceV0) {
|
||||||
for _, trace := range traces {
|
for _, trace := range traces {
|
||||||
|
|
||||||
encodeQueue("transactions", trace)
|
encodeQueue(redis.TransactionChannel, trace)
|
||||||
|
|
||||||
// Actions
|
// Actions
|
||||||
for _, actionTraceVar := range trace.ActionTraces {
|
for _, actionTraceVar := range trace.ActionTraces {
|
||||||
|
|
@ -94,11 +94,11 @@ func processTraces(traces []*ship.TransactionTraceV0) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
channels := []string{
|
channels := []redis.Key{
|
||||||
redis.Key("actions"),
|
{NS: redisNs, Channel: redis.ActionChannel{}},
|
||||||
redis.Key("actions", "action:"+string(act.Action)),
|
{NS: redisNs, Channel: redis.ActionChannel{Action: string(act.Action)}},
|
||||||
redis.Key("actions", "contract:"+string(act.Contract)),
|
{NS: redisNs, Channel: redis.ActionChannel{Contract: string(act.Contract)}},
|
||||||
redis.Key("actions", "contract:"+string(act.Contract), "action:"+string(act.Action)),
|
{NS: redisNs, Channel: redis.ActionChannel{Action: string(act.Action), Contract: string(act.Contract)}},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, channel := range channels {
|
for _, channel := range channels {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue