From 775760ec322b631a67d988ef2c5a7f0f0a8b6c01 Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Tue, 13 Feb 2024 23:12:58 +0100 Subject: [PATCH 1/5] cmd/thalos/server.go: get rid of exit channel. its sufficient to just use running flag. --- cmd/thalos/server.go | 26 ++++++++------------------ 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/cmd/thalos/server.go b/cmd/thalos/server.go index e348cb9..e0a2976 100644 --- a/cmd/thalos/server.go +++ b/cmd/thalos/server.go @@ -42,8 +42,6 @@ var shClient *shipclient.Stream var running bool = true -var exit chan bool - var cache *Cache var cacheStore Store @@ -109,8 +107,7 @@ func readerLoop(processor *app.ShipProcessor) { }) if err != nil { log.WithError(err).Error("Failed to connect to SHIP") - running = false - continue + return } recon_cnt = 0 @@ -122,9 +119,8 @@ func readerLoop(processor *app.ShipProcessor) { if err := shClient.Run(); err != nil { if errors.Is(err, shipclient.ErrEndBlockReached) { - exit <- true log.Info("Endblock reached.") - break + return } log.WithError(err).Error("Failed to read from ship") @@ -143,17 +139,13 @@ func run(processor *app.ShipProcessor) { signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) // Wait for interrupt - select { - case sig := <-signals: - log.WithField("signal", sig).Info("Signal received") + sig := <-signals + log.WithField("signal", sig).Info("Signal received") - // Cleanly close the connection by sending a close message. - err := shClient.Shutdown() - if err != nil { - log.WithError(err).Info("failed to send close message to ship server") - } - case <-exit: - // Do nothing, just exit. + // Cleanly close the connection by sending a close message. + err := shClient.Shutdown() + if err != nil { + log.WithError(err).Info("failed to send close message to ship server") } running = false @@ -240,8 +232,6 @@ func serverCmd(ctx *cli.Context) error { var err error var chainInfo *eos.InfoResp - exit = make(chan bool) - skip_currentblock_cache := ctx.Bool("n") // Write PID file From afb90af1db3a08e10c9938dde368ce45cbb3a3f2 Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Tue, 13 Feb 2024 23:14:47 +0100 Subject: [PATCH 2/5] cmd/thalos/server.go: in run() set running flag to false before calling shClient.Shutdown() so readerLoop() exists correctly. --- cmd/thalos/server.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/thalos/server.go b/cmd/thalos/server.go index e0a2976..412f007 100644 --- a/cmd/thalos/server.go +++ b/cmd/thalos/server.go @@ -143,12 +143,11 @@ func run(processor *app.ShipProcessor) { log.WithField("signal", sig).Info("Signal received") // Cleanly close the connection by sending a close message. + running = false err := shClient.Shutdown() if err != nil { log.WithError(err).Info("failed to send close message to ship server") } - - running = false } func getChain(def string) string { From 9974bfe3fd22804c911599b3530b857e5ee1c0a5 Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Wed, 14 Feb 2024 13:00:33 +0100 Subject: [PATCH 3/5] rename app folder to internal. --- cmd/thalos/server.go | 24 +++++++++---------- cmd/tools/main.go | 2 +- cmd/tools/mock_publisher.go | 2 +- {app => internal}/abi/manager.go | 2 +- {app => internal}/abi/manager_test.go | 2 +- {app => internal}/cache/cache.go | 0 {app => internal}/cache/memory_store.go | 0 {app => internal}/cache/memory_store_test.go | 0 {app => internal}/cache/redis_store.go | 0 {app => internal}/cache/redis_store_test.go | 0 {app => internal}/cache/store.go | 0 {app => internal}/config/cli.go | 0 {app => internal}/config/config.go | 2 +- {app => internal}/config/config_test.go | 2 +- {app => internal}/config/file.go | 0 {app => internal}/config/yaml.go | 0 {app => internal}/driver/redis/publisher.go | 0 .../driver/redis/publisher_test.go | 0 {app => internal}/driver/writer.go | 0 {app => internal}/log/HookWriter.go | 0 {app => internal}/log/RotatingFile.go | 0 {app => internal}/log/config.go | 2 +- {app => internal}/log/config_test.go | 0 {app => internal}/log/init.go | 0 {app => internal/server}/ship_processor.go | 6 ++--- {app => internal/server}/state.go | 2 +- {app => internal}/types/size.go | 0 {app => internal}/types/size_test.go | 0 28 files changed, 23 insertions(+), 23 deletions(-) rename {app => internal}/abi/manager.go (96%) rename {app => internal}/abi/manager_test.go (99%) rename {app => internal}/cache/cache.go (100%) rename {app => internal}/cache/memory_store.go (100%) rename {app => internal}/cache/memory_store_test.go (100%) rename {app => internal}/cache/redis_store.go (100%) rename {app => internal}/cache/redis_store_test.go (100%) rename {app => internal}/cache/store.go (100%) rename {app => internal}/config/cli.go (100%) rename {app => internal}/config/config.go (96%) rename {app => internal}/config/config_test.go (98%) rename {app => internal}/config/file.go (100%) rename {app => internal}/config/yaml.go (100%) rename {app => internal}/driver/redis/publisher.go (100%) rename {app => internal}/driver/redis/publisher_test.go (100%) rename {app => internal}/driver/writer.go (100%) rename {app => internal}/log/HookWriter.go (100%) rename {app => internal}/log/RotatingFile.go (100%) rename {app => internal}/log/config.go (93%) rename {app => internal}/log/config_test.go (100%) rename {app => internal}/log/init.go (100%) rename {app => internal/server}/ship_processor.go (98%) rename {app => internal/server}/state.go (94%) rename {app => internal}/types/size.go (100%) rename {app => internal}/types/size_test.go (100%) diff --git a/cmd/thalos/server.go b/cmd/thalos/server.go index 412f007..7be4b29 100644 --- a/cmd/thalos/server.go +++ b/cmd/thalos/server.go @@ -18,12 +18,12 @@ import ( _ "github.com/eosswedenorg/thalos/api/message/json" _ "github.com/eosswedenorg/thalos/api/message/msgpack" api_redis "github.com/eosswedenorg/thalos/api/redis" - "github.com/eosswedenorg/thalos/app" - "github.com/eosswedenorg/thalos/app/abi" - . "github.com/eosswedenorg/thalos/app/cache" - "github.com/eosswedenorg/thalos/app/config" - driver "github.com/eosswedenorg/thalos/app/driver/redis" - . "github.com/eosswedenorg/thalos/app/log" + "github.com/eosswedenorg/thalos/internal/abi" + . "github.com/eosswedenorg/thalos/internal/cache" + "github.com/eosswedenorg/thalos/internal/config" + driver "github.com/eosswedenorg/thalos/internal/driver/redis" + . "github.com/eosswedenorg/thalos/internal/log" + . "github.com/eosswedenorg/thalos/internal/server" redis_cache "github.com/go-redis/cache/v9" "github.com/nikoksr/notify" "github.com/nikoksr/notify/service/telegram" @@ -46,7 +46,7 @@ var cache *Cache var cacheStore Store -func readerLoop(processor *app.ShipProcessor) { +func readerLoop(processor *ShipProcessor) { recon_cnt := 0 exp := &backoff.ExponentialBackOff{ @@ -128,7 +128,7 @@ func readerLoop(processor *app.ShipProcessor) { } } -func run(processor *app.ShipProcessor) { +func run(processor *ShipProcessor) { // Spawn reader loop in another thread. go readerLoop(processor) @@ -170,8 +170,8 @@ func initAbiManger(api *eos.API, chain_id string) *abi.AbiManager { return abi.NewAbiManager(cache, api) } -func stateLoader(chainInfo *eos.InfoResp, current_block_no_cache bool) app.StateLoader { - return func(state *app.State) { +func stateLoader(chainInfo *eos.InfoResp, current_block_no_cache bool) StateLoader { + return func(state *State) { var source string // Load state from cache. @@ -207,7 +207,7 @@ func stateLoader(chainInfo *eos.InfoResp, current_block_no_cache bool) app.State } } -func stateSaver(state app.State) error { +func stateSaver(state State) error { ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) defer cancel() return cache.Set(ctx, "state", state, 0) @@ -337,7 +337,7 @@ func serverCmd(ctx *cli.Context) error { chain_id := getChain(chainInfo.ChainID.String()) - processor := app.SpawnProccessor( + processor := SpawnProccessor( shClient, stateLoader(chainInfo, skip_currentblock_cache), stateSaver, diff --git a/cmd/tools/main.go b/cmd/tools/main.go index 4670591..bf8a396 100644 --- a/cmd/tools/main.go +++ b/cmd/tools/main.go @@ -5,7 +5,7 @@ import ( "github.com/urfave/cli/v2" - _ "github.com/eosswedenorg/thalos/app/log" + _ "github.com/eosswedenorg/thalos/internal/log" log "github.com/sirupsen/logrus" ) diff --git a/cmd/tools/mock_publisher.go b/cmd/tools/mock_publisher.go index 7d69e1a..421c6e4 100644 --- a/cmd/tools/mock_publisher.go +++ b/cmd/tools/mock_publisher.go @@ -10,7 +10,7 @@ import ( "github.com/eosswedenorg/thalos/api/message" _ "github.com/eosswedenorg/thalos/api/message/json" api_redis "github.com/eosswedenorg/thalos/api/redis" - redis_driver "github.com/eosswedenorg/thalos/app/driver/redis" + redis_driver "github.com/eosswedenorg/thalos/internal/driver/redis" "github.com/redis/go-redis/v9" log "github.com/sirupsen/logrus" diff --git a/app/abi/manager.go b/internal/abi/manager.go similarity index 96% rename from app/abi/manager.go rename to internal/abi/manager.go index 43be262..2441e01 100644 --- a/app/abi/manager.go +++ b/internal/abi/manager.go @@ -6,7 +6,7 @@ import ( "time" eos "github.com/eoscanada/eos-go" - "github.com/eosswedenorg/thalos/app/cache" + "github.com/eosswedenorg/thalos/internal/cache" ) // AbiManager handles an ABI cache that fetches the ABI from an API on cache miss. diff --git a/app/abi/manager_test.go b/internal/abi/manager_test.go similarity index 99% rename from app/abi/manager_test.go rename to internal/abi/manager_test.go index 661c8ec..7ecfc75 100644 --- a/app/abi/manager_test.go +++ b/internal/abi/manager_test.go @@ -9,7 +9,7 @@ import ( eos "github.com/eoscanada/eos-go" - "github.com/eosswedenorg/thalos/app/cache" + "github.com/eosswedenorg/thalos/internal/cache" "github.com/stretchr/testify/assert" ) diff --git a/app/cache/cache.go b/internal/cache/cache.go similarity index 100% rename from app/cache/cache.go rename to internal/cache/cache.go diff --git a/app/cache/memory_store.go b/internal/cache/memory_store.go similarity index 100% rename from app/cache/memory_store.go rename to internal/cache/memory_store.go diff --git a/app/cache/memory_store_test.go b/internal/cache/memory_store_test.go similarity index 100% rename from app/cache/memory_store_test.go rename to internal/cache/memory_store_test.go diff --git a/app/cache/redis_store.go b/internal/cache/redis_store.go similarity index 100% rename from app/cache/redis_store.go rename to internal/cache/redis_store.go diff --git a/app/cache/redis_store_test.go b/internal/cache/redis_store_test.go similarity index 100% rename from app/cache/redis_store_test.go rename to internal/cache/redis_store_test.go diff --git a/app/cache/store.go b/internal/cache/store.go similarity index 100% rename from app/cache/store.go rename to internal/cache/store.go diff --git a/app/config/cli.go b/internal/config/cli.go similarity index 100% rename from app/config/cli.go rename to internal/config/cli.go diff --git a/app/config/config.go b/internal/config/config.go similarity index 96% rename from app/config/config.go rename to internal/config/config.go index 1e71e33..790d647 100644 --- a/app/config/config.go +++ b/internal/config/config.go @@ -3,7 +3,7 @@ package config import ( "time" - "github.com/eosswedenorg/thalos/app/log" + "github.com/eosswedenorg/thalos/internal/log" shipclient "github.com/eosswedenorg-go/antelope-ship-client" ) diff --git a/app/config/config_test.go b/internal/config/config_test.go similarity index 98% rename from app/config/config_test.go rename to internal/config/config_test.go index f3737a7..d798436 100644 --- a/app/config/config_test.go +++ b/internal/config/config_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - "github.com/eosswedenorg/thalos/app/log" + "github.com/eosswedenorg/thalos/internal/log" "github.com/stretchr/testify/require" shipclient "github.com/eosswedenorg-go/antelope-ship-client" diff --git a/app/config/file.go b/internal/config/file.go similarity index 100% rename from app/config/file.go rename to internal/config/file.go diff --git a/app/config/yaml.go b/internal/config/yaml.go similarity index 100% rename from app/config/yaml.go rename to internal/config/yaml.go diff --git a/app/driver/redis/publisher.go b/internal/driver/redis/publisher.go similarity index 100% rename from app/driver/redis/publisher.go rename to internal/driver/redis/publisher.go diff --git a/app/driver/redis/publisher_test.go b/internal/driver/redis/publisher_test.go similarity index 100% rename from app/driver/redis/publisher_test.go rename to internal/driver/redis/publisher_test.go diff --git a/app/driver/writer.go b/internal/driver/writer.go similarity index 100% rename from app/driver/writer.go rename to internal/driver/writer.go diff --git a/app/log/HookWriter.go b/internal/log/HookWriter.go similarity index 100% rename from app/log/HookWriter.go rename to internal/log/HookWriter.go diff --git a/app/log/RotatingFile.go b/internal/log/RotatingFile.go similarity index 100% rename from app/log/RotatingFile.go rename to internal/log/RotatingFile.go diff --git a/app/log/config.go b/internal/log/config.go similarity index 93% rename from app/log/config.go rename to internal/log/config.go index 16498ce..b5114f8 100644 --- a/app/log/config.go +++ b/internal/log/config.go @@ -4,7 +4,7 @@ import ( "path" "time" - "github.com/eosswedenorg/thalos/app/types" + "github.com/eosswedenorg/thalos/internal/types" ) // Config represents configuration parameters for a log. diff --git a/app/log/config_test.go b/internal/log/config_test.go similarity index 100% rename from app/log/config_test.go rename to internal/log/config_test.go diff --git a/app/log/init.go b/internal/log/init.go similarity index 100% rename from app/log/init.go rename to internal/log/init.go diff --git a/app/ship_processor.go b/internal/server/ship_processor.go similarity index 98% rename from app/ship_processor.go rename to internal/server/ship_processor.go index b750cb5..d510c99 100644 --- a/app/ship_processor.go +++ b/internal/server/ship_processor.go @@ -1,4 +1,4 @@ -package app +package server import ( "encoding/hex" @@ -6,8 +6,8 @@ import ( "github.com/eosswedenorg/thalos/api" "github.com/eosswedenorg/thalos/api/message" - "github.com/eosswedenorg/thalos/app/abi" - "github.com/eosswedenorg/thalos/app/driver" + "github.com/eosswedenorg/thalos/internal/abi" + "github.com/eosswedenorg/thalos/internal/driver" log "github.com/sirupsen/logrus" diff --git a/app/state.go b/internal/server/state.go similarity index 94% rename from app/state.go rename to internal/server/state.go index 2edb323..d8312ef 100644 --- a/app/state.go +++ b/internal/server/state.go @@ -1,4 +1,4 @@ -package app +package server // State represents thalos runtime state type State struct { diff --git a/app/types/size.go b/internal/types/size.go similarity index 100% rename from app/types/size.go rename to internal/types/size.go diff --git a/app/types/size_test.go b/internal/types/size_test.go similarity index 100% rename from app/types/size_test.go rename to internal/types/size_test.go From 7108299550642dcf62e32be0dadcc1daf7efec96 Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Wed, 14 Feb 2024 13:01:34 +0100 Subject: [PATCH 4/5] cmd/thalos/main.go: minor cleanup in VersionPrinter --- cmd/thalos/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/thalos/main.go b/cmd/thalos/main.go index 7b749ce..961dbb1 100644 --- a/cmd/thalos/main.go +++ b/cmd/thalos/main.go @@ -23,8 +23,8 @@ func main() { DisableDefaultText: true, } - cli.VersionPrinter = func(cCtx *cli.Context) { - fmt.Printf("Version %s\n", VersionString) + cli.VersionPrinter = func(ctx *cli.Context) { + fmt.Printf("Version %s\n", ctx.App.Version) } cli.VersionFlag = &cli.BoolFlag{ From b853bc026ed5a55d8d89db890f88e8403afd7a65 Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Wed, 14 Feb 2024 21:03:37 +0100 Subject: [PATCH 5/5] cmd/thalos/server.go: remove global variables. --- cmd/thalos/server.go | 69 ++++++++++++++++++-------------------------- 1 file changed, 28 insertions(+), 41 deletions(-) diff --git a/cmd/thalos/server.go b/cmd/thalos/server.go index 7be4b29..a642420 100644 --- a/cmd/thalos/server.go +++ b/cmd/thalos/server.go @@ -19,6 +19,7 @@ import ( _ "github.com/eosswedenorg/thalos/api/message/msgpack" api_redis "github.com/eosswedenorg/thalos/api/redis" "github.com/eosswedenorg/thalos/internal/abi" + "github.com/eosswedenorg/thalos/internal/cache" . "github.com/eosswedenorg/thalos/internal/cache" "github.com/eosswedenorg/thalos/internal/config" driver "github.com/eosswedenorg/thalos/internal/driver/redis" @@ -32,21 +33,7 @@ import ( "github.com/urfave/cli/v2" ) -// --------------------------- -// Global variables -// --------------------------- - -var conf config.Config - -var shClient *shipclient.Stream - -var running bool = true - -var cache *Cache - -var cacheStore Store - -func readerLoop(processor *ShipProcessor) { +func readerLoop(conf *config.Config, running *bool, shClient *shipclient.Stream, processor *ShipProcessor) { recon_cnt := 0 exp := &backoff.ExponentialBackOff{ @@ -87,7 +74,7 @@ func readerLoop(processor *ShipProcessor) { return shClient.SendBlocksRequest() } - for running { + for *running { err := backoff.RetryNotify(connectOp, exp, func(err error, d time.Duration) { if recon_cnt >= 3 { @@ -128,9 +115,11 @@ func readerLoop(processor *ShipProcessor) { } } -func run(processor *ShipProcessor) { +func run(conf *config.Config, shClient *shipclient.Stream, processor *ShipProcessor) { + running := true + // Spawn reader loop in another thread. - go readerLoop(processor) + go readerLoop(conf, &running, shClient, processor) // Create interrupt channel. signals := make(chan os.Signal, 1) @@ -150,13 +139,6 @@ func run(processor *ShipProcessor) { } } -func getChain(def string) string { - if len(conf.Ship.Chain) > 0 { - return conf.Ship.Chain - } - return def -} - func LogLevels() []string { list := []string{} for _, lvl := range log.AllLevels { @@ -165,12 +147,12 @@ func LogLevels() []string { return list } -func initAbiManger(api *eos.API, chain_id string) *abi.AbiManager { - cache := NewCache("thalos::cache::abi::"+chain_id, cacheStore) +func initAbiManager(api *eos.API, store cache.Store, chain_id string) *abi.AbiManager { + cache := NewCache("thalos::cache::abi::"+chain_id, store) return abi.NewAbiManager(cache, api) } -func stateLoader(chainInfo *eos.InfoResp, current_block_no_cache bool) StateLoader { +func stateLoader(conf config.Config, chainInfo *eos.InfoResp, cache *cache.Cache, current_block_no_cache bool) StateLoader { return func(state *State) { var source string @@ -207,10 +189,12 @@ func stateLoader(chainInfo *eos.InfoResp, current_block_no_cache bool) StateLoad } } -func stateSaver(state State) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) - defer cancel() - return cache.Set(ctx, "state", state, 0) +func stateSaver(cache *cache.Cache) StateSaver { + return func(state State) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) + defer cancel() + return cache.Set(ctx, "state", state, 0) + } } func ReadConfig(cfg *config.Config, ctx *cli.Context) error { @@ -243,7 +227,7 @@ func serverCmd(ctx *cli.Context) error { } // Parse config - conf = config.New() + conf := config.New() if err = ReadConfig(&conf, ctx); err != nil { return fmt.Errorf("config: %s", err) } @@ -307,14 +291,14 @@ func serverCmd(ctx *cli.Context) error { } // Setup cache storage - cacheStore = NewRedisStore(&redis_cache.Options{ + cacheStore := NewRedisStore(&redis_cache.Options{ Redis: rdb, // Cache 10k keys for 10 minutes. LocalCache: redis_cache.NewTinyLFU(10000, 10*time.Minute), }) // Setup general cache - cache = NewCache("thalos::cache::instance::"+conf.Name, cacheStore) + cache := NewCache("thalos::cache::instance::"+conf.Name, cacheStore) log.WithField("api", conf.Api).Info("Get chain info from api") eosClient := eos.New(conf.Api) @@ -323,7 +307,7 @@ func serverCmd(ctx *cli.Context) error { return fmt.Errorf("eosapi: %s", err) } - shClient = shipclient.NewStream(func(s *shipclient.Stream) { + shClient := shipclient.NewStream(func(s *shipclient.Stream) { s.StartBlock = conf.Ship.StartBlockNum s.EndBlock = conf.Ship.EndBlockNum s.IrreversibleOnly = conf.Ship.IrreversibleOnly @@ -335,22 +319,25 @@ func serverCmd(ctx *cli.Context) error { return fmt.Errorf("codec: %s", err) } - chain_id := getChain(chainInfo.ChainID.String()) + chain_id := conf.Ship.Chain + if len(chain_id) < 1 { + chain_id = chainInfo.ChainID.String() + } processor := SpawnProccessor( shClient, - stateLoader(chainInfo, skip_currentblock_cache), - stateSaver, + stateLoader(conf, chainInfo, cache, skip_currentblock_cache), + stateSaver(cache), driver.NewPublisher(context.Background(), rdb, api_redis.Namespace{ Prefix: conf.Redis.Prefix, ChainID: chain_id, }), - initAbiManger(eosClient, chain_id), + initAbiManager(eosClient, cacheStore, chain_id), codec, ) // Run the application - run(processor) + run(&conf, shClient, processor) // Close the processor properly processor.Close()