mirror of
https://github.com/eosswedenorg/thalos
synced 2026-06-16 04:24:56 +02:00
cmd/thalos/server.go: remove global variables.
This commit is contained in:
parent
7108299550
commit
b853bc026e
1 changed files with 28 additions and 41 deletions
|
|
@ -19,6 +19,7 @@ import (
|
|||
_ "github.com/eosswedenorg/thalos/api/message/msgpack"
|
||||
api_redis "github.com/eosswedenorg/thalos/api/redis"
|
||||
"github.com/eosswedenorg/thalos/internal/abi"
|
||||
"github.com/eosswedenorg/thalos/internal/cache"
|
||||
. "github.com/eosswedenorg/thalos/internal/cache"
|
||||
"github.com/eosswedenorg/thalos/internal/config"
|
||||
driver "github.com/eosswedenorg/thalos/internal/driver/redis"
|
||||
|
|
@ -32,21 +33,7 @@ import (
|
|||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
// ---------------------------
|
||||
// Global variables
|
||||
// ---------------------------
|
||||
|
||||
var conf config.Config
|
||||
|
||||
var shClient *shipclient.Stream
|
||||
|
||||
var running bool = true
|
||||
|
||||
var cache *Cache
|
||||
|
||||
var cacheStore Store
|
||||
|
||||
func readerLoop(processor *ShipProcessor) {
|
||||
func readerLoop(conf *config.Config, running *bool, shClient *shipclient.Stream, processor *ShipProcessor) {
|
||||
recon_cnt := 0
|
||||
|
||||
exp := &backoff.ExponentialBackOff{
|
||||
|
|
@ -87,7 +74,7 @@ func readerLoop(processor *ShipProcessor) {
|
|||
return shClient.SendBlocksRequest()
|
||||
}
|
||||
|
||||
for running {
|
||||
for *running {
|
||||
|
||||
err := backoff.RetryNotify(connectOp, exp, func(err error, d time.Duration) {
|
||||
if recon_cnt >= 3 {
|
||||
|
|
@ -128,9 +115,11 @@ func readerLoop(processor *ShipProcessor) {
|
|||
}
|
||||
}
|
||||
|
||||
func run(processor *ShipProcessor) {
|
||||
func run(conf *config.Config, shClient *shipclient.Stream, processor *ShipProcessor) {
|
||||
running := true
|
||||
|
||||
// Spawn reader loop in another thread.
|
||||
go readerLoop(processor)
|
||||
go readerLoop(conf, &running, shClient, processor)
|
||||
|
||||
// Create interrupt channel.
|
||||
signals := make(chan os.Signal, 1)
|
||||
|
|
@ -150,13 +139,6 @@ func run(processor *ShipProcessor) {
|
|||
}
|
||||
}
|
||||
|
||||
func getChain(def string) string {
|
||||
if len(conf.Ship.Chain) > 0 {
|
||||
return conf.Ship.Chain
|
||||
}
|
||||
return def
|
||||
}
|
||||
|
||||
func LogLevels() []string {
|
||||
list := []string{}
|
||||
for _, lvl := range log.AllLevels {
|
||||
|
|
@ -165,12 +147,12 @@ func LogLevels() []string {
|
|||
return list
|
||||
}
|
||||
|
||||
func initAbiManger(api *eos.API, chain_id string) *abi.AbiManager {
|
||||
cache := NewCache("thalos::cache::abi::"+chain_id, cacheStore)
|
||||
func initAbiManager(api *eos.API, store cache.Store, chain_id string) *abi.AbiManager {
|
||||
cache := NewCache("thalos::cache::abi::"+chain_id, store)
|
||||
return abi.NewAbiManager(cache, api)
|
||||
}
|
||||
|
||||
func stateLoader(chainInfo *eos.InfoResp, current_block_no_cache bool) StateLoader {
|
||||
func stateLoader(conf config.Config, chainInfo *eos.InfoResp, cache *cache.Cache, current_block_no_cache bool) StateLoader {
|
||||
return func(state *State) {
|
||||
var source string
|
||||
|
||||
|
|
@ -207,10 +189,12 @@ func stateLoader(chainInfo *eos.InfoResp, current_block_no_cache bool) StateLoad
|
|||
}
|
||||
}
|
||||
|
||||
func stateSaver(state State) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
|
||||
defer cancel()
|
||||
return cache.Set(ctx, "state", state, 0)
|
||||
func stateSaver(cache *cache.Cache) StateSaver {
|
||||
return func(state State) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
|
||||
defer cancel()
|
||||
return cache.Set(ctx, "state", state, 0)
|
||||
}
|
||||
}
|
||||
|
||||
func ReadConfig(cfg *config.Config, ctx *cli.Context) error {
|
||||
|
|
@ -243,7 +227,7 @@ func serverCmd(ctx *cli.Context) error {
|
|||
}
|
||||
|
||||
// Parse config
|
||||
conf = config.New()
|
||||
conf := config.New()
|
||||
if err = ReadConfig(&conf, ctx); err != nil {
|
||||
return fmt.Errorf("config: %s", err)
|
||||
}
|
||||
|
|
@ -307,14 +291,14 @@ func serverCmd(ctx *cli.Context) error {
|
|||
}
|
||||
|
||||
// Setup cache storage
|
||||
cacheStore = NewRedisStore(&redis_cache.Options{
|
||||
cacheStore := NewRedisStore(&redis_cache.Options{
|
||||
Redis: rdb,
|
||||
// Cache 10k keys for 10 minutes.
|
||||
LocalCache: redis_cache.NewTinyLFU(10000, 10*time.Minute),
|
||||
})
|
||||
|
||||
// Setup general cache
|
||||
cache = NewCache("thalos::cache::instance::"+conf.Name, cacheStore)
|
||||
cache := NewCache("thalos::cache::instance::"+conf.Name, cacheStore)
|
||||
|
||||
log.WithField("api", conf.Api).Info("Get chain info from api")
|
||||
eosClient := eos.New(conf.Api)
|
||||
|
|
@ -323,7 +307,7 @@ func serverCmd(ctx *cli.Context) error {
|
|||
return fmt.Errorf("eosapi: %s", err)
|
||||
}
|
||||
|
||||
shClient = shipclient.NewStream(func(s *shipclient.Stream) {
|
||||
shClient := shipclient.NewStream(func(s *shipclient.Stream) {
|
||||
s.StartBlock = conf.Ship.StartBlockNum
|
||||
s.EndBlock = conf.Ship.EndBlockNum
|
||||
s.IrreversibleOnly = conf.Ship.IrreversibleOnly
|
||||
|
|
@ -335,22 +319,25 @@ func serverCmd(ctx *cli.Context) error {
|
|||
return fmt.Errorf("codec: %s", err)
|
||||
}
|
||||
|
||||
chain_id := getChain(chainInfo.ChainID.String())
|
||||
chain_id := conf.Ship.Chain
|
||||
if len(chain_id) < 1 {
|
||||
chain_id = chainInfo.ChainID.String()
|
||||
}
|
||||
|
||||
processor := SpawnProccessor(
|
||||
shClient,
|
||||
stateLoader(chainInfo, skip_currentblock_cache),
|
||||
stateSaver,
|
||||
stateLoader(conf, chainInfo, cache, skip_currentblock_cache),
|
||||
stateSaver(cache),
|
||||
driver.NewPublisher(context.Background(), rdb, api_redis.Namespace{
|
||||
Prefix: conf.Redis.Prefix,
|
||||
ChainID: chain_id,
|
||||
}),
|
||||
initAbiManger(eosClient, chain_id),
|
||||
initAbiManager(eosClient, cacheStore, chain_id),
|
||||
codec,
|
||||
)
|
||||
|
||||
// Run the application
|
||||
run(processor)
|
||||
run(&conf, shClient, processor)
|
||||
|
||||
// Close the processor properly
|
||||
processor.Close()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue