1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-06-16 04:24:56 +02:00
This commit is contained in:
Henrik Hautakoski 2022-11-28 15:22:07 +01:00
parent e2c1922269
commit c8b4c208e6
7 changed files with 137 additions and 11 deletions

View file

@ -24,6 +24,7 @@ type Config struct {
Name string `json:"name"`
ShipApi string `json:"ship_api"`
Api string `json:"api"`
Transport string `json:"transport"`
Redis RedisConfig `json:"redis"`
@ -38,6 +39,7 @@ type Config struct {
func Load(filename string) (Config, error) {
cfg := Config{
Transport: "redis-channel",
StartBlockNum: NULL_BLOCK_NUMBER,
EndBlockNum: NULL_BLOCK_NUMBER,
MaxMessagesInFlight: 10,

View file

@ -13,6 +13,7 @@ import (
"eosio-ship-trace-reader/config"
"eosio-ship-trace-reader/redis"
"eosio-ship-trace-reader/telegram"
"eosio-ship-trace-reader/transport"
eos "github.com/eoscanada/eos-go"
shipclient "github.com/eosswedenorg-go/eos-ship-client"
)
@ -30,6 +31,7 @@ var shClient *shipclient.ShipClient
var eosClient *eos.API
var eosClientCtx = context.Background()
var transporter transport.Driver
// Reader states
const RS_CONNECT = 1
@ -196,6 +198,13 @@ func main() {
return
}
// Create message queue
transporter, err = transport.Make(conf.Transport, chainInfo.ChainID.String())
if err != nil {
log.Println("Failed to create queue:", err)
return
}
redis.Prefix += chainInfo.ChainID.String() + "."
if conf.StartBlockNum == config.NULL_BLOCK_NUMBER {

View file

@ -5,7 +5,6 @@ import (
"log"
"encoding/json"
"github.com/eoscanada/eos-go/ship"
"eosio-ship-trace-reader/redis"
)
var block_num uint32
@ -25,9 +24,8 @@ func processTraces(traces []*ship.TransactionTraceV0) {
payload, err := json.Marshal(trace)
if err == nil {
channel := redis.Key("transactions")
if err := redis.Publish(channel, payload).Err(); err != nil {
log.Printf("Failed to post to channel '%s': %s", channel, err)
if err := transporter.Send("transactions", block_num, payload); err != nil {
log.Println(err)
}
} else {
log.Println("Failed to encode transaction:", err)
@ -62,21 +60,21 @@ func processTraces(traces []*ship.TransactionTraceV0) {
}
channels := []string{
redis.Key("actions"),
redis.Key(string(act.Contract), "actions"),
redis.Key(string(act.Contract), "actions", string(act.Action)),
"actions",
string(act.Contract) + ".actions",
string(act.Contract) + ".actions." + string(act.Action),
}
for _, channel := range channels {
if err := redis.RegisterPublish(channel, payload).Err(); err != nil {
log.Printf("Failed to post to channel '%s': %s", channel, err)
if err := transporter.Send(channel, block_num, payload); err != nil {
log.Println(err)
}
}
}
}
_, err := redis.Send()
err := transporter.Commit()
if err != nil {
log.Println("Failed to send redis. command:", err)
log.Println("Failed to flush queue", err)
}
}

9
transport/driver.go Normal file
View file

@ -0,0 +1,9 @@
package transport
type Driver interface
{
Send(namespace string, id uint32, message interface{}) error
Commit() error
}

16
transport/factory.go Normal file
View file

@ -0,0 +1,16 @@
package transport
import "fmt"
func Make(driver string, name string) (Driver, error) {
switch driver {
case "redis-pubsub":
return NewRedisPubSub(name), nil
case "redis-stream":
return NewRedisStream(name, 1000), nil
default:
return nil, fmt.Errorf("Invalid type: %s", driver)
}
}

35
transport/redis_pubsub.go Normal file
View file

@ -0,0 +1,35 @@
package transport
import (
"fmt"
"strings"
"eosio-ship-trace-reader/redis"
)
type RedisPubSub struct {
name string
}
func NewRedisPubSub(name string) (RedisPubSub) {
return RedisPubSub{
name: name,
}
}
func (this RedisPubSub) Send(namespace string, id uint32, message interface{}) error {
channel := strings.Join([]string{"ship.channel", this.name, namespace}, ".")
if err := redis.RegisterPublish(channel, message).Err(); err != nil {
return fmt.Errorf("Failed to post to channel '%s': %s", channel, err)
}
return nil
}
func (this RedisPubSub) Commit() error {
_, err := redis.Send()
if err != nil {
return fmt.Errorf("Failed to send redis. command: %s", err)
}
return nil
}

57
transport/redis_stream.go Normal file
View file

@ -0,0 +1,57 @@
package transport
import (
"fmt"
"strings"
"eosio-ship-trace-reader/redis"
)
type RedisStream struct {
name string
// Length of the stream, if items are added when the stream is full, old items will be evicted
// until the stream's length is equal to this value.
length int64
// map of namespaces and their indexes.
// each namespace is it's own stream.
indexes map[string]uint32
}
func NewRedisStream(name string, length int64) (RedisStream) {
return RedisStream{
name: name,
length: length,
indexes: make(map[string]uint32),
}
}
func (this RedisStream) Send(namespace string, id uint32, message interface{}) error {
stream := strings.Join([]string{"ship.stream", this.name, namespace}, ".")
index := this.nextIndex(namespace)
data := map[string]interface{}{
"block": id,
"data": message,
}
if err := redis.XAdd(stream, fmt.Sprintf("%d-%d", id, index), this.length, data).Err(); err != nil {
return fmt.Errorf("Failed to add to redis stream '%s': %s", stream, err)
}
return nil
}
func (this RedisStream) Commit() error {
// reset indexes on flush.
this.indexes = make(map[string]uint32)
return nil
}
func (this RedisStream) nextIndex(namespace string) uint32 {
idx := this.indexes[namespace]
this.indexes[namespace] = idx + 1
return idx
}