mirror of
https://github.com/eosswedenorg/thalos
synced 2026-06-16 04:24:56 +02:00
Adding transport/redis_pubsub/subscriber.go
This commit is contained in:
parent
7843dda6d9
commit
bd5037b044
4 changed files with 163 additions and 0 deletions
3
go.mod
3
go.mod
|
|
@ -3,6 +3,7 @@ module eosio-ship-trace-reader
|
|||
go 1.18
|
||||
|
||||
require (
|
||||
github.com/alicebob/miniredis/v2 v2.30.0
|
||||
github.com/eoscanada/eos-go v0.10.3-0.20221117104514-64cafd714c60
|
||||
github.com/eosswedenorg-go/antelope-ship-client v0.2.2
|
||||
github.com/eosswedenorg-go/pid v1.0.1
|
||||
|
|
@ -16,6 +17,7 @@ require (
|
|||
)
|
||||
|
||||
require (
|
||||
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
|
||||
github.com/blendle/zapdriver v1.3.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
|
|
@ -37,6 +39,7 @@ require (
|
|||
github.com/vmihailenco/go-tinylfu v0.2.2 // indirect
|
||||
github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect
|
||||
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
|
||||
github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64 // indirect
|
||||
go.uber.org/atomic v1.10.0 // indirect
|
||||
go.uber.org/goleak v1.2.0 // indirect
|
||||
go.uber.org/multierr v1.8.0 // indirect
|
||||
|
|
|
|||
7
go.sum
7
go.sum
|
|
@ -5,6 +5,10 @@ cloud.google.com/go v0.43.0/go.mod h1:BOSR3VbTLkk6FDC/TcffxP4NF/FFBGA5ku+jvKOP7p
|
|||
contrib.go.opencensus.io/exporter/stackdriver v0.12.6/go.mod h1:8x999/OcIPy5ivx/wDiV7Gx4D+VUPODf0mWRGRc5kSk=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
|
||||
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
|
||||
github.com/alicebob/miniredis/v2 v2.30.0 h1:uA3uhDbCxfO9+DI/DuGeAMr9qI+noVWwGPNTFuKID5M=
|
||||
github.com/alicebob/miniredis/v2 v2.30.0/go.mod h1:84TWKZlxYkfgMucPBf5SOQBYJceZeQRFIaQgNMiCX6Q=
|
||||
github.com/aws/aws-sdk-go v1.22.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
|
||||
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
|
||||
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
|
||||
|
|
@ -171,6 +175,8 @@ github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAh
|
|||
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
|
||||
github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64 h1:5mLPGnFdSsevFRFc9q3yYbBkB6tsm4aCwwQV/j1JQAQ=
|
||||
github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
|
||||
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
|
||||
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
|
||||
go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA=
|
||||
|
|
@ -247,6 +253,7 @@ golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
|
|||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
|
|
|
|||
96
transport/redis_pubsub/subscriber.go
Normal file
96
transport/redis_pubsub/subscriber.go
Normal file
|
|
@ -0,0 +1,96 @@
|
|||
package redis_pubsub
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"eosio-ship-trace-reader/transport"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
)
|
||||
|
||||
type Subscriber struct {
|
||||
client *redis.Client
|
||||
sub *redis.PubSub
|
||||
ctx context.Context
|
||||
mu sync.RWMutex
|
||||
timeout time.Duration
|
||||
channels map[string]chan []byte
|
||||
ns Namespace
|
||||
}
|
||||
|
||||
type SubscriberOption func(*Subscriber)
|
||||
|
||||
func WithTimeout(value time.Duration) SubscriberOption {
|
||||
return func(s *Subscriber) {
|
||||
s.timeout = value
|
||||
}
|
||||
}
|
||||
|
||||
func NewSubscriber(client *redis.Client, ns Namespace, options ...SubscriberOption) *Subscriber {
|
||||
sub := &Subscriber{
|
||||
client: client,
|
||||
ctx: client.Context(),
|
||||
sub: client.PSubscribe(client.Context()),
|
||||
channels: make(map[string]chan []byte),
|
||||
timeout: time.Millisecond * 200,
|
||||
ns: ns,
|
||||
}
|
||||
|
||||
for _, opt := range options {
|
||||
opt(sub)
|
||||
}
|
||||
|
||||
go sub.worker()
|
||||
|
||||
return sub
|
||||
}
|
||||
|
||||
// forward forwards a message to the channel.
|
||||
// as writes to a unbuffered channel will block until it's read.
|
||||
// We run select on it and discard the message if no read happends during timeout
|
||||
func forward(msg redis.Message, ch chan<- []byte, timeout time.Duration) {
|
||||
select {
|
||||
case <-time.After(timeout):
|
||||
case ch <- []byte(msg.Payload):
|
||||
}
|
||||
}
|
||||
|
||||
// worker reads messages from redis pubsub and forwards them to
|
||||
// correct channels.
|
||||
func (s *Subscriber) worker() {
|
||||
for msg := range s.sub.Channel() {
|
||||
// Route message to correct channel.
|
||||
s.mu.RLock()
|
||||
if ch, ok := s.channels[msg.Channel]; ok {
|
||||
go forward(*msg, ch, s.timeout)
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Subscriber) Read(channel transport.Channel) ([]byte, error) {
|
||||
var err error
|
||||
|
||||
key := s.ns.NewKey(channel).String()
|
||||
s.mu.RLock()
|
||||
ch, ok := s.channels[key]
|
||||
s.mu.RUnlock()
|
||||
if !ok {
|
||||
// Channel does not exist in the map.
|
||||
// Subscribe and insert it.
|
||||
err = s.sub.Subscribe(s.ctx, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Guard race condition to map with mutex.
|
||||
s.mu.Lock()
|
||||
ch = make(chan []byte)
|
||||
s.channels[key] = ch
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
return <-ch, nil
|
||||
}
|
||||
57
transport/redis_pubsub/subscriber_test.go
Normal file
57
transport/redis_pubsub/subscriber_test.go
Normal file
|
|
@ -0,0 +1,57 @@
|
|||
package redis_pubsub
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"eosio-ship-trace-reader/transport"
|
||||
|
||||
"github.com/alicebob/miniredis/v2"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/go-redis/redismock/v8"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestSubscriber_Construct(t *testing.T) {
|
||||
client, _ := redismock.NewClientMock()
|
||||
ns := Namespace{Prefix: "prefix", ChainID: "8f2f6ec19400d372c9b3340b1438e9c805cf9e69be962fa81d055bc037ceed8d"}
|
||||
|
||||
s := NewSubscriber(client, ns)
|
||||
|
||||
assert.Equal(t, s.client, client)
|
||||
assert.Equal(t, s.ctx, client.Context())
|
||||
assert.NotNil(t, s.sub)
|
||||
assert.Equal(t, s.ns, ns)
|
||||
assert.Equal(t, s.timeout, 200*time.Millisecond)
|
||||
|
||||
s = NewSubscriber(client, ns, WithTimeout(4*time.Second))
|
||||
assert.Equal(t, s.timeout, 4*time.Second)
|
||||
}
|
||||
|
||||
func TestSubscriber_Read(t *testing.T) {
|
||||
expectedMessages := []string{"payload", "payload2", "payload3"}
|
||||
|
||||
server := miniredis.RunT(t)
|
||||
|
||||
client := redis.NewClient(&redis.Options{
|
||||
Addr: server.Addr(),
|
||||
})
|
||||
|
||||
s := NewSubscriber(client, Namespace{Prefix: "prefix", ChainID: "d41dbd2921d5a377325661427090c6c508904d60920d6b7ea771c58da5299754"})
|
||||
|
||||
go func() {
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
|
||||
for _, msg := range expectedMessages {
|
||||
server.Publish("prefix::d41dbd2921d5a377325661427090c6c508904d60920d6b7ea771c58da5299754::test", msg)
|
||||
}
|
||||
}()
|
||||
|
||||
// Redis pubsub does not guarentee that messages are sent in the correct order.
|
||||
for range expectedMessages {
|
||||
msg, err := s.Read(transport.Channel{"test"})
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Contains(t, expectedMessages, string(msg))
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue