1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-06-17 04:30:03 +02:00
This commit is contained in:
Henrik Hautakoski 2023-03-08 15:51:54 +01:00
parent 764db98417
commit 074c089679
7 changed files with 272 additions and 29 deletions

View file

@ -0,0 +1,20 @@
package notifier
import (
"github.com/nikoksr/notify"
)
type NotifierInit func() (notify.Notifier, error)
func InitNotifier(initializers ...NotifierInit) error {
for _, init := range initializers {
notifier, err := init()
if err != nil {
return err
}
notify.UseServices(notifier)
}
return nil
}

View file

@ -0,0 +1,17 @@
package redis
import (
"context"
"github.com/go-redis/redis/v8"
)
func NewClient(cfg Config) (*redis.Client, error) {
rdb := redis.NewClient(&redis.Options{
Addr: cfg.Addr,
Password: cfg.Password,
DB: cfg.DB,
})
return rdb, rdb.Ping(context.Background()).Err()
}

View file

@ -0,0 +1,26 @@
package shipclient
import (
"eosio-ship-trace-reader/config"
"github.com/eoscanada/eos-go"
shipclient "github.com/eosswedenorg-go/antelope-ship-client"
)
func NewClient(cfg *config.Config, chain *eos.InfoResp) (*shipclient.Client, error) {
if cfg.StartBlockNum == config.NULL_BLOCK_NUMBER {
if cfg.IrreversibleOnly {
cfg.StartBlockNum = uint32(chain.LastIrreversibleBlockNum)
} else {
cfg.StartBlockNum = uint32(chain.HeadBlockNum)
}
}
options := func(c *shipclient.Client) {
c.StartBlock = cfg.StartBlockNum
c.EndBlock = cfg.EndBlockNum
c.IrreversibleOnly = cfg.IrreversibleOnly
}
return shipclient.NewClient(options), nil
}

View file

@ -1,6 +1,21 @@
package telegram
import (
"github.com/nikoksr/notify"
"github.com/nikoksr/notify/service/telegram"
)
type Config struct {
Id string `json:"id"`
Channel int64 `json:"channel"`
}
func Notifier(conf Config) func() (notify.Notifier, error) {
return func() (notify.Notifier, error) {
tg, err := telegram.New(conf.Id)
if err == nil {
tg.AddReceivers(conf.Channel)
}
return tg, err
}
}

106
app/ship/client.go Normal file
View file

@ -0,0 +1,106 @@
package ship
import (
"context"
"fmt"
"time"
"github.com/nikoksr/notify"
log "github.com/sirupsen/logrus"
shipclient "github.com/eosswedenorg-go/antelope-ship-client"
)
type Client struct {
sh *shipclient.Client
api string
done chan interface{}
}
func New(api string, client *shipclient.Client) *Client {
return &Client{
api: api,
sh: client,
done: make(chan interface{}),
}
}
func (c *Client) connect() {
var recon_cnt uint = 0
for {
recon_cnt++
log.Infof("Connecting to ship at: %s (Try %d)", c.api, recon_cnt)
err := c.sh.Connect(c.api)
if err != nil {
log.Println(err)
if recon_cnt >= 3 {
msg := fmt.Sprintf("Failed to connect to ship at '%s'", c.api)
if err := notify.Send(context.Background(), "Ship_reader", msg); err != nil {
log.WithError(err).Error("Failed to send notification")
}
recon_cnt = 0
}
log.Info("Trying again in 5 seconds ....")
time.Sleep(5 * time.Second)
continue
}
err = c.sh.SendBlocksRequest()
if err != nil {
log.Println(err)
return
}
// Connected
log.Infof("Connected, Start: %d, End: %d", c.sh.StartBlock, c.sh.EndBlock)
break
}
}
func (c *Client) read() {
err := c.sh.Read()
if err != nil {
if shErr, ok := err.(shipclient.ClientError); ok {
// Bail out if socket is closed
if shErr.Type == shipclient.ErrSockClosed {
log.Info("Socket closed, Exiting")
return
}
// Reconnect
if shErr.Type == shipclient.ErrSockRead || shErr.Type == shipclient.ErrNotConnected {
c.connect()
}
}
log.WithError(err).Error("Failed to read from ship")
}
}
func (c *Client) Run() error {
defer c.Close()
for {
select {
case <-c.done:
return nil
default:
c.read()
}
}
}
func (c *Client) Close() {
err := c.sh.Shutdown()
if err != nil {
log.WithError(err).Error("Failed to send close message")
}
close(c.done)
}

40
main.go
View file

@ -8,16 +8,18 @@ import (
"syscall"
"time"
"github.com/go-redis/redis/v8"
log "github.com/sirupsen/logrus"
"eosio-ship-trace-reader/abi"
"eosio-ship-trace-reader/app"
notifyservice "eosio-ship-trace-reader/app/service/notifier"
"eosio-ship-trace-reader/app/service/redis"
shipclientservice "eosio-ship-trace-reader/app/service/shipclient"
"eosio-ship-trace-reader/app/service/telegram"
"eosio-ship-trace-reader/config"
"eosio-ship-trace-reader/transport/redis_pubsub"
"github.com/nikoksr/notify"
"github.com/nikoksr/notify/service/telegram"
eos "github.com/eoscanada/eos-go"
shipclient "github.com/eosswedenorg-go/antelope-ship-client"
@ -172,26 +174,14 @@ func main() {
return
}
// Init telegram notification service
telegram, err := telegram.New(conf.Telegram.Id)
err = notifyservice.InitNotifier(telegram.Notifier(conf.Telegram))
if err != nil {
log.WithError(err).Fatal("Failed to initialize telegram")
log.WithError(err).Fatal("Failed to initialize notification service")
return
}
telegram.AddReceivers(conf.Telegram.Channel)
// Register services in notification manager
notify.UseServices(telegram)
// Connect to redis
rdb := redis.NewClient(&redis.Options{
Addr: conf.Redis.Addr,
Password: conf.Redis.Password,
DB: conf.Redis.DB,
})
err = rdb.Ping(context.Background()).Err()
rdb, err := redis.NewClient(conf.Redis)
if err != nil {
log.WithError(err).Fatal("Failed to connect to redis")
return
@ -205,20 +195,12 @@ func main() {
return
}
if conf.StartBlockNum == config.NULL_BLOCK_NUMBER {
if conf.IrreversibleOnly {
conf.StartBlockNum = uint32(chainInfo.LastIrreversibleBlockNum)
} else {
conf.StartBlockNum = uint32(chainInfo.HeadBlockNum)
}
shClient, err = shipclientservice.NewClient(conf, chainInfo)
if err != nil {
log.WithError(err).Fatal("Failed to initialize ship client")
return
}
shClient = shipclient.NewClient(func(c *shipclient.Client) {
c.StartBlock = conf.StartBlockNum
c.EndBlock = conf.EndBlockNum
c.IrreversibleOnly = conf.IrreversibleOnly
})
app.SpawnProccessor(
shClient,
redis_pubsub.NewPublisher(rdb, redis_pubsub.Namespace{

77
transport/client.go Normal file
View file

@ -0,0 +1,77 @@
package transport
import (
"encoding/json"
"eosio-ship-trace-reader/transport/message"
)
type Client struct {
reader Reader
decoder message.Decoder
actChan chan message.ActionTrace
hbChan chan message.HearthBeat
errChan chan error
}
func NewClient(reader Reader) *Client {
return &Client{
reader: reader,
decoder: json.Unmarshal,
actChan: make(chan message.ActionTrace, 16),
hbChan: make(chan message.HearthBeat, 16),
errChan: make(chan error, 16),
}
}
func actWorker(decoder message.Decoder, out chan<- message.ActionTrace, reader Reader, channel Channel) {
for {
payload, err := reader.Read(channel)
if err != nil {
return
}
var act message.ActionTrace
if err := decoder(payload, &act); err != nil {
continue
}
out <- act
}
}
func hbWorker(decoder message.Decoder, out chan<- message.HearthBeat, reader Reader, channel Channel) {
for {
payload, err := reader.Read(channel)
if err != nil {
return
}
var hb message.HearthBeat
if err := decoder(payload, &hb); err != nil {
continue
}
out <- hb
}
}
func (c Client) Subscribe(channel Channel) {
if HeartbeatChannel.Is(channel) {
go hbWorker(c.decoder, c.hbChan, c.reader, channel)
}
go actWorker(c.decoder, c.actChan, c.reader, channel)
}
func (c Client) ActionTrace() <-chan message.ActionTrace {
return c.actChan
}
func (c Client) Heartbeat() <-chan message.HearthBeat {
return c.hbChan
}
func (c Client) Close() {
close(c.actChan)
close(c.hbChan)
}