diff --git a/app/ship_processor.go b/app/ship_processor.go index e45470d..4c6b8cc 100644 --- a/app/ship_processor.go +++ b/app/ship_processor.go @@ -46,7 +46,7 @@ func SpawnProccessor(shClient *shipclient.Client, writer transport.Writer, abi * shClient.TraceHandler = processor.processTraces } -func (processor *ShipProcessor) queueMessage(channel transport.ChannelInterface, payload []byte) bool { +func (processor *ShipProcessor) queueMessage(channel transport.Channel, payload []byte) bool { err := processor.writer.Write(channel, payload) if err != nil { log.WithError(err).Errorf("Failed to post to channel '%s'", channel) @@ -55,7 +55,7 @@ func (processor *ShipProcessor) queueMessage(channel transport.ChannelInterface, return true } -func (processor *ShipProcessor) encodeQueue(channel transport.ChannelInterface, v interface{}) bool { +func (processor *ShipProcessor) encodeQueue(channel transport.Channel, v interface{}) bool { if payload, err := processor.encode(v); err == nil { return processor.queueMessage(channel, payload) } @@ -116,11 +116,11 @@ func (processor *ShipProcessor) processTraces(traces []*ship.TransactionTraceV0) continue } - channels := []transport.ChannelInterface{ - transport.ActionChannel{}, - transport.ActionChannel{Action: string(act.Action)}, - transport.ActionChannel{Contract: string(act.Contract)}, - transport.ActionChannel{Action: string(act.Action), Contract: string(act.Contract)}, + channels := []transport.Channel{ + transport.Action{}.Channel(), + transport.Action{Action: act.Action}.Channel(), + transport.Action{Contract: act.Contract}.Channel(), + transport.Action{Action: act.Action, Contract: act.Contract}.Channel(), } for _, channel := range channels { diff --git a/transport/channel.go b/transport/channel.go index 2a2a939..ca21e74 100644 --- a/transport/channel.go +++ b/transport/channel.go @@ -4,12 +4,7 @@ import ( "strings" ) -// Generic interface for all channel types. -type ChannelInterface interface { - String() string -} - -// Standard channel. Just a wrapper around string slice +// Channel is just a wrapper around string slice type Channel []string func (c *Channel) Append(name ...string) { @@ -41,23 +36,22 @@ var ( HeartbeatChannel = Channel{"heartbeat"} ) -// Action channel. - -type ActionChannel struct { +// Action Channel +type Action struct { Contract string Action string } -func (ac ActionChannel) String() string { +func (a Action) Channel() Channel { ch := Channel{"actions"} - if len(ac.Contract) > 0 { - ch.Append("contract", ac.Contract) + if len(a.Contract) > 0 { + ch.Append("contract", a.Contract) } - if len(ac.Action) > 0 { - ch.Append("action", ac.Action) + if len(a.Action) > 0 { + ch.Append("action", a.Action) } - return ch.String() + return ch } diff --git a/transport/channel_test.go b/transport/channel_test.go index 60cf609..676c3a6 100644 --- a/transport/channel_test.go +++ b/transport/channel_test.go @@ -44,27 +44,19 @@ func TestChannel_String(t *testing.T) { } func TestActionChannel_String(t *testing.T) { - type fields struct { - Contract string - Action string - } tests := []struct { - name string - fields fields - want string + name string + ch Channel + want string }{ - {"Empty", fields{}, "actions"}, - {"Contract", fields{Contract: "mycontract"}, "actions/contract/mycontract"}, - {"Action", fields{Action: "myaction"}, "actions/action/myaction"}, - {"ContractAction", fields{Contract: "mycontract", Action: "myaction"}, "actions/contract/mycontract/action/myaction"}, + {"Empty", Action{}.Channel(), "actions"}, + {"Contract", Action{Contract: "mycontract"}.Channel(), "actions/contract/mycontract"}, + {"Action", Action{Action: "myaction"}.Channel(), "actions/action/myaction"}, + {"ContractAndAction", Action{Contract: "mycontract", Action: "myaction"}.Channel(), "actions/contract/mycontract/action/myaction"}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ac := ActionChannel{ - Contract: tt.fields.Contract, - Action: tt.fields.Action, - } - if got := ac.String(); got != tt.want { + if got := tt.ch.String(); got != tt.want { t.Errorf("ActionChannel.String() = %v, want %v", got, tt.want) } }) diff --git a/transport/reader.go b/transport/reader.go index 70ffeb4..3b3bd56 100644 --- a/transport/reader.go +++ b/transport/reader.go @@ -7,5 +7,5 @@ package transport type Reader interface { // Read a message from a channel. // Read may block until a message is ready or an error occured. - Read(channel ChannelInterface) ([]byte, error) + Read(channel Channel) ([]byte, error) } diff --git a/transport/redis_pubsub/key.go b/transport/redis_pubsub/key.go index bd01cd4..36d940f 100644 --- a/transport/redis_pubsub/key.go +++ b/transport/redis_pubsub/key.go @@ -11,7 +11,7 @@ import ( type Key struct { NS Namespace - Channel transport.ChannelInterface + Channel transport.Channel } func (k Key) String() string { diff --git a/transport/redis_pubsub/key_test.go b/transport/redis_pubsub/key_test.go index 81b7001..757bcf5 100644 --- a/transport/redis_pubsub/key_test.go +++ b/transport/redis_pubsub/key_test.go @@ -9,7 +9,7 @@ import ( func TestKey_String(t *testing.T) { type fields struct { NS Namespace - Channel transport.ChannelInterface + Channel transport.Channel } tests := []struct { name string @@ -19,7 +19,7 @@ func TestKey_String(t *testing.T) { {"Empty", fields{NS: Namespace{}, Channel: transport.Channel{}}, "ship::0000000000000000000000000000000000000000000000000000000000000000::"}, {"Transactions", fields{NS: Namespace{ChainID: "id"}, Channel: transport.Channel{"transactions"}}, "ship::id::transactions"}, {"Nested", fields{NS: Namespace{ChainID: "id"}, Channel: transport.Channel{"one.two"}}, "ship::id::one.two"}, - {"Action", fields{NS: Namespace{ChainID: "id"}, Channel: transport.ActionChannel{Contract: "mycontract"}}, "ship::id::actions/contract/mycontract"}, + {"Action", fields{NS: Namespace{ChainID: "id"}, Channel: transport.Action{Contract: "mycontract"}.Channel()}, "ship::id::actions/contract/mycontract"}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/transport/redis_pubsub/namespace.go b/transport/redis_pubsub/namespace.go index 505e577..aee84a5 100644 --- a/transport/redis_pubsub/namespace.go +++ b/transport/redis_pubsub/namespace.go @@ -27,7 +27,7 @@ type Namespace struct { } // Create a new key with this namespace. -func (ns Namespace) NewKey(ch transport.ChannelInterface) Key { +func (ns Namespace) NewKey(ch transport.Channel) Key { return Key{NS: ns, Channel: ch} } diff --git a/transport/redis_pubsub/publisher.go b/transport/redis_pubsub/publisher.go index 82302ba..ce22f34 100644 --- a/transport/redis_pubsub/publisher.go +++ b/transport/redis_pubsub/publisher.go @@ -22,7 +22,7 @@ func NewPublisher(client *redis.Client, ns Namespace) *Publisher { } } -func (r *Publisher) Write(channel transport.ChannelInterface, payload []byte) error { +func (r *Publisher) Write(channel transport.Channel, payload []byte) error { return r.pipeline.Publish(r.ctx, r.ns.NewKey(channel).String(), payload).Err() } diff --git a/transport/writer.go b/transport/writer.go index 4295070..8b2e7b4 100644 --- a/transport/writer.go +++ b/transport/writer.go @@ -5,7 +5,7 @@ package transport type Writer interface { // Write writes a message over a channel. // The message may or may not be buffered depending on the implementation. - Write(channel ChannelInterface, payload []byte) error + Write(channel Channel, payload []byte) error // Flush writes any buffered messages to the channel. // If the implementation does not support buffering. this is a noop.