mirror of
https://github.com/eosswedenorg/thalos
synced 2026-07-03 11:53:41 +02:00
transport/publisher.go: pass ChannelInterface instead of string to Publish()
This commit is contained in:
parent
5202247200
commit
69a9e9f47d
4 changed files with 11 additions and 8 deletions
|
|
@ -33,10 +33,9 @@ func SpawnProccessor(shClient *shipclient.ShipClient, ns transport.Namespace, pu
|
||||||
}
|
}
|
||||||
|
|
||||||
func (processor *ShipProcessor) queueMessage(channel transport.ChannelInterface, payload []byte) bool {
|
func (processor *ShipProcessor) queueMessage(channel transport.ChannelInterface, payload []byte) bool {
|
||||||
key := processor.ns.NewKey(channel)
|
err := processor.publisher.Publish(channel, payload)
|
||||||
err := processor.publisher.Publish(key.String(), payload)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Errorf("Failed to post to channel '%s'", key)
|
log.WithError(err).Errorf("Failed to post to channel '%s'", channel)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@ package transport
|
||||||
type Publisher interface {
|
type Publisher interface {
|
||||||
// Publish a message to a channel.
|
// Publish a message to a channel.
|
||||||
// The message may or may not be buffered depending on the implementation.
|
// The message may or may not be buffered depending on the implementation.
|
||||||
Publish(channel string, payload []byte) error
|
Publish(channel ChannelInterface, payload []byte) error
|
||||||
|
|
||||||
// Flush writes any buffered messages to the channel.
|
// Flush writes any buffered messages to the channel.
|
||||||
// If the implementation does not support buffering. this is a noop.
|
// If the implementation does not support buffering. this is a noop.
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,8 @@ package redis_pubsub
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"eosio-ship-trace-reader/transport"
|
||||||
|
|
||||||
redis "github.com/go-redis/redis/v8"
|
redis "github.com/go-redis/redis/v8"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -18,8 +20,8 @@ func New(client *redis.Client) *RedisPubsub {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RedisPubsub) Publish(channel string, payload []byte) error {
|
func (r *RedisPubsub) Publish(channel transport.ChannelInterface, payload []byte) error {
|
||||||
return r.pipeline.Publish(r.ctx, channel, payload).Err()
|
return r.pipeline.Publish(r.ctx, channel.String(), payload).Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RedisPubsub) Flush() error {
|
func (r *RedisPubsub) Flush() error {
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,8 @@ package redis_pubsub
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"eosio-ship-trace-reader/transport"
|
||||||
|
|
||||||
"github.com/go-redis/redismock/v8"
|
"github.com/go-redis/redismock/v8"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
@ -16,8 +18,8 @@ func TestRedisPubsub(t *testing.T) {
|
||||||
mock.ExpectPublish("test", []byte("some string")).SetVal(0)
|
mock.ExpectPublish("test", []byte("some string")).SetVal(0)
|
||||||
mock.ExpectPublish("test2", []byte("some other string")).SetVal(0)
|
mock.ExpectPublish("test2", []byte("some other string")).SetVal(0)
|
||||||
|
|
||||||
assert.NoError(t, pubsub.Publish("test", []byte("some string")))
|
assert.NoError(t, pubsub.Publish(transport.Channel{"test"}, []byte("some string")))
|
||||||
assert.NoError(t, pubsub.Publish("test2", []byte("some other string")))
|
assert.NoError(t, pubsub.Publish(transport.Channel{"test2"}, []byte("some other string")))
|
||||||
assert.NoError(t, pubsub.Flush())
|
assert.NoError(t, pubsub.Flush())
|
||||||
|
|
||||||
assert.NoError(t, mock.ExpectationsWereMet())
|
assert.NoError(t, mock.ExpectationsWereMet())
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue