diff --git a/cmd/thalos/main.go b/cmd/thalos/main.go index d590c02..df9c814 100644 --- a/cmd/thalos/main.go +++ b/cmd/thalos/main.go @@ -182,6 +182,44 @@ func initAbiManger(api *eos.API, chain_id string) *abi.AbiManager { return abi.NewAbiManager(cache, api) } +func stateLoader(chainInfo *eos.InfoResp) app.StateLoader { + return func(state *app.State) { + var source string + + // Load state from cache. + err := cache.Get("state", &state) + + // on error (cache miss) set current block from config/api + if err != nil { + // Set from config if we have a sane value. + if conf.Ship.StartBlockNum != shipclient.NULL_BLOCK_NUMBER { + source = "config" + state.CurrentBlock = conf.Ship.StartBlockNum + } else { + // Otherwise, set from api. + if conf.Ship.IrreversibleOnly { + source = "api (LIB)" + state.CurrentBlock = uint32(chainInfo.LastIrreversibleBlockNum) + } else { + source = "api (HEAD)" + state.CurrentBlock = uint32(chainInfo.HeadBlockNum) + } + } + } else { + source = "cache" + } + + log.WithFields(log.Fields{ + "block": state.CurrentBlock, + "source": source, + }).Info("Starting from block") + } +} + +func stateSaver(state app.State) error { + return cache.Set("state", state, 0) +} + func main() { var err error var chainInfo *eos.InfoResp @@ -310,14 +348,6 @@ func main() { return } - if conf.Ship.StartBlockNum == shipclient.NULL_BLOCK_NUMBER { - if conf.Ship.IrreversibleOnly { - conf.Ship.StartBlockNum = uint32(chainInfo.LastIrreversibleBlockNum) - } else { - conf.Ship.StartBlockNum = uint32(chainInfo.HeadBlockNum) - } - } - shClient = shipclient.NewStream(func(s *shipclient.Stream) { s.StartBlock = conf.Ship.StartBlockNum s.EndBlock = conf.Ship.EndBlockNum @@ -335,6 +365,8 @@ func main() { processor := app.SpawnProccessor( shClient, + stateLoader(chainInfo), + stateSaver, api_redis.NewPublisher(context.Background(), rdb, api_redis.Namespace{ Prefix: conf.Redis.Prefix, ChainID: chain_id,