From 074c08967968d1c3d0bf283a544e2ed17d4492a6 Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Wed, 8 Mar 2023 15:51:54 +0100 Subject: [PATCH] WIP --- app/service/notifier/notifier.go | 20 ++++++ app/service/redis/client.go | 17 +++++ app/service/shipclient/client.go | 26 ++++++++ app/service/telegram/config.go | 15 +++++ app/ship/client.go | 106 +++++++++++++++++++++++++++++++ main.go | 40 ++++-------- transport/client.go | 77 ++++++++++++++++++++++ 7 files changed, 272 insertions(+), 29 deletions(-) create mode 100644 app/service/notifier/notifier.go create mode 100644 app/service/redis/client.go create mode 100644 app/service/shipclient/client.go create mode 100644 app/ship/client.go create mode 100644 transport/client.go diff --git a/app/service/notifier/notifier.go b/app/service/notifier/notifier.go new file mode 100644 index 0000000..879e09a --- /dev/null +++ b/app/service/notifier/notifier.go @@ -0,0 +1,20 @@ +package notifier + +import ( + "github.com/nikoksr/notify" +) + +type NotifierInit func() (notify.Notifier, error) + +func InitNotifier(initializers ...NotifierInit) error { + for _, init := range initializers { + notifier, err := init() + if err != nil { + return err + } + + notify.UseServices(notifier) + } + + return nil +} diff --git a/app/service/redis/client.go b/app/service/redis/client.go new file mode 100644 index 0000000..8702a5c --- /dev/null +++ b/app/service/redis/client.go @@ -0,0 +1,17 @@ +package redis + +import ( + "context" + + "github.com/go-redis/redis/v8" +) + +func NewClient(cfg Config) (*redis.Client, error) { + rdb := redis.NewClient(&redis.Options{ + Addr: cfg.Addr, + Password: cfg.Password, + DB: cfg.DB, + }) + + return rdb, rdb.Ping(context.Background()).Err() +} diff --git a/app/service/shipclient/client.go b/app/service/shipclient/client.go new file mode 100644 index 0000000..4d9a7c4 --- /dev/null +++ b/app/service/shipclient/client.go @@ -0,0 +1,26 @@ +package shipclient + +import ( + "eosio-ship-trace-reader/config" + + "github.com/eoscanada/eos-go" + shipclient "github.com/eosswedenorg-go/antelope-ship-client" +) + +func NewClient(cfg *config.Config, chain *eos.InfoResp) (*shipclient.Client, error) { + if cfg.StartBlockNum == config.NULL_BLOCK_NUMBER { + if cfg.IrreversibleOnly { + cfg.StartBlockNum = uint32(chain.LastIrreversibleBlockNum) + } else { + cfg.StartBlockNum = uint32(chain.HeadBlockNum) + } + } + + options := func(c *shipclient.Client) { + c.StartBlock = cfg.StartBlockNum + c.EndBlock = cfg.EndBlockNum + c.IrreversibleOnly = cfg.IrreversibleOnly + } + + return shipclient.NewClient(options), nil +} diff --git a/app/service/telegram/config.go b/app/service/telegram/config.go index caba44d..60a3f7f 100644 --- a/app/service/telegram/config.go +++ b/app/service/telegram/config.go @@ -1,6 +1,21 @@ package telegram +import ( + "github.com/nikoksr/notify" + "github.com/nikoksr/notify/service/telegram" +) + type Config struct { Id string `json:"id"` Channel int64 `json:"channel"` } + +func Notifier(conf Config) func() (notify.Notifier, error) { + return func() (notify.Notifier, error) { + tg, err := telegram.New(conf.Id) + if err == nil { + tg.AddReceivers(conf.Channel) + } + return tg, err + } +} diff --git a/app/ship/client.go b/app/ship/client.go new file mode 100644 index 0000000..0d0fe5a --- /dev/null +++ b/app/ship/client.go @@ -0,0 +1,106 @@ +package ship + +import ( + "context" + "fmt" + "time" + + "github.com/nikoksr/notify" + + log "github.com/sirupsen/logrus" + + shipclient "github.com/eosswedenorg-go/antelope-ship-client" +) + +type Client struct { + sh *shipclient.Client + api string + + done chan interface{} +} + +func New(api string, client *shipclient.Client) *Client { + return &Client{ + api: api, + sh: client, + done: make(chan interface{}), + } +} + +func (c *Client) connect() { + var recon_cnt uint = 0 + + for { + recon_cnt++ + log.Infof("Connecting to ship at: %s (Try %d)", c.api, recon_cnt) + err := c.sh.Connect(c.api) + if err != nil { + log.Println(err) + + if recon_cnt >= 3 { + msg := fmt.Sprintf("Failed to connect to ship at '%s'", c.api) + if err := notify.Send(context.Background(), "Ship_reader", msg); err != nil { + log.WithError(err).Error("Failed to send notification") + } + recon_cnt = 0 + } + + log.Info("Trying again in 5 seconds ....") + time.Sleep(5 * time.Second) + continue + } + + err = c.sh.SendBlocksRequest() + if err != nil { + log.Println(err) + return + } + + // Connected + log.Infof("Connected, Start: %d, End: %d", c.sh.StartBlock, c.sh.EndBlock) + break + } +} + +func (c *Client) read() { + err := c.sh.Read() + if err != nil { + if shErr, ok := err.(shipclient.ClientError); ok { + + // Bail out if socket is closed + if shErr.Type == shipclient.ErrSockClosed { + log.Info("Socket closed, Exiting") + return + } + + // Reconnect + if shErr.Type == shipclient.ErrSockRead || shErr.Type == shipclient.ErrNotConnected { + c.connect() + } + } + + log.WithError(err).Error("Failed to read from ship") + } +} + +func (c *Client) Run() error { + defer c.Close() + + for { + select { + case <-c.done: + return nil + default: + c.read() + } + } +} + +func (c *Client) Close() { + err := c.sh.Shutdown() + if err != nil { + log.WithError(err).Error("Failed to send close message") + } + + close(c.done) +} diff --git a/main.go b/main.go index f4ee45f..8291c34 100644 --- a/main.go +++ b/main.go @@ -8,16 +8,18 @@ import ( "syscall" "time" - "github.com/go-redis/redis/v8" log "github.com/sirupsen/logrus" "eosio-ship-trace-reader/abi" "eosio-ship-trace-reader/app" + notifyservice "eosio-ship-trace-reader/app/service/notifier" + "eosio-ship-trace-reader/app/service/redis" + shipclientservice "eosio-ship-trace-reader/app/service/shipclient" + "eosio-ship-trace-reader/app/service/telegram" "eosio-ship-trace-reader/config" "eosio-ship-trace-reader/transport/redis_pubsub" "github.com/nikoksr/notify" - "github.com/nikoksr/notify/service/telegram" eos "github.com/eoscanada/eos-go" shipclient "github.com/eosswedenorg-go/antelope-ship-client" @@ -172,26 +174,14 @@ func main() { return } - // Init telegram notification service - telegram, err := telegram.New(conf.Telegram.Id) + err = notifyservice.InitNotifier(telegram.Notifier(conf.Telegram)) if err != nil { - log.WithError(err).Fatal("Failed to initialize telegram") + log.WithError(err).Fatal("Failed to initialize notification service") return } - telegram.AddReceivers(conf.Telegram.Channel) - - // Register services in notification manager - notify.UseServices(telegram) - // Connect to redis - rdb := redis.NewClient(&redis.Options{ - Addr: conf.Redis.Addr, - Password: conf.Redis.Password, - DB: conf.Redis.DB, - }) - - err = rdb.Ping(context.Background()).Err() + rdb, err := redis.NewClient(conf.Redis) if err != nil { log.WithError(err).Fatal("Failed to connect to redis") return @@ -205,20 +195,12 @@ func main() { return } - if conf.StartBlockNum == config.NULL_BLOCK_NUMBER { - if conf.IrreversibleOnly { - conf.StartBlockNum = uint32(chainInfo.LastIrreversibleBlockNum) - } else { - conf.StartBlockNum = uint32(chainInfo.HeadBlockNum) - } + shClient, err = shipclientservice.NewClient(conf, chainInfo) + if err != nil { + log.WithError(err).Fatal("Failed to initialize ship client") + return } - shClient = shipclient.NewClient(func(c *shipclient.Client) { - c.StartBlock = conf.StartBlockNum - c.EndBlock = conf.EndBlockNum - c.IrreversibleOnly = conf.IrreversibleOnly - }) - app.SpawnProccessor( shClient, redis_pubsub.NewPublisher(rdb, redis_pubsub.Namespace{ diff --git a/transport/client.go b/transport/client.go new file mode 100644 index 0000000..c88bf4e --- /dev/null +++ b/transport/client.go @@ -0,0 +1,77 @@ +package transport + +import ( + "encoding/json" + + "eosio-ship-trace-reader/transport/message" +) + +type Client struct { + reader Reader + decoder message.Decoder + + actChan chan message.ActionTrace + hbChan chan message.HearthBeat + errChan chan error +} + +func NewClient(reader Reader) *Client { + return &Client{ + reader: reader, + decoder: json.Unmarshal, + actChan: make(chan message.ActionTrace, 16), + hbChan: make(chan message.HearthBeat, 16), + errChan: make(chan error, 16), + } +} + +func actWorker(decoder message.Decoder, out chan<- message.ActionTrace, reader Reader, channel Channel) { + for { + payload, err := reader.Read(channel) + if err != nil { + return + } + + var act message.ActionTrace + if err := decoder(payload, &act); err != nil { + continue + } + out <- act + } +} + +func hbWorker(decoder message.Decoder, out chan<- message.HearthBeat, reader Reader, channel Channel) { + for { + payload, err := reader.Read(channel) + if err != nil { + return + } + + var hb message.HearthBeat + if err := decoder(payload, &hb); err != nil { + continue + } + out <- hb + } +} + +func (c Client) Subscribe(channel Channel) { + if HeartbeatChannel.Is(channel) { + go hbWorker(c.decoder, c.hbChan, c.reader, channel) + } + + go actWorker(c.decoder, c.actChan, c.reader, channel) +} + +func (c Client) ActionTrace() <-chan message.ActionTrace { + return c.actChan +} + +func (c Client) Heartbeat() <-chan message.HearthBeat { + return c.hbChan +} + +func (c Client) Close() { + close(c.actChan) + close(c.hbChan) +}