diff --git a/app/ship_processor.go b/app/ship_processor.go index e9e38b0..e45470d 100644 --- a/app/ship_processor.go +++ b/app/ship_processor.go @@ -27,18 +27,18 @@ func logDecoratedEncoder(encoder message.Encoder) message.Encoder { } type ShipProcessor struct { - abi *abi.AbiManager - publisher transport.Publisher - shClient *shipclient.Client - encode message.Encoder + abi *abi.AbiManager + writer transport.Writer + shClient *shipclient.Client + encode message.Encoder } -func SpawnProccessor(shClient *shipclient.Client, publisher transport.Publisher, abi *abi.AbiManager) { +func SpawnProccessor(shClient *shipclient.Client, writer transport.Writer, abi *abi.AbiManager) { processor := &ShipProcessor{ - abi: abi, - publisher: publisher, - shClient: shClient, - encode: logDecoratedEncoder(json.Marshal), + abi: abi, + writer: writer, + shClient: shClient, + encode: logDecoratedEncoder(json.Marshal), } // Attach handlers @@ -47,7 +47,7 @@ func SpawnProccessor(shClient *shipclient.Client, publisher transport.Publisher, } func (processor *ShipProcessor) queueMessage(channel transport.ChannelInterface, payload []byte) bool { - err := processor.publisher.Publish(channel, payload) + err := processor.writer.Write(channel, payload) if err != nil { log.WithError(err).Errorf("Failed to post to channel '%s'", channel) return false @@ -76,7 +76,7 @@ func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) { processor.encodeQueue(transport.HeartbeatChannel, hb) - err := processor.publisher.Flush() + err := processor.writer.Flush() if err != nil { log.WithError(err).Error("Failed to send messages") } @@ -129,7 +129,7 @@ func (processor *ShipProcessor) processTraces(traces []*ship.TransactionTraceV0) } } - err := processor.publisher.Flush() + err := processor.writer.Flush() if err != nil { log.WithError(err).Error("Failed to send messages") } diff --git a/transport/redis_pubsub/publisher.go b/transport/redis_pubsub/publisher.go index a651dd1..82302ba 100644 --- a/transport/redis_pubsub/publisher.go +++ b/transport/redis_pubsub/publisher.go @@ -22,7 +22,7 @@ func NewPublisher(client *redis.Client, ns Namespace) *Publisher { } } -func (r *Publisher) Publish(channel transport.ChannelInterface, payload []byte) error { +func (r *Publisher) Write(channel transport.ChannelInterface, payload []byte) error { return r.pipeline.Publish(r.ctx, r.ns.NewKey(channel).String(), payload).Err() } diff --git a/transport/redis_pubsub/publisher_test.go b/transport/redis_pubsub/publisher_test.go index 685f81e..ff9c3ef 100644 --- a/transport/redis_pubsub/publisher_test.go +++ b/transport/redis_pubsub/publisher_test.go @@ -18,8 +18,8 @@ func TestPublisher_Publish(t *testing.T) { mock.ExpectPublish("ship::id::test", []byte("some string")).SetVal(0) mock.ExpectPublish("ship::id::test2", []byte("some other string")).SetVal(0) - assert.NoError(t, pub.Publish(transport.Channel{"test"}, []byte("some string"))) - assert.NoError(t, pub.Publish(transport.Channel{"test2"}, []byte("some other string"))) + assert.NoError(t, pub.Write(transport.Channel{"test"}, []byte("some string"))) + assert.NoError(t, pub.Write(transport.Channel{"test2"}, []byte("some other string"))) assert.NoError(t, pub.Flush()) assert.NoError(t, mock.ExpectationsWereMet()) diff --git a/transport/publisher.go b/transport/writer.go similarity index 53% rename from transport/publisher.go rename to transport/writer.go index 0a1359b..4295070 100644 --- a/transport/publisher.go +++ b/transport/writer.go @@ -1,11 +1,11 @@ package transport -// Publisher interface defines the required methods -// to send messages over channel. -type Publisher interface { - // Publish a message to a channel. +// Writer interface defines the required methods +// to send messages over an channel. +type Writer interface { + // Write writes a message over a channel. // The message may or may not be buffered depending on the implementation. - Publish(channel ChannelInterface, payload []byte) error + Write(channel ChannelInterface, payload []byte) error // Flush writes any buffered messages to the channel. // If the implementation does not support buffering. this is a noop.