diff --git a/cmd/thalos/main.go b/cmd/thalos/main.go index 994cdd2..fc55eef 100644 --- a/cmd/thalos/main.go +++ b/cmd/thalos/main.go @@ -1,6 +1,7 @@ package main import ( + "github.com/eosswedenorg/thalos/internal/config" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -28,13 +29,14 @@ func init() { rootCmd.SetVersionTemplate(`{{with .Name}}{{printf "%s " .}}{{end}}{{printf "v%s" .Version}}` + "\n") flags := pflag.FlagSet{} + flags.StringP("config", "c", "./config.yml", "Config file to read") flags.StringP("level", "L", "info", "Log level to use") - flags.StringP("log", "l", "", "Path to log file (default: print to stdout/stderr)") flags.StringP("pid", "p", "", "Where to write process id") flags.BoolP("no-state-cache", "n", false, "Force the application to take start block from config/api") rootCmd.PersistentFlags().AddFlagSet(&flags) + rootCmd.PersistentFlags().AddFlagSet(config.GetFlags()) } func main() { diff --git a/cmd/thalos/server.go b/cmd/thalos/server.go index e5afc23..65d04ec 100644 --- a/cmd/thalos/server.go +++ b/cmd/thalos/server.go @@ -7,6 +7,7 @@ import ( "io" "os" "os/signal" + "path" "syscall" "time" @@ -153,7 +154,7 @@ func initAbiManager(api *eos.API, store cache.Store, chain_id string) *abi.AbiMa return abi.NewAbiManager(cache, api) } -func stateLoader(conf config.Config, chainInfo func() *eos.InfoResp, cache *cache.Cache, current_block_no_cache bool) StateLoader { +func stateLoader(conf *config.Config, start_block_flag *pflag.Flag, chainInfo func() *eos.InfoResp, cache *cache.Cache, current_block_no_cache bool) StateLoader { return func(state *State) { var source string @@ -167,7 +168,13 @@ func stateLoader(conf config.Config, chainInfo func() *eos.InfoResp, cache *cach if current_block_no_cache || err != nil { // Set from config if we have a sane value. if conf.Ship.StartBlockNum != shipclient.NULL_BLOCK_NUMBER { - source = "config" + + if start_block_flag != nil && start_block_flag.Changed { + source = "cli" + } else { + source = "config" + } + state.CurrentBlock = conf.Ship.StartBlockNum } else { // Otherwise, set from api. @@ -198,23 +205,34 @@ func stateSaver(cache *cache.Cache) StateSaver { } } -func ReadConfig(cfg *config.Config, flags *pflag.FlagSet) error { +func GetConfig(flags *pflag.FlagSet) (*config.Config, error) { filename, err := flags.GetString("config") if err != nil { - return err + return nil, err } - // Read file first. - if err := cfg.ReadFile(filename); err != nil { - return err + cfg, err := config.NewBuilder(). + SetConfigFile(filename). + SetFlags(flags). + Build() + if err != nil { + return nil, err } - // Then override any cli flags - if err := cfg.ReadCliFlags(flags); err != nil { - return err + logFile, _ := flags.GetString("log") + if len(logFile) > 0 { + cfg.Log.Directory = path.Dir(logFile) + cfg.Log.Filename = path.Base(logFile) } - return nil + // If start-block is provided, we should set no-state-cache to true. + if startBlock := flags.Lookup("start-block"); startBlock != nil && startBlock.Changed { + if err := flags.Set("no-state-cache", "true"); err != nil { + return cfg, nil + } + } + + return cfg, nil } // "Clever" way to make sure we only call the api once. @@ -244,8 +262,6 @@ func chainInfoOnce(api *eos.API) func() *eos.InfoResp { func serverCmd(cmd *cobra.Command, args []string) { var err error - skip_currentblock_cache, _ := cmd.Flags().GetBool("no-state-cache") - // Write PID file pidFile, err := cmd.Flags().GetString("pid") if err != nil { @@ -257,12 +273,14 @@ func serverCmd(cmd *cobra.Command, args []string) { } // Parse config - conf := config.New() - if err = ReadConfig(&conf, cmd.Flags()); err != nil { + conf, err := GetConfig(cmd.Flags()) + if err != nil { log.WithError(err).Fatal("Failed to read config") return } + skip_currentblock_cache, _ := cmd.Flags().GetBool("no-state-cache") + flagLevel, _ := cmd.Flags().GetString("level") lvl, err := log.ParseLevel(flagLevel) if err == nil { @@ -360,7 +378,7 @@ func serverCmd(cmd *cobra.Command, args []string) { processor := SpawnProccessor( shClient, - stateLoader(conf, chainInfo, cache, skip_currentblock_cache), + stateLoader(conf, cmd.Flags().Lookup("start-block"), chainInfo, cache, skip_currentblock_cache), stateSaver(cache), driver.NewPublisher(context.Background(), rdb, api_redis.Namespace{ Prefix: conf.Redis.Prefix, @@ -371,7 +389,7 @@ func serverCmd(cmd *cobra.Command, args []string) { ) // Run the application - run(&conf, shClient, processor) + run(conf, shClient, processor) // Close the processor properly processor.Close() diff --git a/internal/config/builder.go b/internal/config/builder.go new file mode 100644 index 0000000..5257e3d --- /dev/null +++ b/internal/config/builder.go @@ -0,0 +1,120 @@ +package config + +import ( + "errors" + "io" + "os" + + "github.com/mitchellh/mapstructure" + "github.com/spf13/pflag" + "github.com/spf13/viper" +) + +// This is a simple module that encapsulate the creation +// of a config object and can override values from cli flags. + +type Builder struct { + in io.Reader + flags *pflag.FlagSet + binds map[string]string +} + +func NewBuilder() *Builder { + return &Builder{ + binds: map[string]string{ + "api": "url", + "message_codec": "codec", + + // Redis + "redis.addr": "redis-addr", + "redis.user": "redis-user", + "redis.password": "redis-password", + "redis.db": "redis-db", + "redis.prefix": "redis-prefix", + + // Telegram + "telegram.id": "telegram-id", + "telegram.channel": "telegram-channel", + + // Log + "log.maxfilesize": "log-max-filesize", + "log.maxtime": "log-max-time", + + // Ship + "ship.url": "ship-url", + "ship.start_block_num": "start-block", + "ship.end_block_num": "end-block", + "ship.irreversible_only": "irreversible-only", + "ship.max_messages_in_flight": "max-msg-in-flight", + "ship.chain": "chain", + }, + } +} + +// Set the config file to read +func (b *Builder) SetConfigFile(filename string) *Builder { + file, _ := os.Open(filename) + return b.SetSource(file) +} + +// Set the source to read +func (b *Builder) SetSource(in io.Reader) *Builder { + b.in = in + return b +} + +// Set all flags that the builder should use. +func (b *Builder) SetFlags(flags *pflag.FlagSet) *Builder { + b.flags = flags + return b +} + +// Add a flag to the builder. +func (b *Builder) AddFlag(flag *pflag.Flag) *Builder { + b.flags.AddFlag(flag) + return b +} + +// Build the config object from file, cli-flags +func (b *Builder) Build() (*Config, error) { + if b.in == nil { + return nil, errors.New("Config not set") + } + + conf := Config{} + + v := viper.New() + v.SetConfigType("yaml") + + if b.flags != nil { + // bind flags in viper. + for key, flagname := range b.binds { + flag := b.flags.Lookup(flagname) + if flag == nil { + continue + } + + if err := v.BindPFlag(key, flag); err != nil { + return nil, err + } + } + } + + // Read config and unmarshal + if err := v.ReadConfig(b.in); err != nil { + return nil, err + } + + decoders := mapstructure.ComposeDecodeHookFunc( + mapstructure.TextUnmarshallerHookFunc(), + mapstructure.StringToTimeDurationHookFunc(), + mapstructure.StringToSliceHookFunc(","), + ) + + err := v.Unmarshal(&conf, viper.DecodeHook(decoders)) + if err != nil { + return nil, err + } + + return &conf, nil +} diff --git a/internal/config/builder_test.go b/internal/config/builder_test.go new file mode 100644 index 0000000..9bc1589 --- /dev/null +++ b/internal/config/builder_test.go @@ -0,0 +1,210 @@ +package config + +import ( + "bytes" + "testing" + "time" + + "github.com/eosswedenorg/thalos/internal/log" + "github.com/stretchr/testify/require" +) + +func TestBuilder(t *testing.T) { + expected := Config{ + Name: "ship-reader-1", + Api: "http://127.0.0.1:8080", + MessageCodec: "mojibake", + Log: log.Config{ + Filename: "some_file.log", + Directory: "/path/to/whatever", + MaxFileSize: 200, + MaxTime: 30 * time.Minute, + }, + Ship: ShipConfig{ + Url: "127.0.0.1:8089", + StartBlockNum: 23671836, + EndBlockNum: 23872222, + IrreversibleOnly: true, + MaxMessagesInFlight: 1337, + }, + Telegram: TelegramConfig{ + Id: "110201543:AAHdqTcvCH1vGWJxfSeofSAs0K5PALDsaw", + Channel: -123456789, + }, + Redis: RedisConfig{ + Addr: "localhost:6379", + User: "myuser", + Password: "passwd", + DB: 4, + Prefix: "some::ship", + }, + } + + builder := NewBuilder() + builder.SetSource(bytes.NewBuffer([]byte(` +name: "ship-reader-1" +api: "http://127.0.0.1:8080" +message_codec: "mojibake" +log: + filename: some_file.log + directory: /path/to/whatever + maxtime: 30m + maxfilesize: 200b +ship: + url: "127.0.0.1:8089" + irreversible_only: true + max_messages_in_flight: 1337 + start_block_num: 23671836 + end_block_num: 23872222 +telegram: + id: "110201543:AAHdqTcvCH1vGWJxfSeofSAs0K5PALDsaw" + channel: -123456789 +redis: + addr: "localhost:6379" + user: "myuser" + password: "passwd" + db: 4 + prefix: "some::ship" +`))) + + cfg, err := builder.Build() + + require.NoError(t, err) + require.Equal(t, &expected, cfg) +} + +func TestBuilder_ConfigWithFlags(t *testing.T) { + expected := Config{ + Name: "ship-reader-1", + Api: "https://api.example.com", + MessageCodec: "msgpack", + Log: log.Config{ + Filename: "mylog.log", + Directory: "/var/log", + MaxFileSize: 200, + MaxTime: 30 * time.Minute, + }, + Ship: ShipConfig{ + Url: "127.0.0.1:8089", + StartBlockNum: 23671836, + EndBlockNum: 23872222, + IrreversibleOnly: true, + MaxMessagesInFlight: 1337, + }, + Telegram: TelegramConfig{ + Id: "110201543:AAHdqTcvCH1vGWJxfSeofSAs0K5PALDsaw", + Channel: -123456789, + }, + Redis: RedisConfig{ + Addr: "localhost:6379", + User: "userfromcli", + Password: "passwd", + DB: 4, + Prefix: "some::ship", + }, + } + + builder := NewBuilder() + builder.SetSource(bytes.NewBuffer([]byte(` +name: "ship-reader-1" +api: "http://127.0.0.1:8080" +message_codec: "mojibake" +log: + filename: mylog.log + directory: /var/log + maxtime: 30m + maxfilesize: 200b +ship: + url: "127.0.0.1:8089" + irreversible_only: true + max_messages_in_flight: 1337 + start_block_num: 23671836 + end_block_num: 23872222 +telegram: + id: "110201543:AAHdqTcvCH1vGWJxfSeofSAs0K5PALDsaw" + channel: -123456789 +redis: + addr: "localhost:6379" + user: "myuser" + password: "passwd" + db: 4 + prefix: "some::ship" +`))) + + flags := GetFlags() + + require.NoError(t, flags.Set("url", "https://api.example.com")) + require.NoError(t, flags.Set("codec", "msgpack")) + require.NoError(t, flags.Set("redis-user", "userfromcli")) + + builder.SetFlags(flags) + + cfg, err := builder.Build() + + require.NoError(t, err) + require.Equal(t, &expected, cfg) +} + +func TestBuilder_NilSource(t *testing.T) { + cfg, err := NewBuilder().Build() + require.Nil(t, cfg) + require.EqualError(t, err, "Config not set") +} + +func TestBuilder_Flags(t *testing.T) { + flags := GetFlags() + + require.NoError(t, flags.Set("url", "https://myapi")) + require.NoError(t, flags.Set("codec", "binary")) + require.NoError(t, flags.Set("redis-addr", "154.223.38.15:6380")) + require.NoError(t, flags.Set("redis-user", "myuser")) + require.NoError(t, flags.Set("redis-password", "secret123")) + require.NoError(t, flags.Set("redis-db", "3")) + require.NoError(t, flags.Set("redis-prefix", "custom-prefix")) + require.NoError(t, flags.Set("telegram-id", "72983126312982618")) + require.NoError(t, flags.Set("telegram-channel", "-293492332")) + require.NoError(t, flags.Set("log-max-filesize", "25mb")) + require.NoError(t, flags.Set("log-max-time", "10m")) + require.NoError(t, flags.Set("ship-url", "ws://myship.com:7823")) + require.NoError(t, flags.Set("start-block", "7327833")) + require.NoError(t, flags.Set("end-block", "329408392")) + require.NoError(t, flags.Set("irreversible-only", "true")) + require.NoError(t, flags.Set("max-msg-in-flight", "98")) + require.NoError(t, flags.Set("chain", "wax")) + + cfg, err := NewBuilder(). + SetSource(bytes.NewReader([]byte(``))). + SetFlags(flags). + Build() + + expected := Config{ + Api: "https://myapi", + MessageCodec: "binary", + Log: log.Config{ + MaxFileSize: 25 * 1000 * 1000, // 25 mb + MaxTime: time.Minute * 10, + }, + Ship: ShipConfig{ + Url: "ws://myship.com:7823", + StartBlockNum: 7327833, + EndBlockNum: 329408392, + MaxMessagesInFlight: 98, + IrreversibleOnly: true, + Chain: "wax", + }, + Telegram: TelegramConfig{ + Id: "72983126312982618", + Channel: -293492332, + }, + Redis: RedisConfig{ + Addr: "154.223.38.15:6380", + User: "myuser", + Password: "secret123", + DB: 3, + Prefix: "custom-prefix", + }, + } + + require.NoError(t, err) + require.Equal(t, &expected, cfg) +} diff --git a/internal/config/cli.go b/internal/config/cli.go index d5bb6ed..d831b7a 100644 --- a/internal/config/cli.go +++ b/internal/config/cli.go @@ -1,17 +1,46 @@ package config import ( - "path" + "time" + shipclient "github.com/eosswedenorg-go/antelope-ship-client" "github.com/spf13/pflag" ) -// Read cli flag values into the config -func (cfg *Config) ReadCliFlags(flags *pflag.FlagSet) error { - logFile, _ := flags.GetString("log") - if len(logFile) > 0 { - cfg.Log.Directory = path.Dir(logFile) - cfg.Log.Filename = path.Base(logFile) - } - return nil +func GetFlags() *pflag.FlagSet { + flags := pflag.FlagSet{} + + // Generic + flags.StringP("url", "u", "", "Url to antelope api") + flags.String("codec", "json", "Codec used to send messages") + + // Redis + flags.String("redis-addr", "127.0.0.1:6379", "host:port to redis server") + flags.String("redis-user", "", "Redis username") + flags.String("redis-password", "", "Redis password") + flags.Int("redis-db", 0, "Redis database") + flags.String("redis-prefix", "ship", "Redis channel prefix") + + // Telegram + flags.String("telegram-id", "", "Id of telegram bot") + flags.Int64("telegram-channel", 0, "Telegram channel to send notifications to") + + // Log + flags.StringP("log", "l", "", "Path to log file (default: print to stdout/stderr)") + flags.String("log-max-filesize", "10mb", "Max filesize for logfile to rotate") + flags.Duration("log-max-time", time.Hour*24, "Max time for logfile to rotate") + + // Ship + flags.String("ship-url", "ws://127.0.0.1:8080", "Url to ship node") + flags.Uint32("start-block", shipclient.NULL_BLOCK_NUMBER, "Start to stream from this block") + flags.Uint32("end-block", shipclient.NULL_BLOCK_NUMBER, "Stop streaming when this block is reached") + + flags.Lookup("start-block").DefValue = "config value, cache, head from api" + flags.Lookup("end-block").DefValue = "none" + + flags.Bool("irreversible-only", false, "Only stream irreversible blocks from ship") + flags.Int("max-msg-in-flight", 10, "Maximum messages that can be sent from SHIP without acknowledgement") + flags.String("chain", "", "ChainID used in channel namespace, can be any string (default from api)") + + return &flags } diff --git a/internal/config/config.go b/internal/config/config.go index e137352..41777df 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,12 +1,7 @@ package config import ( - "reflect" - "time" - "github.com/eosswedenorg/thalos/internal/log" - - shipclient "github.com/eosswedenorg-go/antelope-ship-client" ) type RedisConfig struct { @@ -43,43 +38,3 @@ type Config struct { Telegram TelegramConfig `yaml:"telegram" mapstructure:"telegram"` } - -// Create a new Config object with default values -func New() Config { - return Config{ - MessageCodec: "json", - Log: log.Config{ - MaxFileSize: 10 * 1000 * 1000, // 10 mb - MaxTime: time.Hour * 24, - }, - Ship: defaultShipConfig(""), - Redis: RedisConfig{ - Addr: "localhost:6379", - Prefix: "ship", - }, - } -} - -func defaultShipConfig(url string) ShipConfig { - return ShipConfig{ - Url: url, - StartBlockNum: shipclient.NULL_BLOCK_NUMBER, - EndBlockNum: shipclient.NULL_BLOCK_NUMBER, - MaxMessagesInFlight: 10, - IrreversibleOnly: false, - } -} - -// mapstructure DecodeHook that can parse a shorthand ship config (only string instead of struct.) -func decodeShorthandShipConfig(from reflect.Value, to reflect.Value) (interface{}, error) { - shipType := reflect.TypeOf(ShipConfig{}) - - // If to is a struct and is assignable to a ShipConfig and from is a string. - // Then we treat the from value as ShipConfig.Url - if to.Kind() == reflect.Struct && to.Type().AssignableTo(shipType) && from.Kind() == reflect.String { - return defaultShipConfig(from.String()), nil - } - - // If not, decode as normal. - return from.Interface(), nil -} diff --git a/internal/config/config_test.go b/internal/config/config_test.go deleted file mode 100644 index ceba4e2..0000000 --- a/internal/config/config_test.go +++ /dev/null @@ -1,148 +0,0 @@ -package config - -import ( - "testing" - "time" - - "github.com/eosswedenorg/thalos/internal/log" - "github.com/stretchr/testify/require" - - shipclient "github.com/eosswedenorg-go/antelope-ship-client" -) - -func TestNew(t *testing.T) { - expected := Config{ - MessageCodec: "json", - - Log: log.Config{ - MaxFileSize: 10 * 1000 * 1000, // 10 mb - MaxTime: time.Hour * 24, - }, - - Ship: ShipConfig{ - StartBlockNum: shipclient.NULL_BLOCK_NUMBER, - EndBlockNum: shipclient.NULL_BLOCK_NUMBER, - MaxMessagesInFlight: 10, - IrreversibleOnly: false, - }, - - Redis: RedisConfig{ - Addr: "localhost:6379", - Password: "", - DB: 0, - Prefix: "ship", - }, - } - - require.Equal(t, expected, New()) -} - -func TestRead(t *testing.T) { - expected := Config{ - Name: "ship-reader-1", - Api: "http://127.0.0.1:8080", - MessageCodec: "mojibake", - Log: log.Config{ - Filename: "some_file.log", - Directory: "/path/to/whatever", - MaxFileSize: 200, - MaxTime: 30 * time.Minute, - }, - Ship: ShipConfig{ - Url: "127.0.0.1:8089", - StartBlockNum: 23671836, - EndBlockNum: 23872222, - IrreversibleOnly: true, - MaxMessagesInFlight: 1337, - }, - Telegram: TelegramConfig{ - Id: "110201543:AAHdqTcvCH1vGWJxfSeofSAs0K5PALDsaw", - Channel: -123456789, - }, - Redis: RedisConfig{ - Addr: "localhost:6379", - User: "myuser", - Password: "passwd", - DB: 4, - Prefix: "some::ship", - }, - } - - cfg := Config{} - err := cfg.Read([]byte(` -name: "ship-reader-1" -api: "http://127.0.0.1:8080" -message_codec: "mojibake" -log: - filename: some_file.log - directory: /path/to/whatever - maxtime: 30m - maxfilesize: 200b -ship: - url: "127.0.0.1:8089" - irreversible_only: true - max_messages_in_flight: 1337 - start_block_num: 23671836 - end_block_num: 23872222 -telegram: - id: "110201543:AAHdqTcvCH1vGWJxfSeofSAs0K5PALDsaw" - channel: -123456789 -redis: - addr: "localhost:6379" - user: "myuser" - password: "passwd" - db: 4 - prefix: "some::ship" -`)) - - require.NoError(t, err) - require.Equal(t, expected, cfg) -} - -func TestReadShorthandShipUrl(t *testing.T) { - expected := Config{ - Name: "ship-reader-1", - Api: "http://127.0.0.1:8080", - MessageCodec: "json", - Log: log.Config{ - MaxFileSize: 10 * 1000 * 1000, // 10 mb - MaxTime: time.Hour * 24, - }, - Ship: ShipConfig{ - Url: "127.0.0.1:8089", - StartBlockNum: shipclient.NULL_BLOCK_NUMBER, - EndBlockNum: shipclient.NULL_BLOCK_NUMBER, - MaxMessagesInFlight: 10, - IrreversibleOnly: false, - }, - Telegram: TelegramConfig{ - Id: "110201543:AAHdqTcvCH1vGWJxfSeofSAs0K5PALDsaw", - Channel: -123456789, - }, - Redis: RedisConfig{ - Addr: "localhost:6379", - Password: "passwd", - DB: 4, - Prefix: "some::ship", - }, - } - - cfg := New() - - err := cfg.Read([]byte(` -name: "ship-reader-1" -api: "http://127.0.0.1:8080" -ship: "127.0.0.1:8089" -telegram: - id: "110201543:AAHdqTcvCH1vGWJxfSeofSAs0K5PALDsaw" - channel: -123456789 -redis: - addr: "localhost:6379" - password: "passwd" - db: 4 - prefix: "some::ship" -`)) - - require.NoError(t, err) - require.Equal(t, expected, cfg) -} diff --git a/internal/config/file.go b/internal/config/file.go deleted file mode 100644 index c14b00d..0000000 --- a/internal/config/file.go +++ /dev/null @@ -1,37 +0,0 @@ -package config - -import ( - "bytes" - "os" - - "github.com/mitchellh/mapstructure" - "github.com/spf13/viper" -) - -// Read values from file -func (cfg *Config) ReadFile(filename string) error { - bytes, err := os.ReadFile(filename) - if err != nil { - return err - } - - return cfg.Read(bytes) -} - -func (cfg *Config) Read(in []byte) error { - v := viper.New() - v.SetConfigType("yaml") - - if err := v.ReadConfig(bytes.NewBuffer(in)); err != nil { - return err - } - - decoders := mapstructure.ComposeDecodeHookFunc( - mapstructure.TextUnmarshallerHookFunc(), - mapstructure.StringToTimeDurationHookFunc(), - mapstructure.StringToSliceHookFunc(","), - decodeShorthandShipConfig, - ) - - return v.Unmarshal(cfg, viper.DecodeHook(decoders)) -}