1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-06-16 04:24:56 +02:00

Merge branch '32-request-for-more-startup-flags'

This commit is contained in:
Henrik Hautakoski 2024-02-18 20:03:54 +01:00
commit 2382f72e5a
8 changed files with 406 additions and 257 deletions

View file

@ -1,6 +1,7 @@
package main
import (
"github.com/eosswedenorg/thalos/internal/config"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
@ -28,13 +29,14 @@ func init() {
rootCmd.SetVersionTemplate(`{{with .Name}}{{printf "%s " .}}{{end}}{{printf "v%s" .Version}}` + "\n")
flags := pflag.FlagSet{}
flags.StringP("config", "c", "./config.yml", "Config file to read")
flags.StringP("level", "L", "info", "Log level to use")
flags.StringP("log", "l", "", "Path to log file (default: print to stdout/stderr)")
flags.StringP("pid", "p", "", "Where to write process id")
flags.BoolP("no-state-cache", "n", false, "Force the application to take start block from config/api")
rootCmd.PersistentFlags().AddFlagSet(&flags)
rootCmd.PersistentFlags().AddFlagSet(config.GetFlags())
}
func main() {

View file

@ -7,6 +7,7 @@ import (
"io"
"os"
"os/signal"
"path"
"syscall"
"time"
@ -153,7 +154,7 @@ func initAbiManager(api *eos.API, store cache.Store, chain_id string) *abi.AbiMa
return abi.NewAbiManager(cache, api)
}
func stateLoader(conf config.Config, chainInfo func() *eos.InfoResp, cache *cache.Cache, current_block_no_cache bool) StateLoader {
func stateLoader(conf *config.Config, start_block_flag *pflag.Flag, chainInfo func() *eos.InfoResp, cache *cache.Cache, current_block_no_cache bool) StateLoader {
return func(state *State) {
var source string
@ -167,7 +168,13 @@ func stateLoader(conf config.Config, chainInfo func() *eos.InfoResp, cache *cach
if current_block_no_cache || err != nil {
// Set from config if we have a sane value.
if conf.Ship.StartBlockNum != shipclient.NULL_BLOCK_NUMBER {
source = "config"
if start_block_flag != nil && start_block_flag.Changed {
source = "cli"
} else {
source = "config"
}
state.CurrentBlock = conf.Ship.StartBlockNum
} else {
// Otherwise, set from api.
@ -198,23 +205,34 @@ func stateSaver(cache *cache.Cache) StateSaver {
}
}
func ReadConfig(cfg *config.Config, flags *pflag.FlagSet) error {
func GetConfig(flags *pflag.FlagSet) (*config.Config, error) {
filename, err := flags.GetString("config")
if err != nil {
return err
return nil, err
}
// Read file first.
if err := cfg.ReadFile(filename); err != nil {
return err
cfg, err := config.NewBuilder().
SetConfigFile(filename).
SetFlags(flags).
Build()
if err != nil {
return nil, err
}
// Then override any cli flags
if err := cfg.ReadCliFlags(flags); err != nil {
return err
logFile, _ := flags.GetString("log")
if len(logFile) > 0 {
cfg.Log.Directory = path.Dir(logFile)
cfg.Log.Filename = path.Base(logFile)
}
return nil
// If start-block is provided, we should set no-state-cache to true.
if startBlock := flags.Lookup("start-block"); startBlock != nil && startBlock.Changed {
if err := flags.Set("no-state-cache", "true"); err != nil {
return cfg, nil
}
}
return cfg, nil
}
// "Clever" way to make sure we only call the api once.
@ -244,8 +262,6 @@ func chainInfoOnce(api *eos.API) func() *eos.InfoResp {
func serverCmd(cmd *cobra.Command, args []string) {
var err error
skip_currentblock_cache, _ := cmd.Flags().GetBool("no-state-cache")
// Write PID file
pidFile, err := cmd.Flags().GetString("pid")
if err != nil {
@ -257,12 +273,14 @@ func serverCmd(cmd *cobra.Command, args []string) {
}
// Parse config
conf := config.New()
if err = ReadConfig(&conf, cmd.Flags()); err != nil {
conf, err := GetConfig(cmd.Flags())
if err != nil {
log.WithError(err).Fatal("Failed to read config")
return
}
skip_currentblock_cache, _ := cmd.Flags().GetBool("no-state-cache")
flagLevel, _ := cmd.Flags().GetString("level")
lvl, err := log.ParseLevel(flagLevel)
if err == nil {
@ -360,7 +378,7 @@ func serverCmd(cmd *cobra.Command, args []string) {
processor := SpawnProccessor(
shClient,
stateLoader(conf, chainInfo, cache, skip_currentblock_cache),
stateLoader(conf, cmd.Flags().Lookup("start-block"), chainInfo, cache, skip_currentblock_cache),
stateSaver(cache),
driver.NewPublisher(context.Background(), rdb, api_redis.Namespace{
Prefix: conf.Redis.Prefix,
@ -371,7 +389,7 @@ func serverCmd(cmd *cobra.Command, args []string) {
)
// Run the application
run(&conf, shClient, processor)
run(conf, shClient, processor)
// Close the processor properly
processor.Close()

120
internal/config/builder.go Normal file
View file

@ -0,0 +1,120 @@
package config
import (
"errors"
"io"
"os"
"github.com/mitchellh/mapstructure"
"github.com/spf13/pflag"
"github.com/spf13/viper"
)
// This is a simple module that encapsulate the creation
// of a config object and can override values from cli flags.
type Builder struct {
in io.Reader
flags *pflag.FlagSet
binds map[string]string
}
func NewBuilder() *Builder {
return &Builder{
binds: map[string]string{
"api": "url",
"message_codec": "codec",
// Redis
"redis.addr": "redis-addr",
"redis.user": "redis-user",
"redis.password": "redis-password",
"redis.db": "redis-db",
"redis.prefix": "redis-prefix",
// Telegram
"telegram.id": "telegram-id",
"telegram.channel": "telegram-channel",
// Log
"log.maxfilesize": "log-max-filesize",
"log.maxtime": "log-max-time",
// Ship
"ship.url": "ship-url",
"ship.start_block_num": "start-block",
"ship.end_block_num": "end-block",
"ship.irreversible_only": "irreversible-only",
"ship.max_messages_in_flight": "max-msg-in-flight",
"ship.chain": "chain",
},
}
}
// Set the config file to read
func (b *Builder) SetConfigFile(filename string) *Builder {
file, _ := os.Open(filename)
return b.SetSource(file)
}
// Set the source to read
func (b *Builder) SetSource(in io.Reader) *Builder {
b.in = in
return b
}
// Set all flags that the builder should use.
func (b *Builder) SetFlags(flags *pflag.FlagSet) *Builder {
b.flags = flags
return b
}
// Add a flag to the builder.
func (b *Builder) AddFlag(flag *pflag.Flag) *Builder {
b.flags.AddFlag(flag)
return b
}
// Build the config object from file, cli-flags
func (b *Builder) Build() (*Config, error) {
if b.in == nil {
return nil, errors.New("Config not set")
}
conf := Config{}
v := viper.New()
v.SetConfigType("yaml")
if b.flags != nil {
// bind flags in viper.
for key, flagname := range b.binds {
flag := b.flags.Lookup(flagname)
if flag == nil {
continue
}
if err := v.BindPFlag(key, flag); err != nil {
return nil, err
}
}
}
// Read config and unmarshal
if err := v.ReadConfig(b.in); err != nil {
return nil, err
}
decoders := mapstructure.ComposeDecodeHookFunc(
mapstructure.TextUnmarshallerHookFunc(),
mapstructure.StringToTimeDurationHookFunc(),
mapstructure.StringToSliceHookFunc(","),
)
err := v.Unmarshal(&conf, viper.DecodeHook(decoders))
if err != nil {
return nil, err
}
return &conf, nil
}

View file

@ -0,0 +1,210 @@
package config
import (
"bytes"
"testing"
"time"
"github.com/eosswedenorg/thalos/internal/log"
"github.com/stretchr/testify/require"
)
func TestBuilder(t *testing.T) {
expected := Config{
Name: "ship-reader-1",
Api: "http://127.0.0.1:8080",
MessageCodec: "mojibake",
Log: log.Config{
Filename: "some_file.log",
Directory: "/path/to/whatever",
MaxFileSize: 200,
MaxTime: 30 * time.Minute,
},
Ship: ShipConfig{
Url: "127.0.0.1:8089",
StartBlockNum: 23671836,
EndBlockNum: 23872222,
IrreversibleOnly: true,
MaxMessagesInFlight: 1337,
},
Telegram: TelegramConfig{
Id: "110201543:AAHdqTcvCH1vGWJxfSeofSAs0K5PALDsaw",
Channel: -123456789,
},
Redis: RedisConfig{
Addr: "localhost:6379",
User: "myuser",
Password: "passwd",
DB: 4,
Prefix: "some::ship",
},
}
builder := NewBuilder()
builder.SetSource(bytes.NewBuffer([]byte(`
name: "ship-reader-1"
api: "http://127.0.0.1:8080"
message_codec: "mojibake"
log:
filename: some_file.log
directory: /path/to/whatever
maxtime: 30m
maxfilesize: 200b
ship:
url: "127.0.0.1:8089"
irreversible_only: true
max_messages_in_flight: 1337
start_block_num: 23671836
end_block_num: 23872222
telegram:
id: "110201543:AAHdqTcvCH1vGWJxfSeofSAs0K5PALDsaw"
channel: -123456789
redis:
addr: "localhost:6379"
user: "myuser"
password: "passwd"
db: 4
prefix: "some::ship"
`)))
cfg, err := builder.Build()
require.NoError(t, err)
require.Equal(t, &expected, cfg)
}
func TestBuilder_ConfigWithFlags(t *testing.T) {
expected := Config{
Name: "ship-reader-1",
Api: "https://api.example.com",
MessageCodec: "msgpack",
Log: log.Config{
Filename: "mylog.log",
Directory: "/var/log",
MaxFileSize: 200,
MaxTime: 30 * time.Minute,
},
Ship: ShipConfig{
Url: "127.0.0.1:8089",
StartBlockNum: 23671836,
EndBlockNum: 23872222,
IrreversibleOnly: true,
MaxMessagesInFlight: 1337,
},
Telegram: TelegramConfig{
Id: "110201543:AAHdqTcvCH1vGWJxfSeofSAs0K5PALDsaw",
Channel: -123456789,
},
Redis: RedisConfig{
Addr: "localhost:6379",
User: "userfromcli",
Password: "passwd",
DB: 4,
Prefix: "some::ship",
},
}
builder := NewBuilder()
builder.SetSource(bytes.NewBuffer([]byte(`
name: "ship-reader-1"
api: "http://127.0.0.1:8080"
message_codec: "mojibake"
log:
filename: mylog.log
directory: /var/log
maxtime: 30m
maxfilesize: 200b
ship:
url: "127.0.0.1:8089"
irreversible_only: true
max_messages_in_flight: 1337
start_block_num: 23671836
end_block_num: 23872222
telegram:
id: "110201543:AAHdqTcvCH1vGWJxfSeofSAs0K5PALDsaw"
channel: -123456789
redis:
addr: "localhost:6379"
user: "myuser"
password: "passwd"
db: 4
prefix: "some::ship"
`)))
flags := GetFlags()
require.NoError(t, flags.Set("url", "https://api.example.com"))
require.NoError(t, flags.Set("codec", "msgpack"))
require.NoError(t, flags.Set("redis-user", "userfromcli"))
builder.SetFlags(flags)
cfg, err := builder.Build()
require.NoError(t, err)
require.Equal(t, &expected, cfg)
}
func TestBuilder_NilSource(t *testing.T) {
cfg, err := NewBuilder().Build()
require.Nil(t, cfg)
require.EqualError(t, err, "Config not set")
}
func TestBuilder_Flags(t *testing.T) {
flags := GetFlags()
require.NoError(t, flags.Set("url", "https://myapi"))
require.NoError(t, flags.Set("codec", "binary"))
require.NoError(t, flags.Set("redis-addr", "154.223.38.15:6380"))
require.NoError(t, flags.Set("redis-user", "myuser"))
require.NoError(t, flags.Set("redis-password", "secret123"))
require.NoError(t, flags.Set("redis-db", "3"))
require.NoError(t, flags.Set("redis-prefix", "custom-prefix"))
require.NoError(t, flags.Set("telegram-id", "72983126312982618"))
require.NoError(t, flags.Set("telegram-channel", "-293492332"))
require.NoError(t, flags.Set("log-max-filesize", "25mb"))
require.NoError(t, flags.Set("log-max-time", "10m"))
require.NoError(t, flags.Set("ship-url", "ws://myship.com:7823"))
require.NoError(t, flags.Set("start-block", "7327833"))
require.NoError(t, flags.Set("end-block", "329408392"))
require.NoError(t, flags.Set("irreversible-only", "true"))
require.NoError(t, flags.Set("max-msg-in-flight", "98"))
require.NoError(t, flags.Set("chain", "wax"))
cfg, err := NewBuilder().
SetSource(bytes.NewReader([]byte(``))).
SetFlags(flags).
Build()
expected := Config{
Api: "https://myapi",
MessageCodec: "binary",
Log: log.Config{
MaxFileSize: 25 * 1000 * 1000, // 25 mb
MaxTime: time.Minute * 10,
},
Ship: ShipConfig{
Url: "ws://myship.com:7823",
StartBlockNum: 7327833,
EndBlockNum: 329408392,
MaxMessagesInFlight: 98,
IrreversibleOnly: true,
Chain: "wax",
},
Telegram: TelegramConfig{
Id: "72983126312982618",
Channel: -293492332,
},
Redis: RedisConfig{
Addr: "154.223.38.15:6380",
User: "myuser",
Password: "secret123",
DB: 3,
Prefix: "custom-prefix",
},
}
require.NoError(t, err)
require.Equal(t, &expected, cfg)
}

View file

@ -1,17 +1,46 @@
package config
import (
"path"
"time"
shipclient "github.com/eosswedenorg-go/antelope-ship-client"
"github.com/spf13/pflag"
)
// Read cli flag values into the config
func (cfg *Config) ReadCliFlags(flags *pflag.FlagSet) error {
logFile, _ := flags.GetString("log")
if len(logFile) > 0 {
cfg.Log.Directory = path.Dir(logFile)
cfg.Log.Filename = path.Base(logFile)
}
return nil
func GetFlags() *pflag.FlagSet {
flags := pflag.FlagSet{}
// Generic
flags.StringP("url", "u", "", "Url to antelope api")
flags.String("codec", "json", "Codec used to send messages")
// Redis
flags.String("redis-addr", "127.0.0.1:6379", "host:port to redis server")
flags.String("redis-user", "", "Redis username")
flags.String("redis-password", "", "Redis password")
flags.Int("redis-db", 0, "Redis database")
flags.String("redis-prefix", "ship", "Redis channel prefix")
// Telegram
flags.String("telegram-id", "", "Id of telegram bot")
flags.Int64("telegram-channel", 0, "Telegram channel to send notifications to")
// Log
flags.StringP("log", "l", "", "Path to log file (default: print to stdout/stderr)")
flags.String("log-max-filesize", "10mb", "Max filesize for logfile to rotate")
flags.Duration("log-max-time", time.Hour*24, "Max time for logfile to rotate")
// Ship
flags.String("ship-url", "ws://127.0.0.1:8080", "Url to ship node")
flags.Uint32("start-block", shipclient.NULL_BLOCK_NUMBER, "Start to stream from this block")
flags.Uint32("end-block", shipclient.NULL_BLOCK_NUMBER, "Stop streaming when this block is reached")
flags.Lookup("start-block").DefValue = "config value, cache, head from api"
flags.Lookup("end-block").DefValue = "none"
flags.Bool("irreversible-only", false, "Only stream irreversible blocks from ship")
flags.Int("max-msg-in-flight", 10, "Maximum messages that can be sent from SHIP without acknowledgement")
flags.String("chain", "", "ChainID used in channel namespace, can be any string (default from api)")
return &flags
}

View file

@ -1,12 +1,7 @@
package config
import (
"reflect"
"time"
"github.com/eosswedenorg/thalos/internal/log"
shipclient "github.com/eosswedenorg-go/antelope-ship-client"
)
type RedisConfig struct {
@ -43,43 +38,3 @@ type Config struct {
Telegram TelegramConfig `yaml:"telegram" mapstructure:"telegram"`
}
// Create a new Config object with default values
func New() Config {
return Config{
MessageCodec: "json",
Log: log.Config{
MaxFileSize: 10 * 1000 * 1000, // 10 mb
MaxTime: time.Hour * 24,
},
Ship: defaultShipConfig(""),
Redis: RedisConfig{
Addr: "localhost:6379",
Prefix: "ship",
},
}
}
func defaultShipConfig(url string) ShipConfig {
return ShipConfig{
Url: url,
StartBlockNum: shipclient.NULL_BLOCK_NUMBER,
EndBlockNum: shipclient.NULL_BLOCK_NUMBER,
MaxMessagesInFlight: 10,
IrreversibleOnly: false,
}
}
// mapstructure DecodeHook that can parse a shorthand ship config (only string instead of struct.)
func decodeShorthandShipConfig(from reflect.Value, to reflect.Value) (interface{}, error) {
shipType := reflect.TypeOf(ShipConfig{})
// If to is a struct and is assignable to a ShipConfig and from is a string.
// Then we treat the from value as ShipConfig.Url
if to.Kind() == reflect.Struct && to.Type().AssignableTo(shipType) && from.Kind() == reflect.String {
return defaultShipConfig(from.String()), nil
}
// If not, decode as normal.
return from.Interface(), nil
}

View file

@ -1,148 +0,0 @@
package config
import (
"testing"
"time"
"github.com/eosswedenorg/thalos/internal/log"
"github.com/stretchr/testify/require"
shipclient "github.com/eosswedenorg-go/antelope-ship-client"
)
func TestNew(t *testing.T) {
expected := Config{
MessageCodec: "json",
Log: log.Config{
MaxFileSize: 10 * 1000 * 1000, // 10 mb
MaxTime: time.Hour * 24,
},
Ship: ShipConfig{
StartBlockNum: shipclient.NULL_BLOCK_NUMBER,
EndBlockNum: shipclient.NULL_BLOCK_NUMBER,
MaxMessagesInFlight: 10,
IrreversibleOnly: false,
},
Redis: RedisConfig{
Addr: "localhost:6379",
Password: "",
DB: 0,
Prefix: "ship",
},
}
require.Equal(t, expected, New())
}
func TestRead(t *testing.T) {
expected := Config{
Name: "ship-reader-1",
Api: "http://127.0.0.1:8080",
MessageCodec: "mojibake",
Log: log.Config{
Filename: "some_file.log",
Directory: "/path/to/whatever",
MaxFileSize: 200,
MaxTime: 30 * time.Minute,
},
Ship: ShipConfig{
Url: "127.0.0.1:8089",
StartBlockNum: 23671836,
EndBlockNum: 23872222,
IrreversibleOnly: true,
MaxMessagesInFlight: 1337,
},
Telegram: TelegramConfig{
Id: "110201543:AAHdqTcvCH1vGWJxfSeofSAs0K5PALDsaw",
Channel: -123456789,
},
Redis: RedisConfig{
Addr: "localhost:6379",
User: "myuser",
Password: "passwd",
DB: 4,
Prefix: "some::ship",
},
}
cfg := Config{}
err := cfg.Read([]byte(`
name: "ship-reader-1"
api: "http://127.0.0.1:8080"
message_codec: "mojibake"
log:
filename: some_file.log
directory: /path/to/whatever
maxtime: 30m
maxfilesize: 200b
ship:
url: "127.0.0.1:8089"
irreversible_only: true
max_messages_in_flight: 1337
start_block_num: 23671836
end_block_num: 23872222
telegram:
id: "110201543:AAHdqTcvCH1vGWJxfSeofSAs0K5PALDsaw"
channel: -123456789
redis:
addr: "localhost:6379"
user: "myuser"
password: "passwd"
db: 4
prefix: "some::ship"
`))
require.NoError(t, err)
require.Equal(t, expected, cfg)
}
func TestReadShorthandShipUrl(t *testing.T) {
expected := Config{
Name: "ship-reader-1",
Api: "http://127.0.0.1:8080",
MessageCodec: "json",
Log: log.Config{
MaxFileSize: 10 * 1000 * 1000, // 10 mb
MaxTime: time.Hour * 24,
},
Ship: ShipConfig{
Url: "127.0.0.1:8089",
StartBlockNum: shipclient.NULL_BLOCK_NUMBER,
EndBlockNum: shipclient.NULL_BLOCK_NUMBER,
MaxMessagesInFlight: 10,
IrreversibleOnly: false,
},
Telegram: TelegramConfig{
Id: "110201543:AAHdqTcvCH1vGWJxfSeofSAs0K5PALDsaw",
Channel: -123456789,
},
Redis: RedisConfig{
Addr: "localhost:6379",
Password: "passwd",
DB: 4,
Prefix: "some::ship",
},
}
cfg := New()
err := cfg.Read([]byte(`
name: "ship-reader-1"
api: "http://127.0.0.1:8080"
ship: "127.0.0.1:8089"
telegram:
id: "110201543:AAHdqTcvCH1vGWJxfSeofSAs0K5PALDsaw"
channel: -123456789
redis:
addr: "localhost:6379"
password: "passwd"
db: 4
prefix: "some::ship"
`))
require.NoError(t, err)
require.Equal(t, expected, cfg)
}

View file

@ -1,37 +0,0 @@
package config
import (
"bytes"
"os"
"github.com/mitchellh/mapstructure"
"github.com/spf13/viper"
)
// Read values from file
func (cfg *Config) ReadFile(filename string) error {
bytes, err := os.ReadFile(filename)
if err != nil {
return err
}
return cfg.Read(bytes)
}
func (cfg *Config) Read(in []byte) error {
v := viper.New()
v.SetConfigType("yaml")
if err := v.ReadConfig(bytes.NewBuffer(in)); err != nil {
return err
}
decoders := mapstructure.ComposeDecodeHookFunc(
mapstructure.TextUnmarshallerHookFunc(),
mapstructure.StringToTimeDurationHookFunc(),
mapstructure.StringToSliceHookFunc(","),
decodeShorthandShipConfig,
)
return v.Unmarshal(cfg, viper.DecodeHook(decoders))
}