1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-06-18 04:40:03 +02:00

rename transport to api

This commit is contained in:
Henrik Hautakoski 2023-04-19 09:52:29 +02:00
parent 044ed4e891
commit 102045e47e
15 changed files with 40 additions and 40 deletions

View file

@ -0,0 +1,37 @@
package redis_pubsub
import (
"context"
"thalos/api"
. "thalos/api/redis_common"
"github.com/go-redis/redis/v8"
)
type Publisher struct {
pipeline redis.Pipeliner
ctx context.Context
ns Namespace
}
func NewPublisher(client *redis.Client, ns Namespace) *Publisher {
return &Publisher{
pipeline: client.Pipeline(),
ctx: client.Context(),
ns: ns,
}
}
func (r *Publisher) Write(channel api.Channel, payload []byte) error {
return r.pipeline.Publish(r.ctx, r.ns.NewKey(channel).String(), payload).Err()
}
func (r *Publisher) Flush() error {
_, err := r.pipeline.Exec(r.ctx)
return err
}
func (r *Publisher) Close() error {
return r.pipeline.Close()
}

View file

@ -0,0 +1,27 @@
package redis_pubsub
import (
"testing"
"thalos/api"
. "thalos/api/redis_common"
"github.com/go-redis/redismock/v8"
"github.com/stretchr/testify/assert"
)
func TestPublisher_Write(t *testing.T) {
client, mock := redismock.NewClientMock()
pub := NewPublisher(client, Namespace{ChainID: "id"})
mock.MatchExpectationsInOrder(true)
mock.ExpectPublish("ship::id::test", []byte("some string")).SetVal(0)
mock.ExpectPublish("ship::id::test2", []byte("some other string")).SetVal(0)
assert.NoError(t, pub.Write(api.Channel{"test"}, []byte("some string")))
assert.NoError(t, pub.Write(api.Channel{"test2"}, []byte("some other string")))
assert.NoError(t, pub.Flush())
assert.NoError(t, mock.ExpectationsWereMet())
}

View file

@ -0,0 +1,110 @@
package redis_pubsub
import (
"context"
"sync"
"time"
"thalos/api"
. "thalos/api/redis_common"
"github.com/go-redis/redis/v8"
)
type Subscriber struct {
client *redis.Client
sub *redis.PubSub
ctx context.Context
mu sync.RWMutex
timeout time.Duration
channels map[string]chan []byte
ns Namespace
}
type SubscriberOption func(*Subscriber)
func WithTimeout(value time.Duration) SubscriberOption {
return func(s *Subscriber) {
s.timeout = value
}
}
func NewSubscriber(client *redis.Client, ns Namespace, options ...SubscriberOption) *Subscriber {
sub := &Subscriber{
client: client,
ctx: client.Context(),
sub: client.PSubscribe(client.Context()),
channels: make(map[string]chan []byte),
timeout: time.Millisecond * 200,
ns: ns,
}
for _, opt := range options {
opt(sub)
}
go sub.worker()
return sub
}
// forward forwards a message to the channel.
// as writes to a unbuffered channel will block until it's read.
// We run select on it and discard the message if no read happends during timeout
func forward(msg redis.Message, ch chan<- []byte, timeout time.Duration) {
select {
case <-time.After(timeout):
case ch <- []byte(msg.Payload):
}
}
// worker reads messages from redis pubsub and forwards them to
// correct channels.
func (s *Subscriber) worker() {
for msg := range s.sub.Channel() {
// Route message to correct channel.
s.mu.RLock()
if ch, ok := s.channels[msg.Channel]; ok {
go forward(*msg, ch, s.timeout)
}
s.mu.RUnlock()
}
}
func (s *Subscriber) Read(channel api.Channel) ([]byte, error) {
var err error
key := s.ns.NewKey(channel).String()
s.mu.RLock()
ch, ok := s.channels[key]
s.mu.RUnlock()
if !ok {
// Channel does not exist in the map.
// Subscribe and insert it.
err = s.sub.Subscribe(s.ctx, key)
if err != nil {
return nil, err
}
// Guard race condition to map with mutex.
s.mu.Lock()
ch = make(chan []byte)
s.channels[key] = ch
s.mu.Unlock()
}
return <-ch, nil
}
func (s *Subscriber) Close() error {
err := s.sub.Close()
for _, ch := range s.channels {
close(ch)
}
s.mu.Lock()
s.channels = make(map[string]chan []byte)
s.mu.Unlock()
return err
}

View file

@ -0,0 +1,58 @@
package redis_pubsub
import (
"testing"
"time"
"thalos/api"
. "thalos/api/redis_common"
"github.com/alicebob/miniredis/v2"
"github.com/go-redis/redis/v8"
"github.com/go-redis/redismock/v8"
"github.com/stretchr/testify/assert"
)
func TestSubscriber_Construct(t *testing.T) {
client, _ := redismock.NewClientMock()
ns := Namespace{Prefix: "prefix", ChainID: "8f2f6ec19400d372c9b3340b1438e9c805cf9e69be962fa81d055bc037ceed8d"}
s := NewSubscriber(client, ns)
assert.Equal(t, s.client, client)
assert.Equal(t, s.ctx, client.Context())
assert.NotNil(t, s.sub)
assert.Equal(t, s.ns, ns)
assert.Equal(t, s.timeout, 200*time.Millisecond)
s = NewSubscriber(client, ns, WithTimeout(4*time.Second))
assert.Equal(t, s.timeout, 4*time.Second)
}
func TestSubscriber_Read(t *testing.T) {
expectedMessages := []string{"payload", "payload2", "payload3"}
server := miniredis.RunT(t)
client := redis.NewClient(&redis.Options{
Addr: server.Addr(),
})
s := NewSubscriber(client, Namespace{Prefix: "prefix", ChainID: "d41dbd2921d5a377325661427090c6c508904d60920d6b7ea771c58da5299754"})
go func() {
time.Sleep(time.Millisecond * 10)
for _, msg := range expectedMessages {
server.Publish("prefix::d41dbd2921d5a377325661427090c6c508904d60920d6b7ea771c58da5299754::test", msg)
}
}()
// Redis pubsub does not guarentee that messages are sent in the correct order.
for range expectedMessages {
msg, err := s.Read(api.Channel{"test"})
assert.NoError(t, err)
assert.Contains(t, expectedMessages, string(msg))
}
}