1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-06-20 09:56:47 +02:00

Formatting fix.

This commit is contained in:
Henrik Hautakoski 2022-11-28 15:25:21 +01:00
parent 953113b456
commit 1e2dda54c8
9 changed files with 440 additions and 448 deletions

75
abi.go
View file

@ -1,57 +1,56 @@
package main package main
import ( import (
"time" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"encoding/json" "time"
eos "github.com/eoscanada/eos-go"
redis_cache "github.com/go-redis/cache/v8" "eosio-ship-trace-reader/abi_cache"
"eosio-ship-trace-reader/abi_cache" "eosio-ship-trace-reader/redis"
"eosio-ship-trace-reader/redis"
eos "github.com/eoscanada/eos-go"
redis_cache "github.com/go-redis/cache/v8"
) )
var abiCache *abi_cache.Cache var abiCache *abi_cache.Cache
func InitAbiCache(id string) { func InitAbiCache(id string) {
// Init abi cache // Init abi cache
abiCache = abi_cache.New("ship.cache." + id + ".abi", &redis_cache.Options{ abiCache = abi_cache.New("ship.cache."+id+".abi", &redis_cache.Options{
Redis: redis.Client(), Redis: redis.Client(),
// Cache 10k keys for 10 minutes. // Cache 10k keys for 10 minutes.
LocalCache: redis_cache.NewTinyLFU(10000, 10 * time.Minute), LocalCache: redis_cache.NewTinyLFU(10000, 10*time.Minute),
}) })
} }
func GetAbi(account eos.AccountName) (*eos.ABI, error) { func GetAbi(account eos.AccountName) (*eos.ABI, error) {
key := string(account)
key := string(account) abi, err := abiCache.Get(key)
if err != nil {
resp, err := eosClient.GetABI(eosClientCtx, account)
if err != nil {
return nil, errors.New(fmt.Sprintf("api: %s", err))
}
abi = &resp.ABI
abi, err := abiCache.Get(key) err = abiCache.Set(key, abi, time.Hour)
if err != nil { if err != nil {
resp, err := eosClient.GetABI(eosClientCtx, account) return nil, errors.New(fmt.Sprintf("cache: %s", err))
if err != nil { }
return nil, errors.New(fmt.Sprintf("api: %s", err)) }
} return abi, nil
abi = &resp.ABI
err = abiCache.Set(key, abi, time.Hour)
if err != nil {
return nil, errors.New(fmt.Sprintf("cache: %s", err))
}
}
return abi, nil
} }
func DecodeAction(abi *eos.ABI, data []byte, actionName eos.ActionName) (interface{}, error) { func DecodeAction(abi *eos.ABI, data []byte, actionName eos.ActionName) (interface{}, error) {
var v interface{}
var v interface{} bytes, err := abi.DecodeAction(data, actionName)
if err != nil {
return v, err
}
bytes, err := abi.DecodeAction(data, actionName) err = json.Unmarshal(bytes, &v)
if err != nil { return v, err
return v, err
}
err = json.Unmarshal(bytes, &v)
return v, err
} }

View file

@ -1,42 +1,42 @@
package abi_cache package abi_cache
import ( import (
"time" "context"
"context" "time"
redis_cache "github.com/go-redis/cache/v8"
eos "github.com/eoscanada/eos-go" eos "github.com/eoscanada/eos-go"
redis_cache "github.com/go-redis/cache/v8"
) )
type Cache struct { type Cache struct {
c *redis_cache.Cache c *redis_cache.Cache
ctx context.Context ctx context.Context
prefix string prefix string
} }
func New(prefix string, options *redis_cache.Options) (*Cache) { func New(prefix string, options *redis_cache.Options) *Cache {
return &Cache{ return &Cache{
c: redis_cache.New(options), c: redis_cache.New(options),
ctx: context.Background(), ctx: context.Background(),
prefix: prefix, prefix: prefix,
} }
} }
func (this *Cache) Get(account string) (*eos.ABI, error) { func (this *Cache) Get(account string) (*eos.ABI, error) {
var v eos.ABI var v eos.ABI
err := this.c.Get(this.ctx, this.key(account), &v); err := this.c.Get(this.ctx, this.key(account), &v)
return &v, err return &v, err
} }
func (this *Cache) Set(account string, abi *eos.ABI, ttl time.Duration) error { func (this *Cache) Set(account string, abi *eos.ABI, ttl time.Duration) error {
return this.c.Set(&redis_cache.Item{ return this.c.Set(&redis_cache.Item{
Ctx: this.ctx, Ctx: this.ctx,
Key: this.key(account), Key: this.key(account),
Value: *abi, Value: *abi,
TTL: ttl, TTL: ttl,
}) })
} }
func (this *Cache) key(account string) (string) { func (this *Cache) key(account string) string {
return this.prefix + "." + account return this.prefix + "." + account
} }

