diff --git a/app/ship_processor.go b/app/ship_processor.go index 8dbadd6..456505c 100644 --- a/app/ship_processor.go +++ b/app/ship_processor.go @@ -33,10 +33,9 @@ func SpawnProccessor(shClient *shipclient.ShipClient, ns transport.Namespace, pu } func (processor *ShipProcessor) queueMessage(channel transport.ChannelInterface, payload []byte) bool { - key := processor.ns.NewKey(channel) - err := processor.publisher.Publish(key.String(), payload) + err := processor.publisher.Publish(channel, payload) if err != nil { - log.WithError(err).Errorf("Failed to post to channel '%s'", key) + log.WithError(err).Errorf("Failed to post to channel '%s'", channel) return false } return true diff --git a/transport/publisher.go b/transport/publisher.go index 420af8a..0a1359b 100644 --- a/transport/publisher.go +++ b/transport/publisher.go @@ -5,7 +5,7 @@ package transport type Publisher interface { // Publish a message to a channel. // The message may or may not be buffered depending on the implementation. - Publish(channel string, payload []byte) error + Publish(channel ChannelInterface, payload []byte) error // Flush writes any buffered messages to the channel. // If the implementation does not support buffering. this is a noop. diff --git a/transport/redis_pubsub/redis.go b/transport/redis_pubsub/redis.go index 3ff598f..ef587d6 100644 --- a/transport/redis_pubsub/redis.go +++ b/transport/redis_pubsub/redis.go @@ -3,6 +3,8 @@ package redis_pubsub import ( "context" + "eosio-ship-trace-reader/transport" + redis "github.com/go-redis/redis/v8" ) @@ -18,8 +20,8 @@ func New(client *redis.Client) *RedisPubsub { } } -func (r *RedisPubsub) Publish(channel string, payload []byte) error { - return r.pipeline.Publish(r.ctx, channel, payload).Err() +func (r *RedisPubsub) Publish(channel transport.ChannelInterface, payload []byte) error { + return r.pipeline.Publish(r.ctx, channel.String(), payload).Err() } func (r *RedisPubsub) Flush() error { diff --git a/transport/redis_pubsub/redis_test.go b/transport/redis_pubsub/redis_test.go index 6493d78..95bd8c9 100644 --- a/transport/redis_pubsub/redis_test.go +++ b/transport/redis_pubsub/redis_test.go @@ -3,6 +3,8 @@ package redis_pubsub import ( "testing" + "eosio-ship-trace-reader/transport" + "github.com/go-redis/redismock/v8" "github.com/stretchr/testify/assert" ) @@ -16,8 +18,8 @@ func TestRedisPubsub(t *testing.T) { mock.ExpectPublish("test", []byte("some string")).SetVal(0) mock.ExpectPublish("test2", []byte("some other string")).SetVal(0) - assert.NoError(t, pubsub.Publish("test", []byte("some string"))) - assert.NoError(t, pubsub.Publish("test2", []byte("some other string"))) + assert.NoError(t, pubsub.Publish(transport.Channel{"test"}, []byte("some string"))) + assert.NoError(t, pubsub.Publish(transport.Channel{"test2"}, []byte("some other string"))) assert.NoError(t, pubsub.Flush()) assert.NoError(t, mock.ExpectationsWereMet())