diff --git a/transport/channel.go b/api/channel.go similarity index 98% rename from transport/channel.go rename to api/channel.go index 353d0c1..9167120 100644 --- a/transport/channel.go +++ b/api/channel.go @@ -1,4 +1,4 @@ -package transport +package api import ( "strings" diff --git a/transport/channel_test.go b/api/channel_test.go similarity index 99% rename from transport/channel_test.go rename to api/channel_test.go index 4b606b9..5d8038c 100644 --- a/transport/channel_test.go +++ b/api/channel_test.go @@ -1,4 +1,4 @@ -package transport +package api import ( "reflect" diff --git a/transport/message/encoding.go b/api/message/encoding.go similarity index 100% rename from transport/message/encoding.go rename to api/message/encoding.go diff --git a/transport/message/types.go b/api/message/types.go similarity index 100% rename from transport/message/types.go rename to api/message/types.go diff --git a/transport/reader.go b/api/reader.go similarity index 96% rename from transport/reader.go rename to api/reader.go index 0dcab66..a3bfa86 100644 --- a/transport/reader.go +++ b/api/reader.go @@ -1,4 +1,4 @@ -package transport +package api // Reader interface defines the required method // to read a message from an channel. diff --git a/transport/redis_common/key.go b/api/redis_common/key.go similarity index 86% rename from transport/redis_common/key.go rename to api/redis_common/key.go index 3be1896..f3b20de 100644 --- a/transport/redis_common/key.go +++ b/api/redis_common/key.go @@ -3,7 +3,7 @@ package redis_common import ( "fmt" - "thalos/transport" + "thalos/api" ) // Key consists of a namespace and a channel. @@ -11,7 +11,7 @@ import ( type Key struct { NS Namespace - Channel transport.Channel + Channel api.Channel } func (k Key) String() string { diff --git a/transport/redis_common/key_test.go b/api/redis_common/key_test.go similarity index 50% rename from transport/redis_common/key_test.go rename to api/redis_common/key_test.go index 8529246..6c635ad 100644 --- a/transport/redis_common/key_test.go +++ b/api/redis_common/key_test.go @@ -3,23 +3,23 @@ package redis_common import ( "testing" - "thalos/transport" + "thalos/api" ) func TestKey_String(t *testing.T) { type fields struct { NS Namespace - Channel transport.Channel + Channel api.Channel } tests := []struct { name string fields fields want string }{ - {"Empty", fields{NS: Namespace{}, Channel: transport.Channel{}}, "ship::0000000000000000000000000000000000000000000000000000000000000000::"}, - {"Transactions", fields{NS: Namespace{ChainID: "id"}, Channel: transport.Channel{"transactions"}}, "ship::id::transactions"}, - {"Nested", fields{NS: Namespace{ChainID: "id"}, Channel: transport.Channel{"one.two"}}, "ship::id::one.two"}, - {"Action", fields{NS: Namespace{ChainID: "id"}, Channel: transport.Action{Contract: "mycontract"}.Channel()}, "ship::id::actions/contract/mycontract"}, + {"Empty", fields{NS: Namespace{}, Channel: api.Channel{}}, "ship::0000000000000000000000000000000000000000000000000000000000000000::"}, + {"Transactions", fields{NS: Namespace{ChainID: "id"}, Channel: api.Channel{"transactions"}}, "ship::id::transactions"}, + {"Nested", fields{NS: Namespace{ChainID: "id"}, Channel: api.Channel{"one.two"}}, "ship::id::one.two"}, + {"Action", fields{NS: Namespace{ChainID: "id"}, Channel: api.Action{Contract: "mycontract"}.Channel()}, "ship::id::actions/contract/mycontract"}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/transport/redis_common/namespace.go b/api/redis_common/namespace.go similarity index 93% rename from transport/redis_common/namespace.go rename to api/redis_common/namespace.go index 22d9803..1442d0e 100644 --- a/transport/redis_common/namespace.go +++ b/api/redis_common/namespace.go @@ -3,7 +3,7 @@ package redis_common import ( "strings" - "thalos/transport" + "thalos/api" ) const ( @@ -27,7 +27,7 @@ type Namespace struct { } // Create a new key with this namespace. -func (ns Namespace) NewKey(ch transport.Channel) Key { +func (ns Namespace) NewKey(ch api.Channel) Key { return Key{NS: ns, Channel: ch} } diff --git a/transport/redis_common/namespace_test.go b/api/redis_common/namespace_test.go similarity index 100% rename from transport/redis_common/namespace_test.go rename to api/redis_common/namespace_test.go diff --git a/transport/redis_pubsub/publisher.go b/api/redis_pubsub/publisher.go similarity index 81% rename from transport/redis_pubsub/publisher.go rename to api/redis_pubsub/publisher.go index b4e0864..3f31c99 100644 --- a/transport/redis_pubsub/publisher.go +++ b/api/redis_pubsub/publisher.go @@ -3,8 +3,8 @@ package redis_pubsub import ( "context" - "thalos/transport" - . "thalos/transport/redis_common" + "thalos/api" + . "thalos/api/redis_common" "github.com/go-redis/redis/v8" ) @@ -23,7 +23,7 @@ func NewPublisher(client *redis.Client, ns Namespace) *Publisher { } } -func (r *Publisher) Write(channel transport.Channel, payload []byte) error { +func (r *Publisher) Write(channel api.Channel, payload []byte) error { return r.pipeline.Publish(r.ctx, r.ns.NewKey(channel).String(), payload).Err() } diff --git a/transport/redis_pubsub/publisher_test.go b/api/redis_pubsub/publisher_test.go similarity index 70% rename from transport/redis_pubsub/publisher_test.go rename to api/redis_pubsub/publisher_test.go index d88f434..695ac5f 100644 --- a/transport/redis_pubsub/publisher_test.go +++ b/api/redis_pubsub/publisher_test.go @@ -3,8 +3,8 @@ package redis_pubsub import ( "testing" - "thalos/transport" - . "thalos/transport/redis_common" + "thalos/api" + . "thalos/api/redis_common" "github.com/go-redis/redismock/v8" "github.com/stretchr/testify/assert" @@ -19,8 +19,8 @@ func TestPublisher_Write(t *testing.T) { mock.ExpectPublish("ship::id::test", []byte("some string")).SetVal(0) mock.ExpectPublish("ship::id::test2", []byte("some other string")).SetVal(0) - assert.NoError(t, pub.Write(transport.Channel{"test"}, []byte("some string"))) - assert.NoError(t, pub.Write(transport.Channel{"test2"}, []byte("some other string"))) + assert.NoError(t, pub.Write(api.Channel{"test"}, []byte("some string"))) + assert.NoError(t, pub.Write(api.Channel{"test2"}, []byte("some other string"))) assert.NoError(t, pub.Flush()) assert.NoError(t, mock.ExpectationsWereMet()) diff --git a/transport/redis_pubsub/subscriber.go b/api/redis_pubsub/subscriber.go similarity index 94% rename from transport/redis_pubsub/subscriber.go rename to api/redis_pubsub/subscriber.go index 8fccb9b..48c6537 100644 --- a/transport/redis_pubsub/subscriber.go +++ b/api/redis_pubsub/subscriber.go @@ -5,8 +5,8 @@ import ( "sync" "time" - "thalos/transport" - . "thalos/transport/redis_common" + "thalos/api" + . "thalos/api/redis_common" "github.com/go-redis/redis/v8" ) @@ -71,7 +71,7 @@ func (s *Subscriber) worker() { } } -func (s *Subscriber) Read(channel transport.Channel) ([]byte, error) { +func (s *Subscriber) Read(channel api.Channel) ([]byte, error) { var err error key := s.ns.NewKey(channel).String() diff --git a/transport/redis_pubsub/subscriber_test.go b/api/redis_pubsub/subscriber_test.go similarity index 93% rename from transport/redis_pubsub/subscriber_test.go rename to api/redis_pubsub/subscriber_test.go index 4b6b72f..5e1f07d 100644 --- a/transport/redis_pubsub/subscriber_test.go +++ b/api/redis_pubsub/subscriber_test.go @@ -4,8 +4,8 @@ import ( "testing" "time" - "thalos/transport" - . "thalos/transport/redis_common" + "thalos/api" + . "thalos/api/redis_common" "github.com/alicebob/miniredis/v2" "github.com/go-redis/redis/v8" @@ -50,7 +50,7 @@ func TestSubscriber_Read(t *testing.T) { // Redis pubsub does not guarentee that messages are sent in the correct order. for range expectedMessages { - msg, err := s.Read(transport.Channel{"test"}) + msg, err := s.Read(api.Channel{"test"}) assert.NoError(t, err) assert.Contains(t, expectedMessages, string(msg)) diff --git a/transport/writer.go b/api/writer.go similarity index 96% rename from transport/writer.go rename to api/writer.go index 39baeef..953d7d7 100644 --- a/transport/writer.go +++ b/api/writer.go @@ -1,4 +1,4 @@ -package transport +package api // Writer interface defines the required methods // to send messages over an channel. diff --git a/app/ship_processor.go b/app/ship_processor.go index 6f0da87..04782bf 100644 --- a/app/ship_processor.go +++ b/app/ship_processor.go @@ -5,8 +5,8 @@ import ( "encoding/json" "thalos/abi" - "thalos/transport" - "thalos/transport/message" + "thalos/api" + "thalos/api/message" log "github.com/sirupsen/logrus" @@ -29,12 +29,12 @@ func logDecoratedEncoder(encoder message.Encoder) message.Encoder { type ShipProcessor struct { abi *abi.AbiManager - writer transport.Writer + writer api.Writer shipStream *shipclient.Stream encode message.Encoder } -func SpawnProccessor(shipStream *shipclient.Stream, writer transport.Writer, abi *abi.AbiManager) *ShipProcessor { +func SpawnProccessor(shipStream *shipclient.Stream, writer api.Writer, abi *abi.AbiManager) *ShipProcessor { processor := &ShipProcessor{ abi: abi, writer: writer, @@ -49,7 +49,7 @@ func SpawnProccessor(shipStream *shipclient.Stream, writer transport.Writer, abi return processor } -func (processor *ShipProcessor) queueMessage(channel transport.Channel, payload []byte) bool { +func (processor *ShipProcessor) queueMessage(channel api.Channel, payload []byte) bool { err := processor.writer.Write(channel, payload) if err != nil { log.WithError(err).Errorf("Failed to post to channel '%s'", channel) @@ -58,7 +58,7 @@ func (processor *ShipProcessor) queueMessage(channel transport.Channel, payload return true } -func (processor *ShipProcessor) encodeQueue(channel transport.Channel, v interface{}) bool { +func (processor *ShipProcessor) encodeQueue(channel api.Channel, v interface{}) bool { if payload, err := processor.encode(v); err == nil { return processor.queueMessage(channel, payload) } @@ -77,7 +77,7 @@ func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) { HeadBlockNum: block.Head.BlockNum, } - processor.encodeQueue(transport.HeartbeatChannel, hb) + processor.encodeQueue(api.HeartbeatChannel, hb) err := processor.writer.Flush() if err != nil { @@ -89,7 +89,7 @@ func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) { func (processor *ShipProcessor) processTraces(traces []*ship.TransactionTraceV0) { for _, trace := range traces { - processor.encodeQueue(transport.TransactionChannel, trace) + processor.encodeQueue(api.TransactionChannel, trace) // Actions for _, actionTraceVar := range trace.ActionTraces { @@ -139,11 +139,11 @@ func (processor *ShipProcessor) processTraces(traces []*ship.TransactionTraceV0) continue } - channels := []transport.Channel{ - transport.Action{}.Channel(), - transport.Action{Name: act.Name}.Channel(), - transport.Action{Contract: act.Contract}.Channel(), - transport.Action{Name: act.Name, Contract: act.Contract}.Channel(), + channels := []api.Channel{ + api.Action{}.Channel(), + api.Action{Name: act.Name}.Channel(), + api.Action{Contract: act.Contract}.Channel(), + api.Action{Name: act.Name, Contract: act.Contract}.Channel(), } for _, channel := range channels {