View file

@ -1,16 +1,16 @@
package abi_cache package abi_cache
import ( import (
"time" "strings"
"strings" "testing"
"github.com/go-redis/redis/v8" "time"
redis_cache "github.com/go-redis/cache/v8"
eos "github.com/eoscanada/eos-go"
"testing" eos "github.com/eoscanada/eos-go"
"github.com/stretchr/testify/assert" redis_cache "github.com/go-redis/cache/v8"
"github.com/stretchr/testify/require" "github.com/go-redis/redis/v8"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
var abiString = ` var abiString = `
@ -74,86 +74,84 @@ var abiString = `
` `
func TestGetSet(t *testing.T) { func TestGetSet(t *testing.T) {
c := New("abi.cache.test", &redis_cache.Options{
Redis: redis.NewClient(&redis.Options{}),
// Cache 10k keys for 1 minute.
LocalCache: redis_cache.NewTinyLFU(10000, time.Minute),
})
c := New("abi.cache.test", &redis_cache.Options{ abi, err := eos.NewABI(strings.NewReader(abiString))
Redis: redis.NewClient(&redis.Options{}), if err != nil {
// Cache 10k keys for 1 minute. t.Error("Failed to build ABI", err)
LocalCache: redis_cache.NewTinyLFU(10000, time.Minute), }
})
abi, err := eos.NewABI(strings.NewReader(abiString)) err = c.Set("testaccount", abi, time.Minute)
if err != nil { if err != nil {
t.Error("Failed to build ABI", err) t.Error("Failed to set cache item", err)
} }
err = c.Set("testaccount", abi, time.Minute) c_abi, err := c.Get("testaccount")
if err != nil { if err != nil {
t.Error("Failed to set cache item", err) t.Error("Failed to get cache item", err)
} }
c_abi, err := c.Get("testaccount") assert.Equal(t, c_abi.Version, "eosio::abi/1.0")
if err != nil {
t.Error("Failed to get cache item", err)
}
assert.Equal(t, c_abi.Version, "eosio::abi/1.0") // Types
assert.Equal(t, c_abi.Types[0].NewTypeName, "new_type_name_1")
assert.Equal(t, c_abi.Types[0].Type, "name")
// Types // Structs
assert.Equal(t, c_abi.Types[0].NewTypeName, "new_type_name_1") assert.Equal(t, c_abi.Structs[0].Name, "struct_name_1")
assert.Equal(t, c_abi.Types[0].Type, "name") assert.Equal(t, c_abi.Structs[0].Base, "struct_name_2")
assert.Equal(t, c_abi.Structs[0].Fields[0].Name, "struct_1_field_1")
assert.Equal(t, c_abi.Structs[0].Fields[0].Type, "new_type_name_1")
assert.Equal(t, c_abi.Structs[0].Fields[1].Name, "struct_1_field_2")
assert.Equal(t, c_abi.Structs[0].Fields[1].Type, "struct_name_3")
assert.Equal(t, c_abi.Structs[0].Fields[2].Name, "struct_1_field_3")
assert.Equal(t, c_abi.Structs[0].Fields[2].Type, "string?")
assert.Equal(t, c_abi.Structs[0].Fields[3].Name, "struct_1_field_4")
assert.Equal(t, c_abi.Structs[0].Fields[3].Type, "string?")
assert.Equal(t, c_abi.Structs[0].Fields[4].Name, "struct_1_field_5")
assert.Equal(t, c_abi.Structs[0].Fields[4].Type, "struct_name_4[]")
// Structs assert.Equal(t, c_abi.Structs[1].Name, "struct_name_2")
assert.Equal(t, c_abi.Structs[0].Name, "struct_name_1") assert.Equal(t, c_abi.Structs[1].Base, "")
assert.Equal(t, c_abi.Structs[0].Base, "struct_name_2") assert.Equal(t, c_abi.Structs[1].Fields[0].Name, "struct_2_field_1")
assert.Equal(t, c_abi.Structs[0].Fields[0].Name, "struct_1_field_1") assert.Equal(t, c_abi.Structs[1].Fields[0].Type, "string")
assert.Equal(t, c_abi.Structs[0].Fields[0].Type, "new_type_name_1")
assert.Equal(t, c_abi.Structs[0].Fields[1].Name, "struct_1_field_2")
assert.Equal(t, c_abi.Structs[0].Fields[1].Type, "struct_name_3")
assert.Equal(t, c_abi.Structs[0].Fields[2].Name, "struct_1_field_3")
assert.Equal(t, c_abi.Structs[0].Fields[2].Type, "string?")
assert.Equal(t, c_abi.Structs[0].Fields[3].Name, "struct_1_field_4")
assert.Equal(t, c_abi.Structs[0].Fields[3].Type, "string?")
assert.Equal(t, c_abi.Structs[0].Fields[4].Name, "struct_1_field_5")
assert.Equal(t, c_abi.Structs[0].Fields[4].Type, "struct_name_4[]")
assert.Equal(t, c_abi.Structs[1].Name, "struct_name_2") assert.Equal(t, c_abi.Structs[2].Name, "struct_name_3")
assert.Equal(t, c_abi.Structs[1].Base, "") assert.Equal(t, c_abi.Structs[2].Base, "")
assert.Equal(t, c_abi.Structs[1].Fields[0].Name, "struct_2_field_1") assert.Equal(t, c_abi.Structs[2].Fields[0].Name, "struct_3_field_1")
assert.Equal(t, c_abi.Structs[1].Fields[0].Type, "string") assert.Equal(t, c_abi.Structs[2].Fields[0].Type, "string")
assert.Equal(t, c_abi.Structs[2].Name, "struct_name_3") assert.Equal(t, c_abi.Structs[3].Name, "struct_name_4")
assert.Equal(t, c_abi.Structs[2].Base, "") assert.Equal(t, c_abi.Structs[3].Base, "")
assert.Equal(t, c_abi.Structs[2].Fields[0].Name, "struct_3_field_1") assert.Equal(t, c_abi.Structs[3].Fields[0].Name, "struct_4_field_1")
assert.Equal(t, c_abi.Structs[2].Fields[0].Type, "string") assert.Equal(t, c_abi.Structs[3].Fields[0].Type, "string")
assert.Equal(t, c_abi.Structs[3].Name, "struct_name_4") // Actions
assert.Equal(t, c_abi.Structs[3].Base, "") assert.Equal(t, c_abi.Actions[0].Name, eos.ActN("action_name_1"))
assert.Equal(t, c_abi.Structs[3].Fields[0].Name, "struct_4_field_1") assert.Equal(t, c_abi.Actions[0].Type, "struct_name_1")
assert.Equal(t, c_abi.Structs[3].Fields[0].Type, "string") assert.Equal(t, c_abi.Actions[0].RicardianContract, "")
// Actions // Tables
assert.Equal(t, c_abi.Actions[0].Name, eos.ActN("action_name_1")) assert.Equal(t, c_abi.Tables[0].Name, eos.TableName("table_name_1"))
assert.Equal(t, c_abi.Actions[0].Type, "struct_name_1") assert.Equal(t, c_abi.Tables[0].Type, "struct_name_1")
assert.Equal(t, c_abi.Actions[0].RicardianContract, "") assert.Equal(t, c_abi.Tables[0].IndexType, "i64")
assert.Equal(t, c_abi.Tables[0].KeyNames[0], "key_name_1")
// Tables assert.Equal(t, c_abi.Tables[0].KeyNames[1], "key_name_2")
assert.Equal(t, c_abi.Tables[0].Name, eos.TableName("table_name_1")) assert.Equal(t, c_abi.Tables[0].KeyTypes[0], "string")
assert.Equal(t, c_abi.Tables[0].Type, "struct_name_1") assert.Equal(t, c_abi.Tables[0].KeyTypes[1], "int")
assert.Equal(t, c_abi.Tables[0].IndexType, "i64")
assert.Equal(t, c_abi.Tables[0].KeyNames[0], "key_name_1")
assert.Equal(t, c_abi.Tables[0].KeyNames[1], "key_name_2")
assert.Equal(t, c_abi.Tables[0].KeyTypes[0], "string")
assert.Equal(t, c_abi.Tables[0].KeyTypes[1], "int")
} }
func TestCacheMiss(t *testing.T) { func TestCacheMiss(t *testing.T) {
c := New("abi.cache.test", &redis_cache.Options{
Redis: redis.NewClient(&redis.Options{}),
// Cache 10k keys for 1 minute.
LocalCache: redis_cache.NewTinyLFU(10000, time.Minute),
})
c := New("abi.cache.test", &redis_cache.Options{ _, err := c.Get("nonexist")
Redis: redis.NewClient(&redis.Options{}), require.Error(t, err)
// Cache 10k keys for 1 minute.
LocalCache: redis_cache.NewTinyLFU(10000, time.Minute),
})
_, err := c.Get("nonexist")
require.Error(t, err)
} }

View file

@ -1,63 +1,61 @@
package config package config
import ( import (
"io/ioutil" "encoding/json"
"encoding/json" "io/ioutil"
) )
const NULL_BLOCK_NUMBER uint32 = 0xffffffff const NULL_BLOCK_NUMBER uint32 = 0xffffffff
type RedisConfig struct { type RedisConfig struct {
Addr string `json:"addr"` Addr string `json:"addr"`
Password string `json:"password"` Password string `json:"password"`
DB int `json:db` DB int `json:db`
CacheID string `json:"cache_id"` CacheID string `json:"cache_id"`
} }
type TelegramConfig struct { type TelegramConfig struct {
Id string `json:"id"` Id string `json:"id"`
Channel int64 `json:"channel"` Channel int64 `json:"channel"`
} }
type Config struct { type Config struct {
Name string `json:"name"` Name string `json:"name"`
ShipApi string `json:"ship_api"` ShipApi string `json:"ship_api"`
Api string `json:"api"` Api string `json:"api"`
Redis RedisConfig `json:"redis"` Redis RedisConfig `json:"redis"`
Telegram TelegramConfig `json:"telegram"` Telegram TelegramConfig `json:"telegram"`
IrreversibleOnly bool `json:"irreversible_only"` IrreversibleOnly bool `json:"irreversible_only"`
MaxMessagesInFlight uint32 `json:"max_messages_in_flight"` MaxMessagesInFlight uint32 `json:"max_messages_in_flight"`
StartBlockNum uint32 `json:"start_block_num"` StartBlockNum uint32 `json:"start_block_num"`
EndBlockNum uint32 `json:"end_block_num"` EndBlockNum uint32 `json:"end_block_num"`
} }
func Load(filename string) (Config, error) { func Load(filename string) (Config, error) {
cfg := Config{
StartBlockNum: NULL_BLOCK_NUMBER,
EndBlockNum: NULL_BLOCK_NUMBER,
MaxMessagesInFlight: 10,
IrreversibleOnly: false,
Redis: RedisConfig{
Addr: "localhost:6379",
Password: "",
DB: 0,
},
}
cfg := Config{ bytes, err := ioutil.ReadFile(filename)
StartBlockNum: NULL_BLOCK_NUMBER, if err != nil {
EndBlockNum: NULL_BLOCK_NUMBER, return cfg, err
MaxMessagesInFlight: 10, }
IrreversibleOnly: false,
Redis: RedisConfig{
Addr: "localhost:6379",
Password: "",
DB: 0,
},
}
bytes, err := ioutil.ReadFile(filename) err = json.Unmarshal(bytes, &cfg)
if err != nil { if err != nil {
return cfg, err return cfg, err
} }
err = json.Unmarshal(bytes, &cfg) return cfg, nil
if err != nil {
return cfg, err
}
return cfg, nil
} }

322
main.go
View file

@ -1,20 +1,21 @@
package main package main
import ( import (
"fmt" "context"
"os" "fmt"
"os/signal" "log"
"context" "os"
"log" "os/signal"
"time" "time"
"github.com/pborman/getopt/v2"
"github.com/eosswedenorg-go/pid" "eosio-ship-trace-reader/config"
"eosio-ship-trace-reader/config" "eosio-ship-trace-reader/redis"
"eosio-ship-trace-reader/redis" "eosio-ship-trace-reader/telegram"
"eosio-ship-trace-reader/telegram"
eos "github.com/eoscanada/eos-go" eos "github.com/eoscanada/eos-go"
shipclient "github.com/eosswedenorg-go/eos-ship-client" shipclient "github.com/eosswedenorg-go/eos-ship-client"
"github.com/eosswedenorg-go/pid"
"github.com/pborman/getopt/v2"
) )
// --------------------------- // ---------------------------
@ -27,191 +28,190 @@ var chainInfo *eos.InfoResp
var shClient *shipclient.ShipClient var shClient *shipclient.ShipClient
var eosClient *eos.API var (
var eosClientCtx = context.Background() eosClient *eos.API
eosClientCtx = context.Background()
)
// Reader states // Reader states
const RS_CONNECT = 1 const RS_CONNECT = 1
const RS_READ = 2 const RS_READ = 2
func readerLoop() { func readerLoop() {
state := RS_CONNECT
var recon_cnt uint = 0
state := RS_CONNECT for {
var recon_cnt uint = 0 switch state {
case RS_CONNECT:
recon_cnt++
log.Printf("Connecting to ship at: %s (Try %d)", conf.ShipApi, recon_cnt)
err := shClient.Connect(conf.ShipApi)
if err != nil {
log.Println(err)
for { if recon_cnt >= 3 {
switch state { msg := fmt.Sprintf("Failed to connect to ship at '%s'", conf.ShipApi)
case RS_CONNECT : if err = telegram.Send(msg); err != nil {
recon_cnt++ log.Println(err)
log.Printf("Connecting to ship at: %s (Try %d)", conf.ShipApi, recon_cnt) }
err := shClient.Connect(conf.ShipApi) recon_cnt = 0
if err != nil { }
log.Println(err)
if recon_cnt >= 3 { log.Printf("Trying again in 5 seconds ....")
msg := fmt.Sprintf("Failed to connect to ship at '%s'", conf.ShipApi) time.Sleep(5 * time.Second)
if err = telegram.Send(msg); err != nil { break
log.Println(err) }
}
recon_cnt = 0
}
log.Printf("Trying again in 5 seconds ....") err = shClient.SendBlocksRequest()
time.Sleep(5 * time.Second) if err != nil {
break; log.Println(err)
} break
}
err = shClient.SendBlocksRequest() // Connected
if err != nil { log.Printf("Connected, Start: %d, End: %d", shClient.StartBlock, shClient.EndBlock)
log.Println(err) state = RS_READ
break recon_cnt = 0
} case RS_READ:
err := shClient.Read()
if err != nil {
log.Print(err.Error())
// Connected // Reconnect
log.Printf("Connected, Start: %d, End: %d", shClient.StartBlock, shClient.EndBlock) if err.Type == shipclient.ErrSockRead {
state = RS_READ state = RS_CONNECT
recon_cnt = 0 }
case RS_READ : }
err := shClient.Read() }
if err != nil { }
log.Print(err.Error())
// Reconnect shClient.Close()
if err.Type == shipclient.ErrSockRead {
state = RS_CONNECT
}
}
}
}
shClient.Close()
} }
func run() { func run() {
// Create done and interrupt channels.
done := make(chan bool)
interrupt := make(chan os.Signal, 1)
// Create done and interrupt channels. // Register interrupt channel to receive interrupt messages
done := make(chan bool) signal.Notify(interrupt, os.Interrupt)
interrupt := make(chan os.Signal, 1)
// Register interrupt channel to receive interrupt messages // Spawn message read loop in another thread.
signal.Notify(interrupt, os.Interrupt) go func() {
readerLoop()
// Spawn message read loop in another thread. // Reader exited. signal that we are done.
go func() { done <- true
readerLoop() }()
// Reader exited. signal that we are done. // Enter event loop in main thread
done <- true for {
}() select {
case <-interrupt:
log.Println("Interrupt, closing")
// Enter event loop in main thread if shClient.IsOpen() == false {
for { log.Println("ship client not connected, exiting...")
select { return
case <-interrupt: }
log.Println("Interrupt, closing")
if shClient.IsOpen() == false { // Cleanly close the connection by sending a close message and then
log.Println("ship client not connected, exiting...") // waiting (with timeout) for the server to close the connection.
return shClient.SendCloseMessage()
}
// Cleanly close the connection by sending a close message and then select {
// waiting (with timeout) for the server to close the connection. case <-done:
shClient.SendCloseMessage() log.Println("Closed")
case <-time.After(time.Second * 10):
select { log.Println("Timeout")
case <-done: log.Println("Closed") }
case <-time.After(time.Second * 10): log.Println("Timeout"); return
} case <-done:
return log.Println("Closed")
case <-done: return
log.Println("Closed") }
return }
}
}
} }
func main() { func main() {
var err error
var err error showHelp := getopt.BoolLong("help", 'h', "display this help text")
showVersion := getopt.BoolLong("version", 'v', "display this help text")
configFile := getopt.StringLong("config", 'c', "./config.json", "Config file to read", "file")
pidFile := getopt.StringLong("pid", 'p', "", "Where to write process id", "file")
showHelp := getopt.BoolLong("help", 'h', "display this help text") getopt.Parse()
showVersion := getopt.BoolLong("version", 'v', "display this help text")
configFile := getopt.StringLong("config", 'c', "./config.json", "Config file to read", "file")
pidFile := getopt.StringLong("pid", 'p', "", "Where to write process id", "file")
getopt.Parse() if *showHelp {
getopt.Usage()
return
}
if *showHelp { if *showVersion {
getopt.Usage() fmt.Println("v0.0.0")
return return
} }
if *showVersion { // Write PID file
fmt.Println("v0.0.0") if len(*pidFile) > 0 {
return log.Printf("Writing pid to: %s", *pidFile)
} err = pid.Save(*pidFile)
if err != nil {
log.Println(err)
return
}
}
// Write PID file // Parse config
if len(*pidFile) > 0 { conf, err = config.Load(*configFile)
log.Printf("Writing pid to: %s", *pidFile) if err != nil {
err = pid.Save(*pidFile) log.Println(err)
if err != nil { return
log.Println(err) }
return
}
}
// Parse config // Init telegram
conf, err = config.Load(*configFile) err = telegram.Init(conf.Name, conf.Telegram.Id, conf.Telegram.Channel)
if err != nil { if err != nil {
log.Println(err) log.Println("Failed to initialize telegram", err)
return return
} }
// Init telegram // Connect to redis
err = telegram.Init(conf.Name, conf.Telegram.Id, conf.Telegram.Channel) err = redis.Connect(conf.Redis.Addr, conf.Redis.Password, conf.Redis.DB)
if err != nil { if err != nil {
log.Println("Failed to initialize telegram", err) log.Println("Failed to connect to redis:", err)
return return
} }
// Connect to redis // Init Abi cache
err = redis.Connect(conf.Redis.Addr, conf.Redis.Password, conf.Redis.DB) InitAbiCache(conf.Redis.CacheID)
if err != nil {
log.Println("Failed to connect to redis:", err)
return
}
// Init Abi cache // Connect client and get chain info.
InitAbiCache(conf.Redis.CacheID) log.Printf("Get chain info from api at: %s", conf.Api)
eosClient = eos.New(conf.Api)
chainInfo, err = eosClient.GetInfo(eosClientCtx)
if err != nil {
log.Println("Failed to get info:", err)
return
}
// Connect client and get chain info. redis.Prefix += chainInfo.ChainID.String() + "."
log.Printf("Get chain info from api at: %s", conf.Api)
eosClient = eos.New(conf.Api)
chainInfo, err = eosClient.GetInfo(eosClientCtx)
if err != nil {
log.Println("Failed to get info:", err)
return
}
redis.Prefix += chainInfo.ChainID.String() + "." if conf.StartBlockNum == config.NULL_BLOCK_NUMBER {
if conf.IrreversibleOnly {
conf.StartBlockNum = uint32(chainInfo.LastIrreversibleBlockNum)
} else {
conf.StartBlockNum = uint32(chainInfo.HeadBlockNum)
}
}
if conf.StartBlockNum == config.NULL_BLOCK_NUMBER { // Construct ship client
shClient = shipclient.NewClient(conf.StartBlockNum, conf.EndBlockNum, conf.IrreversibleOnly)
shClient.BlockHandler = processBlock
shClient.TraceHandler = processTraces
if conf.IrreversibleOnly { // Run the application
conf.StartBlockNum = uint32(chainInfo.LastIrreversibleBlockNum) run()
} else {
conf.StartBlockNum = uint32(chainInfo.HeadBlockNum)
}
}
// Construct ship client
shClient = shipclient.NewClient(conf.StartBlockNum, conf.EndBlockNum, conf.IrreversibleOnly)
shClient.BlockHandler = processBlock
shClient.TraceHandler = processTraces
// Run the application
run()
} }

View file

@ -1,11 +1,11 @@
package redis package redis
import ( import (
"strings" "context"
"context" "strings"
"time" "time"
_redis "github.com/go-redis/redis/v8"
_redis "github.com/go-redis/redis/v8"
) )
var rdb *_redis.Client var rdb *_redis.Client
@ -17,41 +17,41 @@ var redisCtx = context.Background()
var Prefix = "ship." var Prefix = "ship."
func Connect(addr string, password string, db int) error { func Connect(addr string, password string, db int) error {
rdb = _redis.NewClient(&_redis.Options{ rdb = _redis.NewClient(&_redis.Options{
Addr: addr, Addr: addr,
Password: password, Password: password,
DB: db, DB: db,
}) })
redis_pipe = rdb.Pipeline() redis_pipe = rdb.Pipeline()
return rdb.Ping(redisCtx).Err() return rdb.Ping(redisCtx).Err()
} }
func Client() *_redis.Client { func Client() *_redis.Client {
return rdb return rdb
} }
func Key(components ...string) (string) { func Key(components ...string) string {
return Prefix + strings.Join(components, ".") return Prefix + strings.Join(components, ".")
} }
func Get(key string) (*_redis.StringCmd) { func Get(key string) *_redis.StringCmd {
return rdb.Get(redisCtx, key) return rdb.Get(redisCtx, key)
} }
func Set(key string, value interface{}, expiration time.Duration) (*_redis.StatusCmd) { func Set(key string, value interface{}, expiration time.Duration) *_redis.StatusCmd {
return rdb.Set(redisCtx, key, value, expiration) return rdb.Set(redisCtx, key, value, expiration)
} }
func Publish(channel string, message interface{}) (*_redis.IntCmd) { func Publish(channel string, message interface{}) *_redis.IntCmd {
return rdb.Publish(redisCtx, channel, message) return rdb.Publish(redisCtx, channel, message)
} }
func RegisterPublish(channel string, message interface{}) (*_redis.IntCmd) { func RegisterPublish(channel string, message interface{}) *_redis.IntCmd {
return redis_pipe.Publish(redisCtx, channel, message) return redis_pipe.Publish(redisCtx, channel, message)
} }
func Send() ([]_redis.Cmder, error) { func Send() ([]_redis.Cmder, error) {
return redis_pipe.Exec(redisCtx) return redis_pipe.Exec(redisCtx)
} }

View file

@ -1,78 +1,75 @@
package main package main
import ( import (
"log" "encoding/json"
"encoding/json" "log"
"github.com/eoscanada/eos-go/ship"
"eosio-ship-trace-reader/redis" "eosio-ship-trace-reader/redis"
"github.com/eoscanada/eos-go/ship"
) )
func processBlock(block *ship.GetBlocksResultV0) { func processBlock(block *ship.GetBlocksResultV0) {
if block.ThisBlock.BlockNum%100 == 0 {
if block.ThisBlock.BlockNum % 100 == 0 { log.Printf("Current: %d, Head: %d\n", block.ThisBlock.BlockNum, block.Head.BlockNum)
log.Printf("Current: %d, Head: %d\n", block.ThisBlock.BlockNum, block.Head.BlockNum) }
}
} }
func processTraces(traces []*ship.TransactionTraceV0) { func processTraces(traces []*ship.TransactionTraceV0) {
for _, trace := range traces {
for _, trace := range traces { payload, err := json.Marshal(trace)
if err == nil {
channel := redis.Key("transactions")
if err := redis.Publish(channel, payload).Err(); err != nil {
log.Printf("Failed to post to channel '%s': %s", channel, err)
}
} else {
log.Println("Failed to encode transaction:", err)
}
payload, err := json.Marshal(trace) // Actions
if err == nil { for _, actionTraceVar := range trace.ActionTraces {
channel := redis.Key("transactions") trace := actionTraceVar.Impl.(*ship.ActionTraceV0)
if err := redis.Publish(channel, payload).Err(); err != nil {
log.Printf("Failed to post to channel '%s': %s", channel, err)
}
} else {
log.Println("Failed to encode transaction:", err)
}
// Actions act := ActionTrace{
for _, actionTraceVar := range trace.ActionTraces { Receiver: trace.Receiver,
trace := actionTraceVar.Impl.(*ship.ActionTraceV0) Contract: trace.Act.Account,
Action: trace.Act.Name,
}
act := ActionTrace{ abi, err := GetAbi(trace.Act.Account)
Receiver: trace.Receiver, if err == nil {
Contract: trace.Act.Account, v, err := DecodeAction(abi, trace.Act.Data, trace.Act.Name)
Action: trace.Act.Name, if err != nil {
} log.Print(err)
}
act.Data = v
} else {
log.Printf("Failed to get abi for contract %s: %s\n", trace.Act.Account, err)
}
abi, err := GetAbi(trace.Act.Account) payload, err := json.Marshal(act)
if err == nil { if err != nil {
v, err := DecodeAction(abi, trace.Act.Data, trace.Act.Name) log.Println("Failed to encode action:", err)
if err != nil { continue
log.Print(err) }
}
act.Data = v
} else {
log.Printf("Failed to get abi for contract %s: %s\n", trace.Act.Account, err)
}
channels := []string{
redis.Key("actions"),
redis.Key(string(act.Contract), "actions"),
redis.Key(string(act.Contract), "actions", string(act.Action)),
}
payload, err := json.Marshal(act) for _, channel := range channels {
if err != nil { if err := redis.RegisterPublish(channel, payload).Err(); err != nil {
log.Println("Failed to encode action:", err) log.Printf("Failed to post to channel '%s': %s", channel, err)
continue }
} }
}
}
channels := []string{ _, err := redis.Send()
redis.Key("actions"), if err != nil {
redis.Key(string(act.Contract), "actions"), log.Println("Failed to send redis. command:", err)
redis.Key(string(act.Contract), "actions", string(act.Action)), }
}
for _, channel := range channels {
if err := redis.RegisterPublish(channel, payload).Err(); err != nil {
log.Printf("Failed to post to channel '%s': %s", channel, err)
}
}
}
}
_, err := redis.Send()
if err != nil {
log.Println("Failed to send redis. command:", err)
}
} }

View file

@ -1,29 +1,30 @@
package telegram package telegram
import ( import (
"fmt" "fmt"
_api "github.com/go-telegram-bot-api/telegram-bot-api/v5"
_api "github.com/go-telegram-bot-api/telegram-bot-api/v5"
) )
var _bot *_api.BotAPI var (
var _channel int64 _bot *_api.BotAPI
var _prefix string _channel int64
_prefix string
)
func Init(prefix string, id string, channel int64) (error) { func Init(prefix string, id string, channel int64) error {
var err error
var err error _bot, err = _api.NewBotAPI(id)
if err == nil {
_bot, err = _api.NewBotAPI(id) _prefix = prefix
if err == nil { _channel = channel
_prefix = prefix }
_channel = channel return err
}
return err
} }
func Send(message string) (error) { func Send(message string) error {
msg := _api.NewMessage(_channel, fmt.Sprintf("%s: %s", _prefix, message)) msg := _api.NewMessage(_channel, fmt.Sprintf("%s: %s", _prefix, message))
_, err := _bot.Send(msg); _, err := _bot.Send(msg)
return err return err
} }

View file

@ -1,13 +1,12 @@
package main package main
import ( import (
eos "github.com/eoscanada/eos-go" eos "github.com/eoscanada/eos-go"
) )
type ActionTrace struct { type ActionTrace struct {
Receiver eos.Name `json:"receiver"` Receiver eos.Name `json:"receiver"`
Contract eos.AccountName `json:"contract"` Contract eos.AccountName `json:"contract"`
Action eos.ActionName `json:"action"` Action eos.ActionName `json:"action"`
Data interface{} `json:"data"` Data interface{} `json:"data"`
} }