1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-06-16 04:24:56 +02:00
This commit is contained in:
Henrik Hautakoski 2023-03-13 20:56:19 +01:00
parent 531a2d3d17
commit 2f76057f2e
4 changed files with 238 additions and 0 deletions

View file

@ -0,0 +1,83 @@
package redis_stream
import (
"context"
"sync"
"time"
"eosio-ship-trace-reader/transport"
. "eosio-ship-trace-reader/transport/redis_common"
"github.com/go-redis/redis/v8"
)
type Reader struct {
client *redis.Client
ctx context.Context
mu sync.RWMutex
timeout time.Duration
channels map[string]chan []byte
ns Namespace
}
type ReaderOption func(*Reader)
func WithTimeout(value time.Duration) ReaderOption {
return func(s *Reader) {
s.timeout = value
}
}
func NewReader(client *redis.Client, ns Namespace, options ...ReaderOption) *Reader {
sub := &Reader{
client: client,
ctx: client.Context(),
channels: make(map[string]chan []byte),
timeout: time.Millisecond * 200,
ns: ns,
}
for _, opt := range options {
opt(sub)
}
go sub.worker()
return sub
}
// forward forwards a message to the channel.
// as writes to a unbuffered channel will block until it's read.
// We run select on it and discard the message if no read happends during timeout
func forward(msg redis.Message, ch chan<- []byte, timeout time.Duration) {
select {
case <-time.After(timeout):
case ch <- []byte(msg.Payload):
}
}
// worker reads messages from redis pubsub and forwards them to
// correct channels.
func (s *Reader) worker() {
for name, chan := range s.channels {
// Route message to correct channel.
s.mu.RLock()
if ch, ok := s.channels[msg.Channel]; ok {
go forward(*msg, ch, s.timeout)
}
s.mu.RUnlock()
}
}
func (r *Reader) Read(channel transport.Channel) ([]byte, error) {
var err error
r.client.XRange(r.ctx, channel.String(), "~", "~")
key := s.ns.NewKey(channel).String()
s.mu.RLock()
ch, ok := s.channels[key]
s.mu.RUnlock()
return <-ch, nil
}

View file

@ -0,0 +1,35 @@
package redis_stream
import (
"context"
"github.com/go-redis/redis/v8"
)
type Stream struct {
Key string
id string
}
func (s Stream) Read(client *redis.Client, ctx context.Context) error {
args := &redis.XReadArgs{
Streams: []string{s.Key, s.id},
}
streams, err := client.XRead(ctx, args).Result()
if err != nil {
return err
}
for _, strm := range streams {
l := len(strm.Messages)
if l > 0 {
id = strm.Messages[l-1].ID
// Write id to redis
if err := rs.client.Set(rs.ctx, strm.Stream+":id", rs.id, 0).Err(); err != nil {
return err
}
}
}
}

View file

@ -0,0 +1,61 @@
package redis_stream
import (
"context"
"eosio-ship-trace-reader/transport"
. "eosio-ship-trace-reader/transport/redis_common"
_redis "github.com/go-redis/redis/v8"
)
type Writer struct {
pipeline _redis.Pipeliner
ctx context.Context
ns Namespace
max_len int64
}
type Option func(*Writer)
func WithMaxLen(value int64) Option {
return func(p *Writer) {
p.max_len = value
}
}
func WithNamespace(value Namespace) Option {
return func(p *Writer) {
p.ns = value
}
}
func NewWriter(client *_redis.Client, options ...Option) *Writer {
pub := &Writer{
pipeline: client.Pipeline(),
ctx: client.Context(),
max_len: 2000,
}
for _, opt := range options {
opt(pub)
}
return pub
}
func (r *Writer) Write(channel transport.Channel, payload []byte) error {
args := &_redis.XAddArgs{
Stream: r.ns.NewKey(channel).String(),
ID: "*",
MaxLen: r.max_len,
Values: payload,
}
return r.pipeline.XAdd(r.ctx, args).Err()
}
func (r *Writer) Flush() error {
_, err := r.pipeline.Exec(r.ctx)
return err
}

View file

@ -0,0 +1,59 @@
package redis_stream
import (
"errors"
"testing"
"eosio-ship-trace-reader/transport"
. "eosio-ship-trace-reader/transport/redis_common"
_redis "github.com/go-redis/redis/v8"
"github.com/go-redis/redismock/v8"
"github.com/stretchr/testify/assert"
)
func TestWriter_Construct(t *testing.T) {
client, _ := redismock.NewClientMock()
w := NewWriter(client)
assert.Equal(t, w.max_len, int64(2000))
assert.Equal(t, w.ns, Namespace{})
w = NewWriter(client, WithNamespace(Namespace{ChainID: "4422"}))
assert.Equal(t, w.max_len, int64(2000))
assert.Equal(t, w.ns.ChainID, "4422")
w = NewWriter(client, WithNamespace(Namespace{ChainID: "id"}), WithMaxLen(4000))
assert.Equal(t, w.max_len, int64(4000))
assert.Equal(t, w.ns.ChainID, "id")
}
func TestWriter_Write(t *testing.T) {
client, mock := redismock.NewClientMock()
w := NewWriter(client, WithNamespace(Namespace{ChainID: "id"}))
mock.MatchExpectationsInOrder(true)
mock.ExpectXAdd(&_redis.XAddArgs{Stream: "ship::id::test", MaxLen: 2000, Values: []byte("some string")}).SetVal("OK")
mock.ExpectXAdd(&_redis.XAddArgs{Stream: "ship::id::test2", MaxLen: 2000, Values: []byte("some other string")}).SetVal("OK")
assert.NoError(t, w.Write(transport.Channel{"test"}, []byte("some string")))
assert.NoError(t, w.Write(transport.Channel{"test2"}, []byte("some other string")))
assert.NoError(t, w.Flush())
assert.NoError(t, mock.ExpectationsWereMet())
}
func TestWriter_Write_Error(t *testing.T) {
client, mock := redismock.NewClientMock()
w := NewWriter(client, WithNamespace(Namespace{ChainID: "1234"}))
mock.MatchExpectationsInOrder(true)
mock.ExpectXAdd(&_redis.XAddArgs{Stream: "ship::1234::test", MaxLen: 2000, Values: []byte("message")}).SetErr(errors.New("ErrTestValue"))
assert.NoError(t, w.Write(transport.Channel{"test"}, []byte("message")))
assert.EqualError(t, w.Flush(), "ErrTestValue")
assert.NoError(t, mock.ExpectationsWereMet())
}