diff --git a/cmd/thalos/server.go b/cmd/thalos/server.go index 7be4b29..a642420 100644 --- a/cmd/thalos/server.go +++ b/cmd/thalos/server.go @@ -19,6 +19,7 @@ import ( _ "github.com/eosswedenorg/thalos/api/message/msgpack" api_redis "github.com/eosswedenorg/thalos/api/redis" "github.com/eosswedenorg/thalos/internal/abi" + "github.com/eosswedenorg/thalos/internal/cache" . "github.com/eosswedenorg/thalos/internal/cache" "github.com/eosswedenorg/thalos/internal/config" driver "github.com/eosswedenorg/thalos/internal/driver/redis" @@ -32,21 +33,7 @@ import ( "github.com/urfave/cli/v2" ) -// --------------------------- -// Global variables -// --------------------------- - -var conf config.Config - -var shClient *shipclient.Stream - -var running bool = true - -var cache *Cache - -var cacheStore Store - -func readerLoop(processor *ShipProcessor) { +func readerLoop(conf *config.Config, running *bool, shClient *shipclient.Stream, processor *ShipProcessor) { recon_cnt := 0 exp := &backoff.ExponentialBackOff{ @@ -87,7 +74,7 @@ func readerLoop(processor *ShipProcessor) { return shClient.SendBlocksRequest() } - for running { + for *running { err := backoff.RetryNotify(connectOp, exp, func(err error, d time.Duration) { if recon_cnt >= 3 { @@ -128,9 +115,11 @@ func readerLoop(processor *ShipProcessor) { } } -func run(processor *ShipProcessor) { +func run(conf *config.Config, shClient *shipclient.Stream, processor *ShipProcessor) { + running := true + // Spawn reader loop in another thread. - go readerLoop(processor) + go readerLoop(conf, &running, shClient, processor) // Create interrupt channel. signals := make(chan os.Signal, 1) @@ -150,13 +139,6 @@ func run(processor *ShipProcessor) { } } -func getChain(def string) string { - if len(conf.Ship.Chain) > 0 { - return conf.Ship.Chain - } - return def -} - func LogLevels() []string { list := []string{} for _, lvl := range log.AllLevels { @@ -165,12 +147,12 @@ func LogLevels() []string { return list } -func initAbiManger(api *eos.API, chain_id string) *abi.AbiManager { - cache := NewCache("thalos::cache::abi::"+chain_id, cacheStore) +func initAbiManager(api *eos.API, store cache.Store, chain_id string) *abi.AbiManager { + cache := NewCache("thalos::cache::abi::"+chain_id, store) return abi.NewAbiManager(cache, api) } -func stateLoader(chainInfo *eos.InfoResp, current_block_no_cache bool) StateLoader { +func stateLoader(conf config.Config, chainInfo *eos.InfoResp, cache *cache.Cache, current_block_no_cache bool) StateLoader { return func(state *State) { var source string @@ -207,10 +189,12 @@ func stateLoader(chainInfo *eos.InfoResp, current_block_no_cache bool) StateLoad } } -func stateSaver(state State) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) - defer cancel() - return cache.Set(ctx, "state", state, 0) +func stateSaver(cache *cache.Cache) StateSaver { + return func(state State) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) + defer cancel() + return cache.Set(ctx, "state", state, 0) + } } func ReadConfig(cfg *config.Config, ctx *cli.Context) error { @@ -243,7 +227,7 @@ func serverCmd(ctx *cli.Context) error { } // Parse config - conf = config.New() + conf := config.New() if err = ReadConfig(&conf, ctx); err != nil { return fmt.Errorf("config: %s", err) } @@ -307,14 +291,14 @@ func serverCmd(ctx *cli.Context) error { } // Setup cache storage - cacheStore = NewRedisStore(&redis_cache.Options{ + cacheStore := NewRedisStore(&redis_cache.Options{ Redis: rdb, // Cache 10k keys for 10 minutes. LocalCache: redis_cache.NewTinyLFU(10000, 10*time.Minute), }) // Setup general cache - cache = NewCache("thalos::cache::instance::"+conf.Name, cacheStore) + cache := NewCache("thalos::cache::instance::"+conf.Name, cacheStore) log.WithField("api", conf.Api).Info("Get chain info from api") eosClient := eos.New(conf.Api) @@ -323,7 +307,7 @@ func serverCmd(ctx *cli.Context) error { return fmt.Errorf("eosapi: %s", err) } - shClient = shipclient.NewStream(func(s *shipclient.Stream) { + shClient := shipclient.NewStream(func(s *shipclient.Stream) { s.StartBlock = conf.Ship.StartBlockNum s.EndBlock = conf.Ship.EndBlockNum s.IrreversibleOnly = conf.Ship.IrreversibleOnly @@ -335,22 +319,25 @@ func serverCmd(ctx *cli.Context) error { return fmt.Errorf("codec: %s", err) } - chain_id := getChain(chainInfo.ChainID.String()) + chain_id := conf.Ship.Chain + if len(chain_id) < 1 { + chain_id = chainInfo.ChainID.String() + } processor := SpawnProccessor( shClient, - stateLoader(chainInfo, skip_currentblock_cache), - stateSaver, + stateLoader(conf, chainInfo, cache, skip_currentblock_cache), + stateSaver(cache), driver.NewPublisher(context.Background(), rdb, api_redis.Namespace{ Prefix: conf.Redis.Prefix, ChainID: chain_id, }), - initAbiManger(eosClient, chain_id), + initAbiManager(eosClient, cacheStore, chain_id), codec, ) // Run the application - run(processor) + run(&conf, shClient, processor) // Close the processor properly processor.Close()