1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-07-04 12:03:41 +02:00

transport/publisher.go: rename to writer.go

This commit is contained in:
Henrik Hautakoski 2023-03-07 11:41:22 +01:00
parent b3f773655d
commit f9fc88b0fb
4 changed files with 20 additions and 20 deletions

View file

@ -28,15 +28,15 @@ func logDecoratedEncoder(encoder message.Encoder) message.Encoder {
type ShipProcessor struct { type ShipProcessor struct {
abi *abi.AbiManager abi *abi.AbiManager
publisher transport.Publisher writer transport.Writer
shClient *shipclient.Client shClient *shipclient.Client
encode message.Encoder encode message.Encoder
} }
func SpawnProccessor(shClient *shipclient.Client, publisher transport.Publisher, abi *abi.AbiManager) { func SpawnProccessor(shClient *shipclient.Client, writer transport.Writer, abi *abi.AbiManager) {
processor := &ShipProcessor{ processor := &ShipProcessor{
abi: abi, abi: abi,
publisher: publisher, writer: writer,
shClient: shClient, shClient: shClient,
encode: logDecoratedEncoder(json.Marshal), encode: logDecoratedEncoder(json.Marshal),
} }
@ -47,7 +47,7 @@ func SpawnProccessor(shClient *shipclient.Client, publisher transport.Publisher,
} }
func (processor *ShipProcessor) queueMessage(channel transport.ChannelInterface, payload []byte) bool { func (processor *ShipProcessor) queueMessage(channel transport.ChannelInterface, payload []byte) bool {
err := processor.publisher.Publish(channel, payload) err := processor.writer.Write(channel, payload)
if err != nil { if err != nil {
log.WithError(err).Errorf("Failed to post to channel '%s'", channel) log.WithError(err).Errorf("Failed to post to channel '%s'", channel)
return false return false
@ -76,7 +76,7 @@ func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) {
processor.encodeQueue(transport.HeartbeatChannel, hb) processor.encodeQueue(transport.HeartbeatChannel, hb)
err := processor.publisher.Flush() err := processor.writer.Flush()
if err != nil { if err != nil {
log.WithError(err).Error("Failed to send messages") log.WithError(err).Error("Failed to send messages")
} }
@ -129,7 +129,7 @@ func (processor *ShipProcessor) processTraces(traces []*ship.TransactionTraceV0)
} }
} }
err := processor.publisher.Flush() err := processor.writer.Flush()
if err != nil { if err != nil {
log.WithError(err).Error("Failed to send messages") log.WithError(err).Error("Failed to send messages")
} }

View file

@ -22,7 +22,7 @@ func NewPublisher(client *redis.Client, ns Namespace) *Publisher {
} }
} }
func (r *Publisher) Publish(channel transport.ChannelInterface, payload []byte) error { func (r *Publisher) Write(channel transport.ChannelInterface, payload []byte) error {
return r.pipeline.Publish(r.ctx, r.ns.NewKey(channel).String(), payload).Err() return r.pipeline.Publish(r.ctx, r.ns.NewKey(channel).String(), payload).Err()
} }

View file

@ -18,8 +18,8 @@ func TestPublisher_Publish(t *testing.T) {
mock.ExpectPublish("ship::id::test", []byte("some string")).SetVal(0) mock.ExpectPublish("ship::id::test", []byte("some string")).SetVal(0)
mock.ExpectPublish("ship::id::test2", []byte("some other string")).SetVal(0) mock.ExpectPublish("ship::id::test2", []byte("some other string")).SetVal(0)
assert.NoError(t, pub.Publish(transport.Channel{"test"}, []byte("some string"))) assert.NoError(t, pub.Write(transport.Channel{"test"}, []byte("some string")))
assert.NoError(t, pub.Publish(transport.Channel{"test2"}, []byte("some other string"))) assert.NoError(t, pub.Write(transport.Channel{"test2"}, []byte("some other string")))
assert.NoError(t, pub.Flush()) assert.NoError(t, pub.Flush())
assert.NoError(t, mock.ExpectationsWereMet()) assert.NoError(t, mock.ExpectationsWereMet())

View file

@ -1,11 +1,11 @@
package transport package transport
// Publisher interface defines the required methods // Writer interface defines the required methods
// to send messages over channel. // to send messages over an channel.
type Publisher interface { type Writer interface {
// Publish a message to a channel. // Write writes a message over a channel.
// The message may or may not be buffered depending on the implementation. // The message may or may not be buffered depending on the implementation.
Publish(channel ChannelInterface, payload []byte) error Write(channel ChannelInterface, payload []byte) error
// Flush writes any buffered messages to the channel. // Flush writes any buffered messages to the channel.
// If the implementation does not support buffering. this is a noop. // If the implementation does not support buffering. this is a noop.