diff --git a/transport/redis_stream/reader.go b/transport/redis_stream/reader.go new file mode 100644 index 0000000..b1b87a1 --- /dev/null +++ b/transport/redis_stream/reader.go @@ -0,0 +1,83 @@ +package redis_stream + +import ( + "context" + "sync" + "time" + + "eosio-ship-trace-reader/transport" + . "eosio-ship-trace-reader/transport/redis_common" + + "github.com/go-redis/redis/v8" +) + +type Reader struct { + client *redis.Client + ctx context.Context + mu sync.RWMutex + timeout time.Duration + channels map[string]chan []byte + ns Namespace +} + +type ReaderOption func(*Reader) + +func WithTimeout(value time.Duration) ReaderOption { + return func(s *Reader) { + s.timeout = value + } +} + +func NewReader(client *redis.Client, ns Namespace, options ...ReaderOption) *Reader { + sub := &Reader{ + client: client, + ctx: client.Context(), + channels: make(map[string]chan []byte), + timeout: time.Millisecond * 200, + ns: ns, + } + + for _, opt := range options { + opt(sub) + } + + go sub.worker() + + return sub +} + +// forward forwards a message to the channel. +// as writes to a unbuffered channel will block until it's read. +// We run select on it and discard the message if no read happends during timeout +func forward(msg redis.Message, ch chan<- []byte, timeout time.Duration) { + select { + case <-time.After(timeout): + case ch <- []byte(msg.Payload): + } +} + +// worker reads messages from redis pubsub and forwards them to +// correct channels. +func (s *Reader) worker() { + for name, chan := range s.channels { + // Route message to correct channel. + s.mu.RLock() + if ch, ok := s.channels[msg.Channel]; ok { + go forward(*msg, ch, s.timeout) + } + s.mu.RUnlock() + } +} + +func (r *Reader) Read(channel transport.Channel) ([]byte, error) { + var err error + + r.client.XRange(r.ctx, channel.String(), "~", "~") + + key := s.ns.NewKey(channel).String() + s.mu.RLock() + ch, ok := s.channels[key] + s.mu.RUnlock() + + return <-ch, nil +} diff --git a/transport/redis_stream/stream.go b/transport/redis_stream/stream.go new file mode 100644 index 0000000..526876e --- /dev/null +++ b/transport/redis_stream/stream.go @@ -0,0 +1,35 @@ +package redis_stream + +import ( + "context" + + "github.com/go-redis/redis/v8" +) + +type Stream struct { + Key string + id string +} + +func (s Stream) Read(client *redis.Client, ctx context.Context) error { + args := &redis.XReadArgs{ + Streams: []string{s.Key, s.id}, + } + + streams, err := client.XRead(ctx, args).Result() + if err != nil { + return err + } + + for _, strm := range streams { + l := len(strm.Messages) + if l > 0 { + id = strm.Messages[l-1].ID + + // Write id to redis + if err := rs.client.Set(rs.ctx, strm.Stream+":id", rs.id, 0).Err(); err != nil { + return err + } + } + } +} diff --git a/transport/redis_stream/writer.go b/transport/redis_stream/writer.go new file mode 100644 index 0000000..71ce51e --- /dev/null +++ b/transport/redis_stream/writer.go @@ -0,0 +1,61 @@ +package redis_stream + +import ( + "context" + + "eosio-ship-trace-reader/transport" + . "eosio-ship-trace-reader/transport/redis_common" + + _redis "github.com/go-redis/redis/v8" +) + +type Writer struct { + pipeline _redis.Pipeliner + ctx context.Context + ns Namespace + max_len int64 +} + +type Option func(*Writer) + +func WithMaxLen(value int64) Option { + return func(p *Writer) { + p.max_len = value + } +} + +func WithNamespace(value Namespace) Option { + return func(p *Writer) { + p.ns = value + } +} + +func NewWriter(client *_redis.Client, options ...Option) *Writer { + pub := &Writer{ + pipeline: client.Pipeline(), + ctx: client.Context(), + max_len: 2000, + } + + for _, opt := range options { + opt(pub) + } + + return pub +} + +func (r *Writer) Write(channel transport.Channel, payload []byte) error { + args := &_redis.XAddArgs{ + Stream: r.ns.NewKey(channel).String(), + ID: "*", + MaxLen: r.max_len, + Values: payload, + } + + return r.pipeline.XAdd(r.ctx, args).Err() +} + +func (r *Writer) Flush() error { + _, err := r.pipeline.Exec(r.ctx) + return err +} diff --git a/transport/redis_stream/writer_test.go b/transport/redis_stream/writer_test.go new file mode 100644 index 0000000..eb57a29 --- /dev/null +++ b/transport/redis_stream/writer_test.go @@ -0,0 +1,59 @@ +package redis_stream + +import ( + "errors" + "testing" + + "eosio-ship-trace-reader/transport" + . "eosio-ship-trace-reader/transport/redis_common" + + _redis "github.com/go-redis/redis/v8" + "github.com/go-redis/redismock/v8" + "github.com/stretchr/testify/assert" +) + +func TestWriter_Construct(t *testing.T) { + client, _ := redismock.NewClientMock() + + w := NewWriter(client) + assert.Equal(t, w.max_len, int64(2000)) + assert.Equal(t, w.ns, Namespace{}) + + w = NewWriter(client, WithNamespace(Namespace{ChainID: "4422"})) + assert.Equal(t, w.max_len, int64(2000)) + assert.Equal(t, w.ns.ChainID, "4422") + + w = NewWriter(client, WithNamespace(Namespace{ChainID: "id"}), WithMaxLen(4000)) + assert.Equal(t, w.max_len, int64(4000)) + assert.Equal(t, w.ns.ChainID, "id") +} + +func TestWriter_Write(t *testing.T) { + client, mock := redismock.NewClientMock() + + w := NewWriter(client, WithNamespace(Namespace{ChainID: "id"})) + + mock.MatchExpectationsInOrder(true) + mock.ExpectXAdd(&_redis.XAddArgs{Stream: "ship::id::test", MaxLen: 2000, Values: []byte("some string")}).SetVal("OK") + mock.ExpectXAdd(&_redis.XAddArgs{Stream: "ship::id::test2", MaxLen: 2000, Values: []byte("some other string")}).SetVal("OK") + + assert.NoError(t, w.Write(transport.Channel{"test"}, []byte("some string"))) + assert.NoError(t, w.Write(transport.Channel{"test2"}, []byte("some other string"))) + assert.NoError(t, w.Flush()) + + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestWriter_Write_Error(t *testing.T) { + client, mock := redismock.NewClientMock() + + w := NewWriter(client, WithNamespace(Namespace{ChainID: "1234"})) + + mock.MatchExpectationsInOrder(true) + mock.ExpectXAdd(&_redis.XAddArgs{Stream: "ship::1234::test", MaxLen: 2000, Values: []byte("message")}).SetErr(errors.New("ErrTestValue")) + + assert.NoError(t, w.Write(transport.Channel{"test"}, []byte("message"))) + assert.EqualError(t, w.Flush(), "ErrTestValue") + + assert.NoError(t, mock.ExpectationsWereMet()) +}