1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-07-03 11:53:41 +02:00

Merge branch 'refactor'

This commit is contained in:
Henrik Hautakoski 2024-02-17 17:32:19 +01:00
commit 85da219349
27 changed files with 55 additions and 79 deletions

View file

@ -18,12 +18,13 @@ import (
_ "github.com/eosswedenorg/thalos/api/message/json" _ "github.com/eosswedenorg/thalos/api/message/json"
_ "github.com/eosswedenorg/thalos/api/message/msgpack" _ "github.com/eosswedenorg/thalos/api/message/msgpack"
api_redis "github.com/eosswedenorg/thalos/api/redis" api_redis "github.com/eosswedenorg/thalos/api/redis"
"github.com/eosswedenorg/thalos/app" "github.com/eosswedenorg/thalos/internal/abi"
"github.com/eosswedenorg/thalos/app/abi" "github.com/eosswedenorg/thalos/internal/cache"
. "github.com/eosswedenorg/thalos/app/cache" . "github.com/eosswedenorg/thalos/internal/cache"
"github.com/eosswedenorg/thalos/app/config" "github.com/eosswedenorg/thalos/internal/config"
driver "github.com/eosswedenorg/thalos/app/driver/redis" driver "github.com/eosswedenorg/thalos/internal/driver/redis"
. "github.com/eosswedenorg/thalos/app/log" . "github.com/eosswedenorg/thalos/internal/log"
. "github.com/eosswedenorg/thalos/internal/server"
redis_cache "github.com/go-redis/cache/v9" redis_cache "github.com/go-redis/cache/v9"
"github.com/nikoksr/notify" "github.com/nikoksr/notify"
"github.com/nikoksr/notify/service/telegram" "github.com/nikoksr/notify/service/telegram"
@ -33,23 +34,7 @@ import (
"github.com/spf13/pflag" "github.com/spf13/pflag"
) )
// --------------------------- func readerLoop(conf *config.Config, running *bool, shClient *shipclient.Stream, processor *ShipProcessor) {
// Global variables
// ---------------------------
var conf config.Config
var shClient *shipclient.Stream
var running bool = true
var exit chan bool
var cache *Cache
var cacheStore Store
func readerLoop(processor *app.ShipProcessor) {
recon_cnt := 0 recon_cnt := 0
exp := &backoff.ExponentialBackOff{ exp := &backoff.ExponentialBackOff{
@ -90,7 +75,7 @@ func readerLoop(processor *app.ShipProcessor) {
return shClient.SendBlocksRequest() return shClient.SendBlocksRequest()
} }
for running { for *running {
err := backoff.RetryNotify(connectOp, exp, func(err error, d time.Duration) { err := backoff.RetryNotify(connectOp, exp, func(err error, d time.Duration) {
if recon_cnt >= 3 { if recon_cnt >= 3 {
@ -110,8 +95,7 @@ func readerLoop(processor *app.ShipProcessor) {
}) })
if err != nil { if err != nil {
log.WithError(err).Error("Failed to connect to SHIP") log.WithError(err).Error("Failed to connect to SHIP")
running = false return
continue
} }
recon_cnt = 0 recon_cnt = 0
@ -123,9 +107,8 @@ func readerLoop(processor *app.ShipProcessor) {
if err := shClient.Run(); err != nil { if err := shClient.Run(); err != nil {
if errors.Is(err, shipclient.ErrEndBlockReached) { if errors.Is(err, shipclient.ErrEndBlockReached) {
exit <- true
log.Info("Endblock reached.") log.Info("Endblock reached.")
break return
} }
log.WithError(err).Error("Failed to read from ship") log.WithError(err).Error("Failed to read from ship")
@ -133,9 +116,11 @@ func readerLoop(processor *app.ShipProcessor) {
} }
} }
func run(processor *app.ShipProcessor) { func run(conf *config.Config, shClient *shipclient.Stream, processor *ShipProcessor) {
running := true
// Spawn reader loop in another thread. // Spawn reader loop in another thread.
go readerLoop(processor) go readerLoop(conf, &running, shClient, processor)
// Create interrupt channel. // Create interrupt channel.
signals := make(chan os.Signal, 1) signals := make(chan os.Signal, 1)
@ -144,27 +129,15 @@ func run(processor *app.ShipProcessor) {
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
// Wait for interrupt // Wait for interrupt
select { sig := <-signals
case sig := <-signals:
log.WithField("signal", sig).Info("Signal received") log.WithField("signal", sig).Info("Signal received")
// Cleanly close the connection by sending a close message. // Cleanly close the connection by sending a close message.
running = false
err := shClient.Shutdown() err := shClient.Shutdown()
if err != nil { if err != nil {
log.WithError(err).Info("failed to send close message to ship server") log.WithError(err).Info("failed to send close message to ship server")
} }
case <-exit:
// Do nothing, just exit.
}
running = false
}
func getChain(def string) string {
if len(conf.Ship.Chain) > 0 {
return conf.Ship.Chain
}
return def
} }
func LogLevels() []string { func LogLevels() []string {
@ -175,13 +148,13 @@ func LogLevels() []string {
return list return list
} }
func initAbiManger(api *eos.API, chain_id string) *abi.AbiManager { func initAbiManager(api *eos.API, store cache.Store, chain_id string) *abi.AbiManager {
cache := NewCache("thalos::cache::abi::"+chain_id, cacheStore) cache := NewCache("thalos::cache::abi::"+chain_id, store)
return abi.NewAbiManager(cache, api) return abi.NewAbiManager(cache, api)
} }
func stateLoader(chainInfo *eos.InfoResp, current_block_no_cache bool) app.StateLoader { func stateLoader(conf config.Config, chainInfo *eos.InfoResp, cache *cache.Cache, current_block_no_cache bool) StateLoader {
return func(state *app.State) { return func(state *State) {
var source string var source string
// Load state from cache. // Load state from cache.
@ -217,11 +190,13 @@ func stateLoader(chainInfo *eos.InfoResp, current_block_no_cache bool) app.State
} }
} }
func stateSaver(state app.State) error { func stateSaver(cache *cache.Cache) StateSaver {
return func(state State) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
defer cancel() defer cancel()
return cache.Set(ctx, "state", state, 0) return cache.Set(ctx, "state", state, 0)
} }
}
func ReadConfig(cfg *config.Config, flags *pflag.FlagSet) error { func ReadConfig(cfg *config.Config, flags *pflag.FlagSet) error {
filename, err := flags.GetString("config") filename, err := flags.GetString("config")
@ -246,8 +221,6 @@ func serverCmd(cmd *cobra.Command, args []string) {
var err error var err error
var chainInfo *eos.InfoResp var chainInfo *eos.InfoResp
exit = make(chan bool)
skip_currentblock_cache, _ := cmd.Flags().GetBool("no-state-cache") skip_currentblock_cache, _ := cmd.Flags().GetBool("no-state-cache")
// Write PID file // Write PID file
@ -261,7 +234,7 @@ func serverCmd(cmd *cobra.Command, args []string) {
} }
// Parse config // Parse config
conf = config.New() conf := config.New()
if err = ReadConfig(&conf, cmd.Flags()); err != nil { if err = ReadConfig(&conf, cmd.Flags()); err != nil {
log.WithError(err).Fatal("Failed to read config") log.WithError(err).Fatal("Failed to read config")
return return
@ -333,14 +306,14 @@ func serverCmd(cmd *cobra.Command, args []string) {
} }
// Setup cache storage // Setup cache storage
cacheStore = NewRedisStore(&redis_cache.Options{ cacheStore := NewRedisStore(&redis_cache.Options{
Redis: rdb, Redis: rdb,
// Cache 10k keys for 10 minutes. // Cache 10k keys for 10 minutes.
LocalCache: redis_cache.NewTinyLFU(10000, 10*time.Minute), LocalCache: redis_cache.NewTinyLFU(10000, 10*time.Minute),
}) })
// Setup general cache // Setup general cache
cache = NewCache("thalos::cache::instance::"+conf.Name, cacheStore) cache := NewCache("thalos::cache::instance::"+conf.Name, cacheStore)
log.WithField("api", conf.Api).Info("Get chain info from api") log.WithField("api", conf.Api).Info("Get chain info from api")
eosClient := eos.New(conf.Api) eosClient := eos.New(conf.Api)
@ -350,7 +323,7 @@ func serverCmd(cmd *cobra.Command, args []string) {
return return
} }
shClient = shipclient.NewStream(func(s *shipclient.Stream) { shClient := shipclient.NewStream(func(s *shipclient.Stream) {
s.StartBlock = conf.Ship.StartBlockNum s.StartBlock = conf.Ship.StartBlockNum
s.EndBlock = conf.Ship.EndBlockNum s.EndBlock = conf.Ship.EndBlockNum
s.IrreversibleOnly = conf.Ship.IrreversibleOnly s.IrreversibleOnly = conf.Ship.IrreversibleOnly
@ -363,22 +336,25 @@ func serverCmd(cmd *cobra.Command, args []string) {
return return
} }
chain_id := getChain(chainInfo.ChainID.String()) chain_id := conf.Ship.Chain
if len(chain_id) < 1 {
chain_id = chainInfo.ChainID.String()
}
processor := app.SpawnProccessor( processor := SpawnProccessor(
shClient, shClient,
stateLoader(chainInfo, skip_currentblock_cache), stateLoader(conf, chainInfo, cache, skip_currentblock_cache),
stateSaver, stateSaver(cache),
driver.NewPublisher(context.Background(), rdb, api_redis.Namespace{ driver.NewPublisher(context.Background(), rdb, api_redis.Namespace{
Prefix: conf.Redis.Prefix, Prefix: conf.Redis.Prefix,
ChainID: chain_id, ChainID: chain_id,
}), }),
initAbiManger(eosClient, chain_id), initAbiManager(eosClient, cacheStore, chain_id),
codec, codec,
) )
// Run the application // Run the application
run(processor) run(&conf, shClient, processor)
// Close the processor properly // Close the processor properly
processor.Close() processor.Close()

View file

@ -3,7 +3,7 @@ package main
import ( import (
"time" "time"
_ "github.com/eosswedenorg/thalos/app/log" _ "github.com/eosswedenorg/thalos/internal/log"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/pflag" "github.com/spf13/pflag"

View file

@ -10,7 +10,7 @@ import (
"github.com/eosswedenorg/thalos/api/message" "github.com/eosswedenorg/thalos/api/message"
_ "github.com/eosswedenorg/thalos/api/message/json" _ "github.com/eosswedenorg/thalos/api/message/json"
api_redis "github.com/eosswedenorg/thalos/api/redis" api_redis "github.com/eosswedenorg/thalos/api/redis"
redis_driver "github.com/eosswedenorg/thalos/app/driver/redis" redis_driver "github.com/eosswedenorg/thalos/internal/driver/redis"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"

View file

@ -6,7 +6,7 @@ import (
"time" "time"
eos "github.com/eoscanada/eos-go" eos "github.com/eoscanada/eos-go"
"github.com/eosswedenorg/thalos/app/cache" "github.com/eosswedenorg/thalos/internal/cache"
) )
// AbiManager handles an ABI cache that fetches the ABI from an API on cache miss. // AbiManager handles an ABI cache that fetches the ABI from an API on cache miss.

View file

@ -9,7 +9,7 @@ import (
eos "github.com/eoscanada/eos-go" eos "github.com/eoscanada/eos-go"
"github.com/eosswedenorg/thalos/app/cache" "github.com/eosswedenorg/thalos/internal/cache"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )

View file

@ -4,7 +4,7 @@ import (
"reflect" "reflect"
"time" "time"
"github.com/eosswedenorg/thalos/app/log" "github.com/eosswedenorg/thalos/internal/log"
shipclient "github.com/eosswedenorg-go/antelope-ship-client" shipclient "github.com/eosswedenorg-go/antelope-ship-client"
) )

View file

@ -4,7 +4,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/eosswedenorg/thalos/app/log" "github.com/eosswedenorg/thalos/internal/log"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
shipclient "github.com/eosswedenorg-go/antelope-ship-client" shipclient "github.com/eosswedenorg-go/antelope-ship-client"

View file

@ -4,7 +4,7 @@ import (
"path" "path"
"time" "time"
"github.com/eosswedenorg/thalos/app/types" "github.com/eosswedenorg/thalos/internal/types"
) )
// Config represents configuration parameters for a log. // Config represents configuration parameters for a log.

View file

@ -1,4 +1,4 @@
package app package server
import ( import (
"encoding/hex" "encoding/hex"
@ -6,8 +6,8 @@ import (
"github.com/eosswedenorg/thalos/api" "github.com/eosswedenorg/thalos/api"
"github.com/eosswedenorg/thalos/api/message" "github.com/eosswedenorg/thalos/api/message"
"github.com/eosswedenorg/thalos/app/abi" "github.com/eosswedenorg/thalos/internal/abi"
"github.com/eosswedenorg/thalos/app/driver" "github.com/eosswedenorg/thalos/internal/driver"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"

View file

@ -1,4 +1,4 @@
package app package server
// State represents thalos runtime state // State represents thalos runtime state
type State struct { type State struct {