From c8b4c208e646b9dd312e3a30097309890ffd9016 Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Mon, 28 Nov 2022 15:22:07 +0100 Subject: [PATCH] WIP --- config/config.go | 2 ++ main.go | 9 +++++++ ship_processor.go | 20 +++++++------- transport/driver.go | 9 +++++++ transport/factory.go | 16 +++++++++++ transport/redis_pubsub.go | 35 ++++++++++++++++++++++++ transport/redis_stream.go | 57 +++++++++++++++++++++++++++++++++++++++ 7 files changed, 137 insertions(+), 11 deletions(-) create mode 100644 transport/driver.go create mode 100644 transport/factory.go create mode 100644 transport/redis_pubsub.go create mode 100644 transport/redis_stream.go diff --git a/config/config.go b/config/config.go index f23c419..ac87071 100644 --- a/config/config.go +++ b/config/config.go @@ -24,6 +24,7 @@ type Config struct { Name string `json:"name"` ShipApi string `json:"ship_api"` Api string `json:"api"` + Transport string `json:"transport"` Redis RedisConfig `json:"redis"` @@ -38,6 +39,7 @@ type Config struct { func Load(filename string) (Config, error) { cfg := Config{ + Transport: "redis-channel", StartBlockNum: NULL_BLOCK_NUMBER, EndBlockNum: NULL_BLOCK_NUMBER, MaxMessagesInFlight: 10, diff --git a/main.go b/main.go index c9c4796..cba5616 100644 --- a/main.go +++ b/main.go @@ -13,6 +13,7 @@ import ( "eosio-ship-trace-reader/config" "eosio-ship-trace-reader/redis" "eosio-ship-trace-reader/telegram" + "eosio-ship-trace-reader/transport" eos "github.com/eoscanada/eos-go" shipclient "github.com/eosswedenorg-go/eos-ship-client" ) @@ -30,6 +31,7 @@ var shClient *shipclient.ShipClient var eosClient *eos.API var eosClientCtx = context.Background() +var transporter transport.Driver // Reader states const RS_CONNECT = 1 @@ -196,6 +198,13 @@ func main() { return } + // Create message queue + transporter, err = transport.Make(conf.Transport, chainInfo.ChainID.String()) + if err != nil { + log.Println("Failed to create queue:", err) + return + } + redis.Prefix += chainInfo.ChainID.String() + "." if conf.StartBlockNum == config.NULL_BLOCK_NUMBER { diff --git a/ship_processor.go b/ship_processor.go index bb1b505..3f4a14e 100644 --- a/ship_processor.go +++ b/ship_processor.go @@ -5,7 +5,6 @@ import ( "log" "encoding/json" "github.com/eoscanada/eos-go/ship" - "eosio-ship-trace-reader/redis" ) var block_num uint32 @@ -25,9 +24,8 @@ func processTraces(traces []*ship.TransactionTraceV0) { payload, err := json.Marshal(trace) if err == nil { - channel := redis.Key("transactions") - if err := redis.Publish(channel, payload).Err(); err != nil { - log.Printf("Failed to post to channel '%s': %s", channel, err) + if err := transporter.Send("transactions", block_num, payload); err != nil { + log.Println(err) } } else { log.Println("Failed to encode transaction:", err) @@ -62,21 +60,21 @@ func processTraces(traces []*ship.TransactionTraceV0) { } channels := []string{ - redis.Key("actions"), - redis.Key(string(act.Contract), "actions"), - redis.Key(string(act.Contract), "actions", string(act.Action)), + "actions", + string(act.Contract) + ".actions", + string(act.Contract) + ".actions." + string(act.Action), } for _, channel := range channels { - if err := redis.RegisterPublish(channel, payload).Err(); err != nil { - log.Printf("Failed to post to channel '%s': %s", channel, err) + if err := transporter.Send(channel, block_num, payload); err != nil { + log.Println(err) } } } } - _, err := redis.Send() + err := transporter.Commit() if err != nil { - log.Println("Failed to send redis. command:", err) + log.Println("Failed to flush queue", err) } } diff --git a/transport/driver.go b/transport/driver.go new file mode 100644 index 0000000..35e2cf0 --- /dev/null +++ b/transport/driver.go @@ -0,0 +1,9 @@ + +package transport + +type Driver interface +{ + Send(namespace string, id uint32, message interface{}) error + + Commit() error +} diff --git a/transport/factory.go b/transport/factory.go new file mode 100644 index 0000000..f068af6 --- /dev/null +++ b/transport/factory.go @@ -0,0 +1,16 @@ + +package transport + +import "fmt" + +func Make(driver string, name string) (Driver, error) { + + switch driver { + case "redis-pubsub": + return NewRedisPubSub(name), nil + case "redis-stream": + return NewRedisStream(name, 1000), nil + default: + return nil, fmt.Errorf("Invalid type: %s", driver) + } +} diff --git a/transport/redis_pubsub.go b/transport/redis_pubsub.go new file mode 100644 index 0000000..9d7794f --- /dev/null +++ b/transport/redis_pubsub.go @@ -0,0 +1,35 @@ + +package transport + +import ( + "fmt" + "strings" + "eosio-ship-trace-reader/redis" +) + +type RedisPubSub struct { + name string +} + +func NewRedisPubSub(name string) (RedisPubSub) { + return RedisPubSub{ + name: name, + } +} + +func (this RedisPubSub) Send(namespace string, id uint32, message interface{}) error { + + channel := strings.Join([]string{"ship.channel", this.name, namespace}, ".") + if err := redis.RegisterPublish(channel, message).Err(); err != nil { + return fmt.Errorf("Failed to post to channel '%s': %s", channel, err) + } + return nil +} + +func (this RedisPubSub) Commit() error { + _, err := redis.Send() + if err != nil { + return fmt.Errorf("Failed to send redis. command: %s", err) + } + return nil +} diff --git a/transport/redis_stream.go b/transport/redis_stream.go new file mode 100644 index 0000000..6900ec4 --- /dev/null +++ b/transport/redis_stream.go @@ -0,0 +1,57 @@ + +package transport + +import ( + "fmt" + "strings" + "eosio-ship-trace-reader/redis" +) + +type RedisStream struct { + name string + + // Length of the stream, if items are added when the stream is full, old items will be evicted + // until the stream's length is equal to this value. + length int64 + + // map of namespaces and their indexes. + // each namespace is it's own stream. + indexes map[string]uint32 +} + +func NewRedisStream(name string, length int64) (RedisStream) { + return RedisStream{ + name: name, + length: length, + indexes: make(map[string]uint32), + } +} + +func (this RedisStream) Send(namespace string, id uint32, message interface{}) error { + + stream := strings.Join([]string{"ship.stream", this.name, namespace}, ".") + index := this.nextIndex(namespace) + + data := map[string]interface{}{ + "block": id, + "data": message, + } + + if err := redis.XAdd(stream, fmt.Sprintf("%d-%d", id, index), this.length, data).Err(); err != nil { + return fmt.Errorf("Failed to add to redis stream '%s': %s", stream, err) + } + return nil +} + +func (this RedisStream) Commit() error { + + // reset indexes on flush. + this.indexes = make(map[string]uint32) + return nil +} + +func (this RedisStream) nextIndex(namespace string) uint32 { + idx := this.indexes[namespace] + this.indexes[namespace] = idx + 1 + return idx +}