mirror of
https://github.com/eosswedenorg/thalos
synced 2026-06-16 04:24:56 +02:00
transport/channel.go: Rewrite to not use a interface. Action struct has a Channel() method that returns a channel instead.
This commit is contained in:
parent
8e99146cc2
commit
68c21c8ed8
9 changed files with 31 additions and 45 deletions
|
|
@ -46,7 +46,7 @@ func SpawnProccessor(shClient *shipclient.Client, writer transport.Writer, abi *
|
|||
shClient.TraceHandler = processor.processTraces
|
||||
}
|
||||
|
||||
func (processor *ShipProcessor) queueMessage(channel transport.ChannelInterface, payload []byte) bool {
|
||||
func (processor *ShipProcessor) queueMessage(channel transport.Channel, payload []byte) bool {
|
||||
err := processor.writer.Write(channel, payload)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("Failed to post to channel '%s'", channel)
|
||||
|
|
@ -55,7 +55,7 @@ func (processor *ShipProcessor) queueMessage(channel transport.ChannelInterface,
|
|||
return true
|
||||
}
|
||||
|
||||
func (processor *ShipProcessor) encodeQueue(channel transport.ChannelInterface, v interface{}) bool {
|
||||
func (processor *ShipProcessor) encodeQueue(channel transport.Channel, v interface{}) bool {
|
||||
if payload, err := processor.encode(v); err == nil {
|
||||
return processor.queueMessage(channel, payload)
|
||||
}
|
||||
|
|
@ -116,11 +116,11 @@ func (processor *ShipProcessor) processTraces(traces []*ship.TransactionTraceV0)
|
|||
continue
|
||||
}
|
||||
|
||||
channels := []transport.ChannelInterface{
|
||||
transport.ActionChannel{},
|
||||
transport.ActionChannel{Action: string(act.Action)},
|
||||
transport.ActionChannel{Contract: string(act.Contract)},
|
||||
transport.ActionChannel{Action: string(act.Action), Contract: string(act.Contract)},
|
||||
channels := []transport.Channel{
|
||||
transport.Action{}.Channel(),
|
||||
transport.Action{Action: act.Action}.Channel(),
|
||||
transport.Action{Contract: act.Contract}.Channel(),
|
||||
transport.Action{Action: act.Action, Contract: act.Contract}.Channel(),
|
||||
}
|
||||
|
||||
for _, channel := range channels {
|
||||
|
|
|
|||
|
|
@ -4,12 +4,7 @@ import (
|
|||
"strings"
|
||||
)
|
||||
|
||||
// Generic interface for all channel types.
|
||||
type ChannelInterface interface {
|
||||
String() string
|
||||
}
|
||||
|
||||
// Standard channel. Just a wrapper around string slice
|
||||
// Channel is just a wrapper around string slice
|
||||
type Channel []string
|
||||
|
||||
func (c *Channel) Append(name ...string) {
|
||||
|
|
@ -41,23 +36,22 @@ var (
|
|||
HeartbeatChannel = Channel{"heartbeat"}
|
||||
)
|
||||
|
||||
// Action channel.
|
||||
|
||||
type ActionChannel struct {
|
||||
// Action Channel
|
||||
type Action struct {
|
||||
Contract string
|
||||
Action string
|
||||
}
|
||||
|
||||
func (ac ActionChannel) String() string {
|
||||
func (a Action) Channel() Channel {
|
||||
ch := Channel{"actions"}
|
||||
|
||||
if len(ac.Contract) > 0 {
|
||||
ch.Append("contract", ac.Contract)
|
||||
if len(a.Contract) > 0 {
|
||||
ch.Append("contract", a.Contract)
|
||||
}
|
||||
|
||||
if len(ac.Action) > 0 {
|
||||
ch.Append("action", ac.Action)
|
||||
if len(a.Action) > 0 {
|
||||
ch.Append("action", a.Action)
|
||||
}
|
||||
|
||||
return ch.String()
|
||||
return ch
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,27 +44,19 @@ func TestChannel_String(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestActionChannel_String(t *testing.T) {
|
||||
type fields struct {
|
||||
Contract string
|
||||
Action string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
want string
|
||||
name string
|
||||
ch Channel
|
||||
want string
|
||||
}{
|
||||
{"Empty", fields{}, "actions"},
|
||||
{"Contract", fields{Contract: "mycontract"}, "actions/contract/mycontract"},
|
||||
{"Action", fields{Action: "myaction"}, "actions/action/myaction"},
|
||||
{"ContractAction", fields{Contract: "mycontract", Action: "myaction"}, "actions/contract/mycontract/action/myaction"},
|
||||
{"Empty", Action{}.Channel(), "actions"},
|
||||
{"Contract", Action{Contract: "mycontract"}.Channel(), "actions/contract/mycontract"},
|
||||
{"Action", Action{Action: "myaction"}.Channel(), "actions/action/myaction"},
|
||||
{"ContractAndAction", Action{Contract: "mycontract", Action: "myaction"}.Channel(), "actions/contract/mycontract/action/myaction"},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ac := ActionChannel{
|
||||
Contract: tt.fields.Contract,
|
||||
Action: tt.fields.Action,
|
||||
}
|
||||
if got := ac.String(); got != tt.want {
|
||||
if got := tt.ch.String(); got != tt.want {
|
||||
t.Errorf("ActionChannel.String() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
|
|
|
|||
|
|
@ -7,5 +7,5 @@ package transport
|
|||
type Reader interface {
|
||||
// Read a message from a channel.
|
||||
// Read may block until a message is ready or an error occured.
|
||||
Read(channel ChannelInterface) ([]byte, error)
|
||||
Read(channel Channel) ([]byte, error)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import (
|
|||
|
||||
type Key struct {
|
||||
NS Namespace
|
||||
Channel transport.ChannelInterface
|
||||
Channel transport.Channel
|
||||
}
|
||||
|
||||
func (k Key) String() string {
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ import (
|
|||
func TestKey_String(t *testing.T) {
|
||||
type fields struct {
|
||||
NS Namespace
|
||||
Channel transport.ChannelInterface
|
||||
Channel transport.Channel
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
|
|
@ -19,7 +19,7 @@ func TestKey_String(t *testing.T) {
|
|||
{"Empty", fields{NS: Namespace{}, Channel: transport.Channel{}}, "ship::0000000000000000000000000000000000000000000000000000000000000000::"},
|
||||
{"Transactions", fields{NS: Namespace{ChainID: "id"}, Channel: transport.Channel{"transactions"}}, "ship::id::transactions"},
|
||||
{"Nested", fields{NS: Namespace{ChainID: "id"}, Channel: transport.Channel{"one.two"}}, "ship::id::one.two"},
|
||||
{"Action", fields{NS: Namespace{ChainID: "id"}, Channel: transport.ActionChannel{Contract: "mycontract"}}, "ship::id::actions/contract/mycontract"},
|
||||
{"Action", fields{NS: Namespace{ChainID: "id"}, Channel: transport.Action{Contract: "mycontract"}.Channel()}, "ship::id::actions/contract/mycontract"},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ type Namespace struct {
|
|||
}
|
||||
|
||||
// Create a new key with this namespace.
|
||||
func (ns Namespace) NewKey(ch transport.ChannelInterface) Key {
|
||||
func (ns Namespace) NewKey(ch transport.Channel) Key {
|
||||
return Key{NS: ns, Channel: ch}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ func NewPublisher(client *redis.Client, ns Namespace) *Publisher {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *Publisher) Write(channel transport.ChannelInterface, payload []byte) error {
|
||||
func (r *Publisher) Write(channel transport.Channel, payload []byte) error {
|
||||
return r.pipeline.Publish(r.ctx, r.ns.NewKey(channel).String(), payload).Err()
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ package transport
|
|||
type Writer interface {
|
||||
// Write writes a message over a channel.
|
||||
// The message may or may not be buffered depending on the implementation.
|
||||
Write(channel ChannelInterface, payload []byte) error
|
||||
Write(channel Channel, payload []byte) error
|
||||
|
||||
// Flush writes any buffered messages to the channel.
|
||||
// If the implementation does not support buffering. this is a noop.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue