1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-06-16 04:24:56 +02:00

switch github.com/pborman/getopt to github.com/urfave/cli for handling cli flags

This commit is contained in:
Henrik Hautakoski 2024-02-10 18:52:14 +01:00
parent f99bbec5ff
commit 655dd730d7
5 changed files with 422 additions and 369 deletions

View file

@ -13,7 +13,7 @@ CFGDIR=$(PREFIX)/etc/thalos
build: build/$(PROGRAM)
build/$(PROGRAM) :
$(GO) build $(GOBUILDFLAGS) -o $@ cmd/thalos/main.go
$(GO) build $(GOBUILDFLAGS) -o $@ cmd/thalos/main.go cmd/thalos/server.go
tools : build/thalos-tools

View file

@ -1,389 +1,80 @@
package main
import (
"context"
"errors"
"fmt"
"io"
"os"
"os/signal"
"path"
"syscall"
"time"
"github.com/cenkalti/backoff/v4"
eos "github.com/eoscanada/eos-go"
shipclient "github.com/eosswedenorg-go/antelope-ship-client"
"github.com/eosswedenorg-go/pid"
"github.com/eosswedenorg/thalos/api/message"
_ "github.com/eosswedenorg/thalos/api/message/json"
_ "github.com/eosswedenorg/thalos/api/message/msgpack"
api_redis "github.com/eosswedenorg/thalos/api/redis"
"github.com/eosswedenorg/thalos/app"
"github.com/eosswedenorg/thalos/app/abi"
. "github.com/eosswedenorg/thalos/app/cache"
"github.com/eosswedenorg/thalos/app/config"
driver "github.com/eosswedenorg/thalos/app/driver/redis"
. "github.com/eosswedenorg/thalos/app/log"
redis_cache "github.com/go-redis/cache/v9"
"github.com/nikoksr/notify"
"github.com/nikoksr/notify/service/telegram"
"github.com/pborman/getopt/v2"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)
// ---------------------------
// Global variables
// ---------------------------
var conf *config.Config
var shClient *shipclient.Stream
var running bool = true
var VersionString string = "dev"
var exit chan bool
var cache *Cache
var cacheStore Store
func readerLoop(processor *app.ShipProcessor) {
recon_cnt := 0
exp := &backoff.ExponentialBackOff{
InitialInterval: time.Second,
RandomizationFactor: 0.25,
Multiplier: 2,
MaxInterval: 10 * time.Minute,
MaxElapsedTime: 0,
Stop: -1,
Clock: backoff.SystemClock,
}
exp.Reset()
log.WithFields(log.Fields{
"initial_interval": exp.InitialInterval,
"max_interval": exp.MaxInterval,
"randomization_factor": exp.RandomizationFactor,
"multiplier": exp.Multiplier,
}).Info("Connecting with Exponential Backoff")
connectOp := func() error {
recon_cnt++
log.WithFields(log.Fields{
"url": conf.Ship.Url,
"try": recon_cnt,
}).Info("Connecting to ship")
if err := shClient.Connect(conf.Ship.Url); err != nil {
return err
}
// Set stream client start block to processors current block
// Both values should be the same on first connect, but when reconnecting
// We don't want to start from the beginning
shClient.StartBlock = processor.GetCurrentBlock()
return shClient.SendBlocksRequest()
}
for running {
err := backoff.RetryNotify(connectOp, exp, func(err error, d time.Duration) {
if recon_cnt >= 3 {
msg := fmt.Sprintf("Failed to connect to ship at '%s'", conf.Ship.Url)
if err := notify.Send(context.Background(), conf.Name, msg); err != nil {
log.WithError(err).Error("Failed to send notification")
}
recon_cnt = 0
}
log.WithError(err).Error("Failed to connect to SHIP")
log.WithFields(log.Fields{
"reconn_at": time.Now().Add(d),
"reconn_in": d,
}).Info("Reconnecting in ", d)
})
if err != nil {
log.WithError(err).Error("Failed to connect to SHIP")
running = false
continue
}
recon_cnt = 0
log.WithFields(log.Fields{
"start": shClient.StartBlock,
"end": shClient.EndBlock,
}).Info("Connected to ship")
if err := shClient.Run(); err != nil {
if errors.Is(err, shipclient.ErrEndBlockReached) {
exit <- true
log.Info("Endblock reached.")
break
}
log.WithError(err).Error("Failed to read from ship")
}
}
}
func run(processor *app.ShipProcessor) {
// Spawn reader loop in another thread.
go readerLoop(processor)
// Create interrupt channel.
signals := make(chan os.Signal, 1)
// Register signal channel to receive signals from the os.
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
// Wait for interrupt
select {
case sig := <-signals:
log.WithField("signal", sig).Info("Signal received")
// Cleanly close the connection by sending a close message.
err := shClient.Shutdown()
if err != nil {
log.WithError(err).Info("failed to send close message to ship server")
}
case <-exit:
// Do nothing, just exit.
}
running = false
}
func getChain(def string) string {
if len(conf.Ship.Chain) > 0 {
return conf.Ship.Chain
}
return def
}
func LogLevels() []string {
list := []string{}
for _, lvl := range log.AllLevels {
list = append(list, lvl.String())
}
return list
}
func initAbiManger(api *eos.API, chain_id string) *abi.AbiManager {
cache := NewCache("thalos::cache::abi::"+chain_id, cacheStore)
return abi.NewAbiManager(cache, api)
}
func stateLoader(chainInfo *eos.InfoResp, current_block_no_cache bool) app.StateLoader {
return func(state *app.State) {
var source string
// Load state from cache.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
err := cache.Get(ctx, "state", &state)
cancel()
// on error (cache miss) or if current_block_no_cache is set.
// set current block from config/api
if current_block_no_cache || err != nil {
// Set from config if we have a sane value.
if conf.Ship.StartBlockNum != shipclient.NULL_BLOCK_NUMBER {
source = "config"
state.CurrentBlock = conf.Ship.StartBlockNum
} else {
// Otherwise, set from api.
if conf.Ship.IrreversibleOnly {
source = "api (LIB)"
state.CurrentBlock = uint32(chainInfo.LastIrreversibleBlockNum)
} else {
source = "api (HEAD)"
state.CurrentBlock = uint32(chainInfo.HeadBlockNum)
}
}
} else {
source = "cache"
}
log.WithFields(log.Fields{
"block": state.CurrentBlock,
"source": source,
}).Info("Starting from block")
}
}
func stateSaver(state app.State) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
defer cancel()
return cache.Set(ctx, "state", state, 0)
}
func main() {
var err error
var chainInfo *eos.InfoResp
cli.AppHelpTemplate = `Usage: {{.HelpName}} [options]
exit = make(chan bool)
{{range .VisibleFlags}}{{.}}
{{end}}`
showHelp := getopt.BoolLong("help", 'h', "display this help text")
showVersion := getopt.BoolLong("version", 'v', "display the version")
configFile := getopt.StringLong("config", 'c', "./config.yml", "Config file to read", "file")
pidFile := getopt.StringLong("pid", 'p', "", "Where to write process id", "file")
logFile := getopt.StringLong("log", 'l', "", "Path to log file", "file")
logLevel := getopt.EnumLong("level", 'L', LogLevels(), "info", "Log level to use")
skip_currentblock_cache := getopt.Bool('n', "Force the application to take start block from config/api")
getopt.Parse()
if *showHelp {
getopt.Usage()
return
cli.HelpFlag = &cli.BoolFlag{
Name: "help",
Aliases: []string{"h"},
Usage: "display this help text",
DisableDefaultText: true,
}
if *showVersion {
fmt.Println(VersionString)
return
cli.VersionPrinter = func(cCtx *cli.Context) {
fmt.Printf("Version %s\n", VersionString)
}
// Write PID file
if len(*pidFile) > 0 {
log.WithField("file", *pidFile).Info("Writing pid to file")
err = pid.Save(*pidFile)
if err != nil {
log.WithError(err).Fatal("failed to write pid file")
return
}
cli.VersionFlag = &cli.BoolFlag{
Name: "version",
Aliases: []string{"v"},
Usage: "display the version",
DisableDefaultText: true,
}
// Parse config
conf, err = config.Load(*configFile)
if err != nil {
log.WithError(err).Fatal("failed to read config file")
return
app := &cli.App{
Version: VersionString,
Args: true,
UseShortOptionHandling: true,
HideHelpCommand: true,
Flags: []cli.Flag{
&cli.PathFlag{
Name: "config",
Aliases: []string{"c"},
Value: "./config.yml",
Usage: "Config `file` to read",
TakesFile: true,
},
&cli.StringFlag{
Name: "level",
Aliases: []string{"L"},
Usage: "Log level to use",
Value: "info",
},
&cli.PathFlag{
Name: "log",
Aliases: []string{"l"},
Usage: "Path to log `file`",
TakesFile: true,
},
&cli.BoolFlag{
Name: "n",
Usage: "Force the application to take start block from config/api",
DisableDefaultText: true,
},
&cli.StringFlag{
Name: "pid",
Aliases: []string{"p"},
Usage: "`file` to save process id to",
TakesFile: true,
},
},
Action: serverCmd,
}
// If log file is given on the commandline, override config values.
if len(*logFile) > 0 {
conf.Log.Directory = path.Dir(*logFile)
conf.Log.Filename = path.Base(*logFile)
if err := app.Run(os.Args); err != nil {
log.WithError(err).Fatal("Application error")
}
lvl, err := log.ParseLevel(*logLevel)
if err == nil {
log.WithField("value", lvl).Info("Setting log level")
log.SetLevel(lvl)
} else {
log.WithError(err).Warn("Failed to parse level")
}
if len(conf.Log.Filename) > 0 {
stdWriter, err := NewRotatingFileFromConfig(conf.Log, "info")
if err != nil {
log.WithError(err).Fatal("Failed to open info log")
return
}
errWriter, err := NewRotatingFileFromConfig(conf.Log, "error")
if err != nil {
log.WithError(err).Fatal("Failed to open error log")
return
}
log.WithFields(log.Fields{
"maxfilesize": conf.Log.MaxFileSize,
"maxage": conf.Log.MaxTime,
"directory": conf.Log.GetDirectory(),
"info_filename": stdWriter.GetFilename(),
"error_filename": errWriter.GetFilename(),
}).Info("Logging to file")
log.SetOutput(io.Discard)
log.AddHook(MakeStdHook(stdWriter))
log.AddHook(MakeErrorHook(errWriter))
}
// Init telegram notification service
if len(conf.Telegram.Id) > 0 {
telegram, err := telegram.New(conf.Telegram.Id)
if err != nil {
log.WithError(err).Fatal("Failed to initialize telegram")
return
}
telegram.AddReceivers(conf.Telegram.Channel)
// Register services in notification manager
notify.UseServices(telegram)
}
// Connect to redis
rdb := redis.NewClient(&redis.Options{
Addr: conf.Redis.Addr,
Username: conf.Redis.User,
Password: conf.Redis.Password,
DB: conf.Redis.DB,
})
err = rdb.Ping(context.Background()).Err()
if err != nil {
log.WithError(err).Fatal("Failed to connect to redis")
return
}
// Setup cache storage
cacheStore = NewRedisStore(&redis_cache.Options{
Redis: rdb,
// Cache 10k keys for 10 minutes.
LocalCache: redis_cache.NewTinyLFU(10000, 10*time.Minute),
})
// Setup general cache
cache = NewCache("thalos::cache::instance::"+conf.Name, cacheStore)
log.WithField("api", conf.Api).Info("Get chain info from api")
eosClient := eos.New(conf.Api)
chainInfo, err = eosClient.GetInfo(context.Background())
if err != nil {
log.WithError(err).Fatal("Failed to get info")
return
}
shClient = shipclient.NewStream(func(s *shipclient.Stream) {
s.StartBlock = conf.Ship.StartBlockNum
s.EndBlock = conf.Ship.EndBlockNum
s.IrreversibleOnly = conf.Ship.IrreversibleOnly
})
// Get codec
codec, err := message.GetCodec(conf.MessageCodec)
if err != nil {
log.WithError(err).Fatal("Failed to load codec")
return
}
chain_id := getChain(chainInfo.ChainID.String())
processor := app.SpawnProccessor(
shClient,
stateLoader(chainInfo, *skip_currentblock_cache),
stateSaver,
driver.NewPublisher(context.Background(), rdb, api_redis.Namespace{
Prefix: conf.Redis.Prefix,
ChainID: chain_id,
}),
initAbiManger(eosClient, chain_id),
codec,
)
// Run the application
run(processor)
// Close the processor properly
processor.Close()
}

365
cmd/thalos/server.go Normal file
View file

@ -0,0 +1,365 @@
package main
import (
"context"
"errors"
"fmt"
"io"
"os"
"os/signal"
"path"
"syscall"
"time"
"github.com/cenkalti/backoff/v4"
eos "github.com/eoscanada/eos-go"
shipclient "github.com/eosswedenorg-go/antelope-ship-client"
"github.com/eosswedenorg-go/pid"
"github.com/eosswedenorg/thalos/api/message"
_ "github.com/eosswedenorg/thalos/api/message/json"
_ "github.com/eosswedenorg/thalos/api/message/msgpack"
api_redis "github.com/eosswedenorg/thalos/api/redis"
"github.com/eosswedenorg/thalos/app"
"github.com/eosswedenorg/thalos/app/abi"
. "github.com/eosswedenorg/thalos/app/cache"
"github.com/eosswedenorg/thalos/app/config"
driver "github.com/eosswedenorg/thalos/app/driver/redis"
. "github.com/eosswedenorg/thalos/app/log"
redis_cache "github.com/go-redis/cache/v9"
"github.com/nikoksr/notify"
"github.com/nikoksr/notify/service/telegram"
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)
// ---------------------------
// Global variables
// ---------------------------
var conf *config.Config
var shClient *shipclient.Stream
var running bool = true
var exit chan bool
var cache *Cache
var cacheStore Store
func readerLoop(processor *app.ShipProcessor) {
recon_cnt := 0
exp := &backoff.ExponentialBackOff{
InitialInterval: time.Second,
RandomizationFactor: 0.25,
Multiplier: 2,
MaxInterval: 10 * time.Minute,
MaxElapsedTime: 0,
Stop: -1,
Clock: backoff.SystemClock,
}
exp.Reset()
log.WithFields(log.Fields{
"initial_interval": exp.InitialInterval,
"max_interval": exp.MaxInterval,
"randomization_factor": exp.RandomizationFactor,
"multiplier": exp.Multiplier,
}).Info("Connecting with Exponential Backoff")
connectOp := func() error {
recon_cnt++
log.WithFields(log.Fields{
"url": conf.Ship.Url,
"try": recon_cnt,
}).Info("Connecting to ship")
if err := shClient.Connect(conf.Ship.Url); err != nil {
return err
}
// Set stream client start block to processors current block
// Both values should be the same on first connect, but when reconnecting
// We don't want to start from the beginning
shClient.StartBlock = processor.GetCurrentBlock()
return shClient.SendBlocksRequest()
}
for running {
err := backoff.RetryNotify(connectOp, exp, func(err error, d time.Duration) {
if recon_cnt >= 3 {
msg := fmt.Sprintf("Failed to connect to ship at '%s'", conf.Ship.Url)
if err := notify.Send(context.Background(), conf.Name, msg); err != nil {
log.WithError(err).Error("Failed to send notification")
}
recon_cnt = 0
}
log.WithError(err).Error("Failed to connect to SHIP")
log.WithFields(log.Fields{
"reconn_at": time.Now().Add(d),
"reconn_in": d,
}).Info("Reconnecting in ", d)
})
if err != nil {
log.WithError(err).Error("Failed to connect to SHIP")
running = false
continue
}
recon_cnt = 0
log.WithFields(log.Fields{
"start": shClient.StartBlock,
"end": shClient.EndBlock,
}).Info("Connected to ship")
if err := shClient.Run(); err != nil {
if errors.Is(err, shipclient.ErrEndBlockReached) {
exit <- true
log.Info("Endblock reached.")
break
}
log.WithError(err).Error("Failed to read from ship")
}
}
}
func run(processor *app.ShipProcessor) {
// Spawn reader loop in another thread.
go readerLoop(processor)
// Create interrupt channel.
signals := make(chan os.Signal, 1)
// Register signal channel to receive signals from the os.
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
// Wait for interrupt
select {
case sig := <-signals:
log.WithField("signal", sig).Info("Signal received")
// Cleanly close the connection by sending a close message.
err := shClient.Shutdown()
if err != nil {
log.WithError(err).Info("failed to send close message to ship server")
}
case <-exit:
// Do nothing, just exit.
}
running = false
}
func getChain(def string) string {
if len(conf.Ship.Chain) > 0 {
return conf.Ship.Chain
}
return def
}
func LogLevels() []string {
list := []string{}
for _, lvl := range log.AllLevels {
list = append(list, lvl.String())
}
return list
}
func initAbiManger(api *eos.API, chain_id string) *abi.AbiManager {
cache := NewCache("thalos::cache::abi::"+chain_id, cacheStore)
return abi.NewAbiManager(cache, api)
}
func stateLoader(chainInfo *eos.InfoResp, current_block_no_cache bool) app.StateLoader {
return func(state *app.State) {
var source string
// Load state from cache.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
err := cache.Get(ctx, "state", &state)
cancel()
// on error (cache miss) or if current_block_no_cache is set.
// set current block from config/api
if current_block_no_cache || err != nil {
// Set from config if we have a sane value.
if conf.Ship.StartBlockNum != shipclient.NULL_BLOCK_NUMBER {
source = "config"
state.CurrentBlock = conf.Ship.StartBlockNum
} else {
// Otherwise, set from api.
if conf.Ship.IrreversibleOnly {
source = "api (LIB)"
state.CurrentBlock = uint32(chainInfo.LastIrreversibleBlockNum)
} else {
source = "api (HEAD)"
state.CurrentBlock = uint32(chainInfo.HeadBlockNum)
}
}
} else {
source = "cache"
}
log.WithFields(log.Fields{
"block": state.CurrentBlock,
"source": source,
}).Info("Starting from block")
}
}
func stateSaver(state app.State) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
defer cancel()
return cache.Set(ctx, "state", state, 0)
}
func serverCmd(ctx *cli.Context) error {
var err error
var chainInfo *eos.InfoResp
exit = make(chan bool)
skip_currentblock_cache := ctx.Bool("n")
// Write PID file
pidFile := ctx.String("pid")
if len(pidFile) > 0 {
log.WithField("file", pidFile).Info("Writing pid to file")
if err = pid.Save(pidFile); err != nil {
return err
}
}
// Parse config
conf, err = config.Load(ctx.Path("config"))
if err != nil {
return err
}
// If log file is given on the commandline, override config values.
logFile := ctx.Path("log")
if len(logFile) > 0 {
conf.Log.Directory = path.Dir(logFile)
conf.Log.Filename = path.Base(logFile)
}
lvl, err := log.ParseLevel(ctx.String("level"))
if err == nil {
log.WithField("value", lvl).Info("Setting log level")
log.SetLevel(lvl)
} else {
log.WithError(err).Warn("Failed to parse level")
}
if len(conf.Log.Filename) > 0 {
stdWriter, err := NewRotatingFileFromConfig(conf.Log, "info")
if err != nil {
return err
}
errWriter, err := NewRotatingFileFromConfig(conf.Log, "error")
if err != nil {
return err
}
log.WithFields(log.Fields{
"maxfilesize": conf.Log.MaxFileSize,
"maxage": conf.Log.MaxTime,
"directory": conf.Log.GetDirectory(),
"info_filename": stdWriter.GetFilename(),
"error_filename": errWriter.GetFilename(),
}).Info("Logging to file")
log.SetOutput(io.Discard)
log.AddHook(MakeStdHook(stdWriter))
log.AddHook(MakeErrorHook(errWriter))
}
// Init telegram notification service
if len(conf.Telegram.Id) > 0 {
telegram, err := telegram.New(conf.Telegram.Id)
if err != nil {
return err
}
telegram.AddReceivers(conf.Telegram.Channel)
// Register services in notification manager
notify.UseServices(telegram)
}
// Connect to redis
rdb := redis.NewClient(&redis.Options{
Addr: conf.Redis.Addr,
Username: conf.Redis.User,
Password: conf.Redis.Password,
DB: conf.Redis.DB,
})
err = rdb.Ping(context.Background()).Err()
if err != nil {
return err
}
// Setup cache storage
cacheStore = NewRedisStore(&redis_cache.Options{
Redis: rdb,
// Cache 10k keys for 10 minutes.
LocalCache: redis_cache.NewTinyLFU(10000, 10*time.Minute),
})
// Setup general cache
cache = NewCache("thalos::cache::instance::"+conf.Name, cacheStore)
log.WithField("api", conf.Api).Info("Get chain info from api")
eosClient := eos.New(conf.Api)
chainInfo, err = eosClient.GetInfo(context.Background())
if err != nil {
return err
}
shClient = shipclient.NewStream(func(s *shipclient.Stream) {
s.StartBlock = conf.Ship.StartBlockNum
s.EndBlock = conf.Ship.EndBlockNum
s.IrreversibleOnly = conf.Ship.IrreversibleOnly
})
// Get codec
codec, err := message.GetCodec(conf.MessageCodec)
if err != nil {
log.WithError(err)
return err
}
chain_id := getChain(chainInfo.ChainID.String())
processor := app.SpawnProccessor(
shClient,
stateLoader(chainInfo, skip_currentblock_cache),
stateSaver,
driver.NewPublisher(context.Background(), rdb, api_redis.Namespace{
Prefix: conf.Redis.Prefix,
ChainID: chain_id,
}),
initAbiManger(eosClient, chain_id),
codec,
)
// Run the application
run(processor)
// Close the processor properly
processor.Close()
return nil
}

1
go.mod
View file

@ -12,7 +12,6 @@ require (
github.com/go-redis/cache/v9 v9.0.0
github.com/go-redis/redismock/v9 v9.2.0
github.com/nikoksr/notify v0.41.0
github.com/pborman/getopt/v2 v2.1.0
github.com/redis/go-redis/v9 v9.4.0
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.8.4

2
go.sum
View file

@ -120,8 +120,6 @@ github.com/onsi/gomega v1.24.1/go.mod h1:3AOiACssS3/MajrniINInwbfOOtfZvplPzuRSmv
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/onsi/gomega v1.31.1 h1:KYppCUK+bUgAZwHOu7EXVBKyQA6ILvOESHkn/tgoqvo=
github.com/onsi/gomega v1.31.1/go.mod h1:y40C95dwAD1Nz36SsEnxvfFe8FFfNxzI5eJ0EYGyAy0=
github.com/pborman/getopt/v2 v2.1.0 h1:eNfR+r+dWLdWmV8g5OlpyrTYHkhVNxHBdN2cCrJmOEA=
github.com/pborman/getopt/v2 v2.1.0/go.mod h1:4NtW75ny4eBw9fO1bhtNdYTlZKYX5/tBLtsOpwKIKd0=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=