diff --git a/go.mod b/go.mod index 56f2582..82d6bd6 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module eosio-ship-trace-reader go 1.18 require ( + github.com/alicebob/miniredis/v2 v2.30.0 github.com/eoscanada/eos-go v0.10.3-0.20221117104514-64cafd714c60 github.com/eosswedenorg-go/antelope-ship-client v0.2.2 github.com/eosswedenorg-go/pid v1.0.1 @@ -16,6 +17,7 @@ require ( ) require ( + github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect github.com/blendle/zapdriver v1.3.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -37,6 +39,7 @@ require ( github.com/vmihailenco/go-tinylfu v0.2.2 // indirect github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/goleak v1.2.0 // indirect go.uber.org/multierr v1.8.0 // indirect diff --git a/go.sum b/go.sum index 182eb1c..6dec914 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,10 @@ cloud.google.com/go v0.43.0/go.mod h1:BOSR3VbTLkk6FDC/TcffxP4NF/FFBGA5ku+jvKOP7p contrib.go.opencensus.io/exporter/stackdriver v0.12.6/go.mod h1:8x999/OcIPy5ivx/wDiV7Gx4D+VUPODf0mWRGRc5kSk= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk= +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= +github.com/alicebob/miniredis/v2 v2.30.0 h1:uA3uhDbCxfO9+DI/DuGeAMr9qI+noVWwGPNTFuKID5M= +github.com/alicebob/miniredis/v2 v2.30.0/go.mod h1:84TWKZlxYkfgMucPBf5SOQBYJceZeQRFIaQgNMiCX6Q= github.com/aws/aws-sdk-go v1.22.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= @@ -171,6 +175,8 @@ github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAh github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64 h1:5mLPGnFdSsevFRFc9q3yYbBkB6tsm4aCwwQV/j1JQAQ= +github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA= @@ -247,6 +253,7 @@ golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/transport/redis_pubsub/subscriber.go b/transport/redis_pubsub/subscriber.go new file mode 100644 index 0000000..41afddc --- /dev/null +++ b/transport/redis_pubsub/subscriber.go @@ -0,0 +1,96 @@ +package redis_pubsub + +import ( + "context" + "sync" + "time" + + "eosio-ship-trace-reader/transport" + + "github.com/go-redis/redis/v8" +) + +type Subscriber struct { + client *redis.Client + sub *redis.PubSub + ctx context.Context + mu sync.RWMutex + timeout time.Duration + channels map[string]chan []byte + ns Namespace +} + +type SubscriberOption func(*Subscriber) + +func WithTimeout(value time.Duration) SubscriberOption { + return func(s *Subscriber) { + s.timeout = value + } +} + +func NewSubscriber(client *redis.Client, ns Namespace, options ...SubscriberOption) *Subscriber { + sub := &Subscriber{ + client: client, + ctx: client.Context(), + sub: client.PSubscribe(client.Context()), + channels: make(map[string]chan []byte), + timeout: time.Millisecond * 200, + ns: ns, + } + + for _, opt := range options { + opt(sub) + } + + go sub.worker() + + return sub +} + +// forward forwards a message to the channel. +// as writes to a unbuffered channel will block until it's read. +// We run select on it and discard the message if no read happends during timeout +func forward(msg redis.Message, ch chan<- []byte, timeout time.Duration) { + select { + case <-time.After(timeout): + case ch <- []byte(msg.Payload): + } +} + +// worker reads messages from redis pubsub and forwards them to +// correct channels. +func (s *Subscriber) worker() { + for msg := range s.sub.Channel() { + // Route message to correct channel. + s.mu.RLock() + if ch, ok := s.channels[msg.Channel]; ok { + go forward(*msg, ch, s.timeout) + } + s.mu.RUnlock() + } +} + +func (s *Subscriber) Read(channel transport.Channel) ([]byte, error) { + var err error + + key := s.ns.NewKey(channel).String() + s.mu.RLock() + ch, ok := s.channels[key] + s.mu.RUnlock() + if !ok { + // Channel does not exist in the map. + // Subscribe and insert it. + err = s.sub.Subscribe(s.ctx, key) + if err != nil { + return nil, err + } + + // Guard race condition to map with mutex. + s.mu.Lock() + ch = make(chan []byte) + s.channels[key] = ch + s.mu.Unlock() + } + + return <-ch, nil +} diff --git a/transport/redis_pubsub/subscriber_test.go b/transport/redis_pubsub/subscriber_test.go new file mode 100644 index 0000000..25473f2 --- /dev/null +++ b/transport/redis_pubsub/subscriber_test.go @@ -0,0 +1,57 @@ +package redis_pubsub + +import ( + "testing" + "time" + + "eosio-ship-trace-reader/transport" + + "github.com/alicebob/miniredis/v2" + "github.com/go-redis/redis/v8" + "github.com/go-redis/redismock/v8" + "github.com/stretchr/testify/assert" +) + +func TestSubscriber_Construct(t *testing.T) { + client, _ := redismock.NewClientMock() + ns := Namespace{Prefix: "prefix", ChainID: "8f2f6ec19400d372c9b3340b1438e9c805cf9e69be962fa81d055bc037ceed8d"} + + s := NewSubscriber(client, ns) + + assert.Equal(t, s.client, client) + assert.Equal(t, s.ctx, client.Context()) + assert.NotNil(t, s.sub) + assert.Equal(t, s.ns, ns) + assert.Equal(t, s.timeout, 200*time.Millisecond) + + s = NewSubscriber(client, ns, WithTimeout(4*time.Second)) + assert.Equal(t, s.timeout, 4*time.Second) +} + +func TestSubscriber_Read(t *testing.T) { + expectedMessages := []string{"payload", "payload2", "payload3"} + + server := miniredis.RunT(t) + + client := redis.NewClient(&redis.Options{ + Addr: server.Addr(), + }) + + s := NewSubscriber(client, Namespace{Prefix: "prefix", ChainID: "d41dbd2921d5a377325661427090c6c508904d60920d6b7ea771c58da5299754"}) + + go func() { + time.Sleep(time.Millisecond * 10) + + for _, msg := range expectedMessages { + server.Publish("prefix::d41dbd2921d5a377325661427090c6c508904d60920d6b7ea771c58da5299754::test", msg) + } + }() + + // Redis pubsub does not guarentee that messages are sent in the correct order. + for range expectedMessages { + msg, err := s.Read(transport.Channel{"test"}) + assert.NoError(t, err) + + assert.Contains(t, expectedMessages, string(msg)) + } +}