diff --git a/cmd/thalos/server.go b/cmd/thalos/server.go index bae8a48..4170296 100644 --- a/cmd/thalos/server.go +++ b/cmd/thalos/server.go @@ -18,12 +18,13 @@ import ( _ "github.com/eosswedenorg/thalos/api/message/json" _ "github.com/eosswedenorg/thalos/api/message/msgpack" api_redis "github.com/eosswedenorg/thalos/api/redis" - "github.com/eosswedenorg/thalos/app" - "github.com/eosswedenorg/thalos/app/abi" - . "github.com/eosswedenorg/thalos/app/cache" - "github.com/eosswedenorg/thalos/app/config" - driver "github.com/eosswedenorg/thalos/app/driver/redis" - . "github.com/eosswedenorg/thalos/app/log" + "github.com/eosswedenorg/thalos/internal/abi" + "github.com/eosswedenorg/thalos/internal/cache" + . "github.com/eosswedenorg/thalos/internal/cache" + "github.com/eosswedenorg/thalos/internal/config" + driver "github.com/eosswedenorg/thalos/internal/driver/redis" + . "github.com/eosswedenorg/thalos/internal/log" + . "github.com/eosswedenorg/thalos/internal/server" redis_cache "github.com/go-redis/cache/v9" "github.com/nikoksr/notify" "github.com/nikoksr/notify/service/telegram" @@ -33,23 +34,7 @@ import ( "github.com/spf13/pflag" ) -// --------------------------- -// Global variables -// --------------------------- - -var conf config.Config - -var shClient *shipclient.Stream - -var running bool = true - -var exit chan bool - -var cache *Cache - -var cacheStore Store - -func readerLoop(processor *app.ShipProcessor) { +func readerLoop(conf *config.Config, running *bool, shClient *shipclient.Stream, processor *ShipProcessor) { recon_cnt := 0 exp := &backoff.ExponentialBackOff{ @@ -90,7 +75,7 @@ func readerLoop(processor *app.ShipProcessor) { return shClient.SendBlocksRequest() } - for running { + for *running { err := backoff.RetryNotify(connectOp, exp, func(err error, d time.Duration) { if recon_cnt >= 3 { @@ -110,8 +95,7 @@ func readerLoop(processor *app.ShipProcessor) { }) if err != nil { log.WithError(err).Error("Failed to connect to SHIP") - running = false - continue + return } recon_cnt = 0 @@ -123,9 +107,8 @@ func readerLoop(processor *app.ShipProcessor) { if err := shClient.Run(); err != nil { if errors.Is(err, shipclient.ErrEndBlockReached) { - exit <- true log.Info("Endblock reached.") - break + return } log.WithError(err).Error("Failed to read from ship") @@ -133,9 +116,11 @@ func readerLoop(processor *app.ShipProcessor) { } } -func run(processor *app.ShipProcessor) { +func run(conf *config.Config, shClient *shipclient.Stream, processor *ShipProcessor) { + running := true + // Spawn reader loop in another thread. - go readerLoop(processor) + go readerLoop(conf, &running, shClient, processor) // Create interrupt channel. signals := make(chan os.Signal, 1) @@ -144,27 +129,15 @@ func run(processor *app.ShipProcessor) { signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) // Wait for interrupt - select { - case sig := <-signals: - log.WithField("signal", sig).Info("Signal received") - - // Cleanly close the connection by sending a close message. - err := shClient.Shutdown() - if err != nil { - log.WithError(err).Info("failed to send close message to ship server") - } - case <-exit: - // Do nothing, just exit. - } + sig := <-signals + log.WithField("signal", sig).Info("Signal received") + // Cleanly close the connection by sending a close message. running = false -} - -func getChain(def string) string { - if len(conf.Ship.Chain) > 0 { - return conf.Ship.Chain + err := shClient.Shutdown() + if err != nil { + log.WithError(err).Info("failed to send close message to ship server") } - return def } func LogLevels() []string { @@ -175,13 +148,13 @@ func LogLevels() []string { return list } -func initAbiManger(api *eos.API, chain_id string) *abi.AbiManager { - cache := NewCache("thalos::cache::abi::"+chain_id, cacheStore) +func initAbiManager(api *eos.API, store cache.Store, chain_id string) *abi.AbiManager { + cache := NewCache("thalos::cache::abi::"+chain_id, store) return abi.NewAbiManager(cache, api) } -func stateLoader(chainInfo *eos.InfoResp, current_block_no_cache bool) app.StateLoader { - return func(state *app.State) { +func stateLoader(conf config.Config, chainInfo *eos.InfoResp, cache *cache.Cache, current_block_no_cache bool) StateLoader { + return func(state *State) { var source string // Load state from cache. @@ -217,10 +190,12 @@ func stateLoader(chainInfo *eos.InfoResp, current_block_no_cache bool) app.State } } -func stateSaver(state app.State) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) - defer cancel() - return cache.Set(ctx, "state", state, 0) +func stateSaver(cache *cache.Cache) StateSaver { + return func(state State) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) + defer cancel() + return cache.Set(ctx, "state", state, 0) + } } func ReadConfig(cfg *config.Config, flags *pflag.FlagSet) error { @@ -246,8 +221,6 @@ func serverCmd(cmd *cobra.Command, args []string) { var err error var chainInfo *eos.InfoResp - exit = make(chan bool) - skip_currentblock_cache, _ := cmd.Flags().GetBool("no-state-cache") // Write PID file @@ -261,7 +234,7 @@ func serverCmd(cmd *cobra.Command, args []string) { } // Parse config - conf = config.New() + conf := config.New() if err = ReadConfig(&conf, cmd.Flags()); err != nil { log.WithError(err).Fatal("Failed to read config") return @@ -333,14 +306,14 @@ func serverCmd(cmd *cobra.Command, args []string) { } // Setup cache storage - cacheStore = NewRedisStore(&redis_cache.Options{ + cacheStore := NewRedisStore(&redis_cache.Options{ Redis: rdb, // Cache 10k keys for 10 minutes. LocalCache: redis_cache.NewTinyLFU(10000, 10*time.Minute), }) // Setup general cache - cache = NewCache("thalos::cache::instance::"+conf.Name, cacheStore) + cache := NewCache("thalos::cache::instance::"+conf.Name, cacheStore) log.WithField("api", conf.Api).Info("Get chain info from api") eosClient := eos.New(conf.Api) @@ -350,7 +323,7 @@ func serverCmd(cmd *cobra.Command, args []string) { return } - shClient = shipclient.NewStream(func(s *shipclient.Stream) { + shClient := shipclient.NewStream(func(s *shipclient.Stream) { s.StartBlock = conf.Ship.StartBlockNum s.EndBlock = conf.Ship.EndBlockNum s.IrreversibleOnly = conf.Ship.IrreversibleOnly @@ -363,22 +336,25 @@ func serverCmd(cmd *cobra.Command, args []string) { return } - chain_id := getChain(chainInfo.ChainID.String()) + chain_id := conf.Ship.Chain + if len(chain_id) < 1 { + chain_id = chainInfo.ChainID.String() + } - processor := app.SpawnProccessor( + processor := SpawnProccessor( shClient, - stateLoader(chainInfo, skip_currentblock_cache), - stateSaver, + stateLoader(conf, chainInfo, cache, skip_currentblock_cache), + stateSaver(cache), driver.NewPublisher(context.Background(), rdb, api_redis.Namespace{ Prefix: conf.Redis.Prefix, ChainID: chain_id, }), - initAbiManger(eosClient, chain_id), + initAbiManager(eosClient, cacheStore, chain_id), codec, ) // Run the application - run(processor) + run(&conf, shClient, processor) // Close the processor properly processor.Close() diff --git a/cmd/tools/main.go b/cmd/tools/main.go index aac073e..ec477d0 100644 --- a/cmd/tools/main.go +++ b/cmd/tools/main.go @@ -3,7 +3,7 @@ package main import ( "time" - _ "github.com/eosswedenorg/thalos/app/log" + _ "github.com/eosswedenorg/thalos/internal/log" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/pflag" diff --git a/cmd/tools/mock_publisher.go b/cmd/tools/mock_publisher.go index a45b280..ec38871 100644 --- a/cmd/tools/mock_publisher.go +++ b/cmd/tools/mock_publisher.go @@ -10,7 +10,7 @@ import ( "github.com/eosswedenorg/thalos/api/message" _ "github.com/eosswedenorg/thalos/api/message/json" api_redis "github.com/eosswedenorg/thalos/api/redis" - redis_driver "github.com/eosswedenorg/thalos/app/driver/redis" + redis_driver "github.com/eosswedenorg/thalos/internal/driver/redis" "github.com/redis/go-redis/v9" log "github.com/sirupsen/logrus" diff --git a/app/abi/manager.go b/internal/abi/manager.go similarity index 96% rename from app/abi/manager.go rename to internal/abi/manager.go index 43be262..2441e01 100644 --- a/app/abi/manager.go +++ b/internal/abi/manager.go @@ -6,7 +6,7 @@ import ( "time" eos "github.com/eoscanada/eos-go" - "github.com/eosswedenorg/thalos/app/cache" + "github.com/eosswedenorg/thalos/internal/cache" ) // AbiManager handles an ABI cache that fetches the ABI from an API on cache miss. diff --git a/app/abi/manager_test.go b/internal/abi/manager_test.go similarity index 99% rename from app/abi/manager_test.go rename to internal/abi/manager_test.go index 661c8ec..7ecfc75 100644 --- a/app/abi/manager_test.go +++ b/internal/abi/manager_test.go @@ -9,7 +9,7 @@ import ( eos "github.com/eoscanada/eos-go" - "github.com/eosswedenorg/thalos/app/cache" + "github.com/eosswedenorg/thalos/internal/cache" "github.com/stretchr/testify/assert" ) diff --git a/app/cache/cache.go b/internal/cache/cache.go similarity index 100% rename from app/cache/cache.go rename to internal/cache/cache.go diff --git a/app/cache/memory_store.go b/internal/cache/memory_store.go similarity index 100% rename from app/cache/memory_store.go rename to internal/cache/memory_store.go diff --git a/app/cache/memory_store_test.go b/internal/cache/memory_store_test.go similarity index 100% rename from app/cache/memory_store_test.go rename to internal/cache/memory_store_test.go diff --git a/app/cache/redis_store.go b/internal/cache/redis_store.go similarity index 100% rename from app/cache/redis_store.go rename to internal/cache/redis_store.go diff --git a/app/cache/redis_store_test.go b/internal/cache/redis_store_test.go similarity index 100% rename from app/cache/redis_store_test.go rename to internal/cache/redis_store_test.go diff --git a/app/cache/store.go b/internal/cache/store.go similarity index 100% rename from app/cache/store.go rename to internal/cache/store.go diff --git a/app/config/cli.go b/internal/config/cli.go similarity index 100% rename from app/config/cli.go rename to internal/config/cli.go diff --git a/app/config/config.go b/internal/config/config.go similarity index 98% rename from app/config/config.go rename to internal/config/config.go index 1ec5591..e137352 100644 --- a/app/config/config.go +++ b/internal/config/config.go @@ -4,7 +4,7 @@ import ( "reflect" "time" - "github.com/eosswedenorg/thalos/app/log" + "github.com/eosswedenorg/thalos/internal/log" shipclient "github.com/eosswedenorg-go/antelope-ship-client" ) diff --git a/app/config/config_test.go b/internal/config/config_test.go similarity index 98% rename from app/config/config_test.go rename to internal/config/config_test.go index e78f93b..ceba4e2 100644 --- a/app/config/config_test.go +++ b/internal/config/config_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - "github.com/eosswedenorg/thalos/app/log" + "github.com/eosswedenorg/thalos/internal/log" "github.com/stretchr/testify/require" shipclient "github.com/eosswedenorg-go/antelope-ship-client" diff --git a/app/config/file.go b/internal/config/file.go similarity index 100% rename from app/config/file.go rename to internal/config/file.go diff --git a/app/driver/redis/publisher.go b/internal/driver/redis/publisher.go similarity index 100% rename from app/driver/redis/publisher.go rename to internal/driver/redis/publisher.go diff --git a/app/driver/redis/publisher_test.go b/internal/driver/redis/publisher_test.go similarity index 100% rename from app/driver/redis/publisher_test.go rename to internal/driver/redis/publisher_test.go diff --git a/app/driver/writer.go b/internal/driver/writer.go similarity index 100% rename from app/driver/writer.go rename to internal/driver/writer.go diff --git a/app/log/HookWriter.go b/internal/log/HookWriter.go similarity index 100% rename from app/log/HookWriter.go rename to internal/log/HookWriter.go diff --git a/app/log/RotatingFile.go b/internal/log/RotatingFile.go similarity index 100% rename from app/log/RotatingFile.go rename to internal/log/RotatingFile.go diff --git a/app/log/config.go b/internal/log/config.go similarity index 94% rename from app/log/config.go rename to internal/log/config.go index 7179bdb..60c5715 100644 --- a/app/log/config.go +++ b/internal/log/config.go @@ -4,7 +4,7 @@ import ( "path" "time" - "github.com/eosswedenorg/thalos/app/types" + "github.com/eosswedenorg/thalos/internal/types" ) // Config represents configuration parameters for a log. diff --git a/app/log/config_test.go b/internal/log/config_test.go similarity index 100% rename from app/log/config_test.go rename to internal/log/config_test.go diff --git a/app/log/init.go b/internal/log/init.go similarity index 100% rename from app/log/init.go rename to internal/log/init.go diff --git a/app/ship_processor.go b/internal/server/ship_processor.go similarity index 98% rename from app/ship_processor.go rename to internal/server/ship_processor.go index b750cb5..d510c99 100644 --- a/app/ship_processor.go +++ b/internal/server/ship_processor.go @@ -1,4 +1,4 @@ -package app +package server import ( "encoding/hex" @@ -6,8 +6,8 @@ import ( "github.com/eosswedenorg/thalos/api" "github.com/eosswedenorg/thalos/api/message" - "github.com/eosswedenorg/thalos/app/abi" - "github.com/eosswedenorg/thalos/app/driver" + "github.com/eosswedenorg/thalos/internal/abi" + "github.com/eosswedenorg/thalos/internal/driver" log "github.com/sirupsen/logrus" diff --git a/app/state.go b/internal/server/state.go similarity index 94% rename from app/state.go rename to internal/server/state.go index 2edb323..d8312ef 100644 --- a/app/state.go +++ b/internal/server/state.go @@ -1,4 +1,4 @@ -package app +package server // State represents thalos runtime state type State struct { diff --git a/app/types/size.go b/internal/types/size.go similarity index 100% rename from app/types/size.go rename to internal/types/size.go diff --git a/app/types/size_test.go b/internal/types/size_test.go similarity index 100% rename from app/types/size_test.go rename to internal/types/size_test.go