1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-06-17 04:30:03 +02:00

move channel stuffs from internal/redis to transport.

This commit is contained in:
Henrik Hautakoski 2023-01-13 13:38:04 +01:00
parent d0782c3c7c
commit b22118898d
8 changed files with 18 additions and 18 deletions

View file

@ -44,7 +44,7 @@ var abi_mgr *abi.AbiManager
var publisher transport.Publisher
var redisNs redis.Namespace
var redisNs transport.Namespace
// Reader states
const (
@ -240,7 +240,7 @@ func main() {
// Init Abi cache
abi_mgr = abi.NewAbiManager(eosClient, conf.Redis.CacheID)
redisNs = redis.Namespace{
redisNs = transport.Namespace{
Prefix: conf.Redis.Prefix,
ChainID: chainInfo.ChainID.String(),
}

View file

@ -6,7 +6,7 @@ import (
log "github.com/sirupsen/logrus"
"eosio-ship-trace-reader/internal/redis"
"eosio-ship-trace-reader/transport"
"github.com/eoscanada/eos-go"
"github.com/eoscanada/eos-go/ship"
)
@ -34,7 +34,7 @@ func encodeMessage(v interface{}) ([]byte, bool) {
return payload, true
}
func queueMessage(channel redis.ChannelInterface, payload []byte) bool {
func queueMessage(channel transport.ChannelInterface, payload []byte) bool {
key := redisNs.NewKey(channel)
err := publisher.Publish(key.String(), payload)
if err != nil {
@ -44,7 +44,7 @@ func queueMessage(channel redis.ChannelInterface, payload []byte) bool {
return true
}
func encodeQueue(channel redis.ChannelInterface, v interface{}) bool {
func encodeQueue(channel transport.ChannelInterface, v interface{}) bool {
if payload, ok := encodeMessage(v); ok {
if queueMessage(channel, payload) {
return true
@ -65,7 +65,7 @@ func processBlock(block *ship.GetBlocksResultV0) {
HeadBlockNum: block.Head.BlockNum,
}
encodeQueue(redis.HeartbeatChannel, hb)
encodeQueue(transport.HeartbeatChannel, hb)
err := publisher.Flush()
if err != nil {
@ -77,7 +77,7 @@ func processBlock(block *ship.GetBlocksResultV0) {
func processTraces(traces []*ship.TransactionTraceV0) {
for _, trace := range traces {
encodeQueue(redis.TransactionChannel, trace)
encodeQueue(transport.TransactionChannel, trace)
// Actions
for _, actionTraceVar := range trace.ActionTraces {
@ -107,11 +107,11 @@ func processTraces(traces []*ship.TransactionTraceV0) {
continue
}
channels := []redis.ChannelInterface{
redis.ActionChannel{},
redis.ActionChannel{Action: string(act.Action)},
redis.ActionChannel{Contract: string(act.Contract)},
redis.ActionChannel{Action: string(act.Action), Contract: string(act.Contract)},
channels := []transport.ChannelInterface{
transport.ActionChannel{},
transport.ActionChannel{Action: string(act.Action)},
transport.ActionChannel{Contract: string(act.Contract)},
transport.ActionChannel{Action: string(act.Action), Contract: string(act.Contract)},
}
for _, channel := range channels {

View file

@ -1,4 +1,4 @@
package redis
package transport
import (
"strings"

View file

@ -1,4 +1,4 @@
package redis
package transport
import (
"reflect"

View file

@ -1,4 +1,4 @@
package redis
package transport
import (
"fmt"

View file

@ -1,4 +1,4 @@
package redis
package transport
import "testing"

View file

@ -1,4 +1,4 @@
package redis
package transport
import (
"strings"

View file

@ -1,4 +1,4 @@
package redis
package transport
import "testing"