1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-06-16 04:24:56 +02:00

Merge branch 'cache'

# Conflicts:
#	app/abi/manager.go
#	app/ship_processor.go
#	cmd/thalos/main.go
This commit is contained in:
Henrik Hautakoski 2023-12-06 15:38:34 +01:00
commit 22e98f1c37
13 changed files with 611 additions and 245 deletions

View file

@ -1,46 +0,0 @@
package abi
import (
"context"
"time"
eos "github.com/eoscanada/eos-go"
redis_cache "github.com/go-redis/cache/v9"
)
// Cache represents a abi cache in redis.
type Cache struct {
c *redis_cache.Cache
ctx context.Context
prefix string
}
// Create a new cache
func NewCache(prefix string, options *redis_cache.Options) *Cache {
return &Cache{
c: redis_cache.New(options),
ctx: context.Background(),
prefix: prefix,
}
}
// Get an ABI from the cache using the contract account name as the key.
func (cache *Cache) Get(account string) (*eos.ABI, error) {
var v eos.ABI
err := cache.c.Get(cache.ctx, cache.key(account), &v)
return &v, err
}
// Set an ABI in the cache.
func (cache *Cache) Set(account string, abi *eos.ABI, ttl time.Duration) error {
return cache.c.Set(&redis_cache.Item{
Ctx: cache.ctx,
Key: cache.key(account),
Value: *abi,
TTL: ttl,
})
}
func (cache *Cache) key(account string) string {
return cache.prefix + "::" + account
}

View file

@ -1,158 +0,0 @@
package abi
import (
"strings"
"testing"
"time"
eos "github.com/eoscanada/eos-go"
redis_cache "github.com/go-redis/cache/v9"
"github.com/go-redis/redismock/v9"
"github.com/stretchr/testify/assert"
)
var abiString = `
{
"version": "eosio::abi/1.0",
"types": [{
"new_type_name": "new_type_name_1",
"type": "name"
}],
"structs": [
{
"name": "struct_name_1",
"base": "struct_name_2",
"fields": [
{"name":"struct_1_field_1", "type":"new_type_name_1"},
{"name":"struct_1_field_2", "type":"struct_name_3"},
{"name":"struct_1_field_3", "type":"string?"},
{"name":"struct_1_field_4", "type":"string?"},
{"name":"struct_1_field_5", "type":"struct_name_4[]"}
]
},{
"name": "struct_name_2",
"base": "",
"fields": [
{"name":"struct_2_field_1", "type":"string"}
]
},{
"name": "struct_name_3",
"base": "",
"fields": [
{"name":"struct_3_field_1", "type":"string"}
]
},{
"name": "struct_name_4",
"base": "",
"fields": [
{"name":"struct_4_field_1", "type":"string"}
]
}
],
"actions": [{
"name": "action_name_1",
"type": "struct_name_1",
"ricardian_contract": ""
}],
"tables": [{
"name": "table_name_1",
"index_type": "i64",
"key_names": [
"key_name_1",
"key_name_2"
],
"key_types": [
"string",
"int"
],
"type": "struct_name_1"
}
]
}
`
func TestGetSet(t *testing.T) {
client, mock := redismock.NewClientMock()
c := NewCache("thalos::cache::test", &redis_cache.Options{
Redis: client,
// Cache 10k keys for 1 minute.
LocalCache: redis_cache.NewTinyLFU(10000, time.Minute),
})
abi, err := eos.NewABI(strings.NewReader(abiString))
assert.NoError(t, err)
bytes, _ := c.c.Marshal(*abi)
mock.ExpectSet("thalos::cache::test::testaccount", bytes, time.Minute).SetVal("OK")
err = c.Set("testaccount", abi, time.Minute)
assert.NoError(t, err)
c_abi, err := c.Get("testaccount")
assert.NoError(t, err)
assert.Equal(t, c_abi.Version, "eosio::abi/1.0")
// Types
assert.Equal(t, c_abi.Types[0].NewTypeName, "new_type_name_1")
assert.Equal(t, c_abi.Types[0].Type, "name")
// Structs
assert.Equal(t, c_abi.Structs[0].Name, "struct_name_1")
assert.Equal(t, c_abi.Structs[0].Base, "struct_name_2")
assert.Equal(t, c_abi.Structs[0].Fields[0].Name, "struct_1_field_1")
assert.Equal(t, c_abi.Structs[0].Fields[0].Type, "new_type_name_1")
assert.Equal(t, c_abi.Structs[0].Fields[1].Name, "struct_1_field_2")
assert.Equal(t, c_abi.Structs[0].Fields[1].Type, "struct_name_3")
assert.Equal(t, c_abi.Structs[0].Fields[2].Name, "struct_1_field_3")
assert.Equal(t, c_abi.Structs[0].Fields[2].Type, "string?")
assert.Equal(t, c_abi.Structs[0].Fields[3].Name, "struct_1_field_4")
assert.Equal(t, c_abi.Structs[0].Fields[3].Type, "string?")
assert.Equal(t, c_abi.Structs[0].Fields[4].Name, "struct_1_field_5")
assert.Equal(t, c_abi.Structs[0].Fields[4].Type, "struct_name_4[]")
assert.Equal(t, c_abi.Structs[1].Name, "struct_name_2")
assert.Equal(t, c_abi.Structs[1].Base, "")
assert.Equal(t, c_abi.Structs[1].Fields[0].Name, "struct_2_field_1")
assert.Equal(t, c_abi.Structs[1].Fields[0].Type, "string")
assert.Equal(t, c_abi.Structs[2].Name, "struct_name_3")
assert.Equal(t, c_abi.Structs[2].Base, "")
assert.Equal(t, c_abi.Structs[2].Fields[0].Name, "struct_3_field_1")
assert.Equal(t, c_abi.Structs[2].Fields[0].Type, "string")
assert.Equal(t, c_abi.Structs[3].Name, "struct_name_4")
assert.Equal(t, c_abi.Structs[3].Base, "")
assert.Equal(t, c_abi.Structs[3].Fields[0].Name, "struct_4_field_1")
assert.Equal(t, c_abi.Structs[3].Fields[0].Type, "string")
// Actions
assert.Equal(t, c_abi.Actions[0].Name, eos.ActN("action_name_1"))
assert.Equal(t, c_abi.Actions[0].Type, "struct_name_1")
assert.Equal(t, c_abi.Actions[0].RicardianContract, "")
// Tables
assert.Equal(t, c_abi.Tables[0].Name, eos.TableName("table_name_1"))
assert.Equal(t, c_abi.Tables[0].Type, "struct_name_1")
assert.Equal(t, c_abi.Tables[0].IndexType, "i64")
assert.Equal(t, c_abi.Tables[0].KeyNames[0], "key_name_1")
assert.Equal(t, c_abi.Tables[0].KeyNames[1], "key_name_2")
assert.Equal(t, c_abi.Tables[0].KeyTypes[0], "string")
assert.Equal(t, c_abi.Tables[0].KeyTypes[1], "int")
}
func TestCacheMiss(t *testing.T) {
client, _ := redismock.NewClientMock()
c := NewCache("thalos::cache::test", &redis_cache.Options{
Redis: client,
// Cache 10k keys for 1 minute.
LocalCache: redis_cache.NewTinyLFU(10000, time.Minute),
})
_, err := c.Get("nonexist")
assert.Error(t, err)
}

View file

@ -6,26 +6,18 @@ import (
"time"
eos "github.com/eoscanada/eos-go"
redis_cache "github.com/go-redis/cache/v9"
"github.com/redis/go-redis/v9"
"github.com/eosswedenorg/thalos/app/cache"
)
// AbiManager handles an ABI cache that fetches the ABI from an API on cache miss.
type AbiManager struct {
cache *Cache
cache *cache.Cache
api *eos.API
ctx context.Context
}
// Create a new ABI Manager
func NewAbiManager(rdb *redis.Client, api *eos.API, id string) *AbiManager {
// Init abi cache
cache := NewCache("thalos::cache::"+id+"::abi", &redis_cache.Options{
Redis: rdb,
// Cache 10k keys for 10 minutes.
LocalCache: redis_cache.NewTinyLFU(10000, 10*time.Minute),
})
func NewAbiManager(cache *cache.Cache, api *eos.API) *AbiManager {
return &AbiManager{
cache: cache,
api: api,
@ -35,28 +27,26 @@ func NewAbiManager(rdb *redis.Client, api *eos.API, id string) *AbiManager {
// Set or update an ABI in the cache.
func (mgr *AbiManager) SetAbi(account eos.AccountName, abi *eos.ABI) error {
return mgr.cache.Set(string(account), abi, time.Hour)
return mgr.cache.Set(string(account), *abi, time.Hour)
}
// Get an ABI from the cache, on cache miss it is fetched from the
// API, gets cached and then returned to the user
func (mgr *AbiManager) GetAbi(account eos.AccountName) (*eos.ABI, error) {
key := string(account)
abi, err := mgr.cache.Get(key)
if err != nil {
var abi eos.ABI
if err := mgr.cache.Get(string(account), &abi); err != nil {
ctx, cancel := context.WithTimeout(mgr.ctx, time.Second)
defer cancel()
resp, err := mgr.api.GetABI(ctx, account)
if err != nil {
return nil, fmt.Errorf("api: %s", err)
}
abi = &resp.ABI
abi = resp.ABI
err = mgr.SetAbi(account, abi)
err = mgr.SetAbi(account, &abi)
if err != nil {
return nil, fmt.Errorf("cache: %s", err)
}
}
return abi, nil
return &abi, nil
}

173
app/abi/manager_test.go Normal file
View file

@ -0,0 +1,173 @@
package abi
import (
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
eos "github.com/eoscanada/eos-go"
"github.com/eosswedenorg/thalos/app/cache"
"github.com/stretchr/testify/assert"
)
var abiString = `
{
"version": "eosio::abi/1.0",
"types": [{
"new_type_name": "new_type_name_1",
"type": "name"
}],
"structs": [
{
"name": "struct_name_1",
"base": "struct_name_2",
"fields": [
{"name":"struct_1_field_1", "type":"new_type_name_1"},
{"name":"struct_1_field_2", "type":"struct_name_3"},
{"name":"struct_1_field_3", "type":"string?"},
{"name":"struct_1_field_4", "type":"string?"},
{"name":"struct_1_field_5", "type":"struct_name_4[]"}
]
},{
"name": "struct_name_2",
"base": "",
"fields": [
{"name":"struct_2_field_1", "type":"string"}
]
},{
"name": "struct_name_3",
"base": "",
"fields": [
{"name":"struct_3_field_1", "type":"string"}
]
},{
"name": "struct_name_4",
"base": "",
"fields": [
{"name":"struct_4_field_1", "type":"string"}
]
}
],
"actions": [{
"name": "action_name_1",
"type": "struct_name_1",
"ricardian_contract": ""
}],
"tables": [{
"name": "table_name_1",
"index_type": "i64",
"key_names": [
"key_name_1",
"key_name_2"
],
"key_types": [
"string",
"int"
],
"type": "struct_name_1"
}
]
}
`
func assert_abi(t *testing.T, abi *eos.ABI) {
assert.Equal(t, abi.Version, "eosio::abi/1.0")
// Types
assert.Equal(t, abi.Types[0].NewTypeName, "new_type_name_1")
assert.Equal(t, abi.Types[0].Type, "name")
// Structs
assert.Equal(t, abi.Structs[0].Name, "struct_name_1")
assert.Equal(t, abi.Structs[0].Base, "struct_name_2")
assert.Equal(t, abi.Structs[0].Fields[0].Name, "struct_1_field_1")
assert.Equal(t, abi.Structs[0].Fields[0].Type, "new_type_name_1")
assert.Equal(t, abi.Structs[0].Fields[1].Name, "struct_1_field_2")
assert.Equal(t, abi.Structs[0].Fields[1].Type, "struct_name_3")
assert.Equal(t, abi.Structs[0].Fields[2].Name, "struct_1_field_3")
assert.Equal(t, abi.Structs[0].Fields[2].Type, "string?")
assert.Equal(t, abi.Structs[0].Fields[3].Name, "struct_1_field_4")
assert.Equal(t, abi.Structs[0].Fields[3].Type, "string?")
assert.Equal(t, abi.Structs[0].Fields[4].Name, "struct_1_field_5")
assert.Equal(t, abi.Structs[0].Fields[4].Type, "struct_name_4[]")
assert.Equal(t, abi.Structs[1].Name, "struct_name_2")
assert.Equal(t, abi.Structs[1].Base, "")
assert.Equal(t, abi.Structs[1].Fields[0].Name, "struct_2_field_1")
assert.Equal(t, abi.Structs[1].Fields[0].Type, "string")
assert.Equal(t, abi.Structs[2].Name, "struct_name_3")
assert.Equal(t, abi.Structs[2].Base, "")
assert.Equal(t, abi.Structs[2].Fields[0].Name, "struct_3_field_1")
assert.Equal(t, abi.Structs[2].Fields[0].Type, "string")
assert.Equal(t, abi.Structs[3].Name, "struct_name_4")
assert.Equal(t, abi.Structs[3].Base, "")
assert.Equal(t, abi.Structs[3].Fields[0].Name, "struct_4_field_1")
assert.Equal(t, abi.Structs[3].Fields[0].Type, "string")
// Actions
assert.Equal(t, abi.Actions[0].Name, eos.ActN("action_name_1"))
assert.Equal(t, abi.Actions[0].Type, "struct_name_1")
assert.Equal(t, abi.Actions[0].RicardianContract, "")
// Tables
assert.Equal(t, abi.Tables[0].Name, eos.TableName("table_name_1"))
assert.Equal(t, abi.Tables[0].Type, "struct_name_1")
assert.Equal(t, abi.Tables[0].IndexType, "i64")
assert.Equal(t, abi.Tables[0].KeyNames[0], "key_name_1")
assert.Equal(t, abi.Tables[0].KeyNames[1], "key_name_2")
assert.Equal(t, abi.Tables[0].KeyTypes[0], "string")
assert.Equal(t, abi.Tables[0].KeyTypes[1], "int")
}
func mockAPI(handler http.HandlerFunc) (*eos.API, *httptest.Server) {
server := httptest.NewServer(handler)
return &eos.API{
HttpClient: server.Client(),
BaseURL: strings.TrimRight(server.URL, "/"),
Compress: eos.CompressionZlib,
Header: make(http.Header),
}, server
}
func TestManager_GetAbiFromCache(t *testing.T) {
cache := cache.NewCache("thalos::cache::abi::test", cache.NewMemoryStore())
api, _ := mockAPI(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
}))
mgr := NewAbiManager(cache, api)
abi, err := eos.NewABI(strings.NewReader(abiString))
assert.NoError(t, err)
err = mgr.SetAbi("testaccount", abi)
assert.NoError(t, err)
c_abi, err := mgr.GetAbi("testaccount")
assert.NoError(t, err)
assert_abi(t, c_abi)
}
func TestManager_GetAbiFromAPI(t *testing.T) {
cache := cache.NewCache("thalos::cache::abi::test", cache.NewMemoryStore())
api, _ := mockAPI(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body := fmt.Sprintf(`{"account_name": "testaccount", "abi": %s}`, abiString)
_, err := w.Write([]byte(body))
assert.NoError(t, err)
}))
mgr := NewAbiManager(cache, api)
c_abi, err := mgr.GetAbi("testaccount")
assert.NoError(t, err)
assert_abi(t, c_abi)
}

30
app/cache/cache.go vendored Normal file
View file

@ -0,0 +1,30 @@
package cache
import (
"time"
)
type Cache struct {
store Store
prefix string
}
// Create a new cache
func NewCache(prefix string, store Store) *Cache {
return &Cache{
store: store,
prefix: prefix,
}
}
func (cache *Cache) Get(key string, value any) error {
return cache.store.Get(cache.key(key), value)
}
func (cache *Cache) Set(key string, value any, ttl time.Duration) error {
return cache.store.Set(cache.key(key), value, ttl)
}
func (cache *Cache) key(key string) string {
return cache.prefix + "::" + key
}

55
app/cache/memory_store.go vendored Normal file
View file

@ -0,0 +1,55 @@
package cache
import (
"fmt"
"reflect"
"time"
)
var now = time.Now
type memoryStoreItem struct {
value any
expired time.Time
}
type MemoryStore struct {
data map[string]memoryStoreItem
}
func NewMemoryStore() *MemoryStore {
return &MemoryStore{make(map[string]memoryStoreItem)}
}
func (s *MemoryStore) Get(key string, value any) error {
if item, ok := s.data[key]; ok {
if item.expired.Before(now()) {
delete(s.data, key)
return fmt.Errorf("key: %s does not exist", key)
}
v := reflect.ValueOf(value)
if v.Kind() != reflect.Pointer {
return fmt.Errorf("value must be of pointer type, '%s' passed", v.Kind().String())
}
v.Elem().Set(reflect.ValueOf(item.value))
return nil
}
return fmt.Errorf("key: %s does not exist", key)
}
func (s *MemoryStore) Has(key string) bool {
_, hit := s.data[key]
return hit
}
func (s *MemoryStore) Set(key string, value any, ttl time.Duration) error {
s.data[key] = memoryStoreItem{
value: value,
expired: now().Add(ttl),
}
return nil
}

87
app/cache/memory_store_test.go vendored Normal file
View file

@ -0,0 +1,87 @@
package cache
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
type memoryTestItem struct {
String string
Num uint32
Float float32
}
func TestMemoryStore_Set(t *testing.T) {
now = func() time.Time { return time.Unix(1581315270, 0) }
item := memoryTestItem{
String: "MyString",
Num: 23,
Float: 3.14,
}
expected := map[string]memoryStoreItem{
"key1": {
value: item,
expired: now().Add(time.Hour),
},
}
store := NewMemoryStore()
err := store.Set("key1", item, time.Hour)
assert.NoError(t, err)
assert.Equal(t, expected, store.data)
}
func TestMemoryStore_GetMiss(t *testing.T) {
store := NewMemoryStore()
var v any
err := store.Get("Key2", &v)
assert.Error(t, err)
}
func TestMemoryStore_GetHit(t *testing.T) {
expected := memoryTestItem{
String: "MyString",
Num: 23,
Float: 3.14,
}
store := NewMemoryStore()
err := store.Set("key1", expected, time.Hour)
assert.NoError(t, err)
var actual memoryTestItem
err = store.Get("key1", &actual)
assert.NoError(t, err)
assert.Equal(t, expected, actual)
}
func TestMemoryStore_GetNonPointer(t *testing.T) {
expected := memoryTestItem{
String: "MyString",
Num: 23,
Float: 3.14,
}
store := NewMemoryStore()
err := store.Set("key1", expected, time.Hour)
assert.NoError(t, err)
var actual string
err = store.Get("key1", actual)
assert.EqualError(t, err, "value must be of pointer type, 'string' passed")
}
func TestMemoryStore_Has(t *testing.T) {
store := NewMemoryStore()
err := store.Set("key1", "value", time.Hour)
assert.NoError(t, err)
assert.True(t, store.Has("key1"))
assert.False(t, store.Has("key2"))
}

37
app/cache/redis_store.go vendored Normal file
View file

@ -0,0 +1,37 @@
package cache
import (
"context"
"time"
"github.com/go-redis/cache/v9"
)
type RedisStore struct {
c *cache.Cache
ctx context.Context
}
func NewRedisStore(options *cache.Options) *RedisStore {
return &RedisStore{
c: cache.New(options),
ctx: context.Background(),
}
}
func (s *RedisStore) Get(key string, value interface{}) error {
return s.c.Get(s.ctx, key, value)
}
func (s *RedisStore) Has(key string) bool {
return s.c.Exists(s.ctx, key)
}
func (s *RedisStore) Set(key string, value any, ttl time.Duration) error {
return s.c.Set(&cache.Item{
Ctx: s.ctx,
Key: key,
Value: value,
TTL: ttl,
})
}

103
app/cache/redis_store_test.go vendored Normal file
View file

@ -0,0 +1,103 @@
package cache
import (
"testing"
"time"
"github.com/go-redis/redismock/v9"
redis_cache "github.com/go-redis/cache/v9"
"github.com/stretchr/testify/assert"
)
type testItem struct {
Num uint32
Name string
}
func TestRedisStore_Set(t *testing.T) {
client, mock := redismock.NewClientMock()
store := NewRedisStore(&redis_cache.Options{
Redis: client,
})
expected := testItem{
Num: 24,
Name: "Some Name",
}
bytes, err := store.c.Marshal(expected)
assert.NoError(t, err)
mock.ExpectSet("mykey", bytes, time.Minute).SetVal("OK")
err = store.Set("mykey", expected, time.Minute)
assert.NoError(t, err)
assert.NoError(t, mock.ExpectationsWereMet())
}
func TestRedisStore_GetMiss(t *testing.T) {
client, mock := redismock.NewClientMock()
store := NewRedisStore(&redis_cache.Options{
Redis: client,
})
mock.ExpectGet("mykey").SetErr(redis_cache.ErrCacheMiss)
expected := testItem{}
err := store.Get("mykey", &expected)
assert.ErrorIs(t, err, redis_cache.ErrCacheMiss)
assert.NoError(t, mock.ExpectationsWereMet())
}
func TestRedisStore_GetHit(t *testing.T) {
client, mock := redismock.NewClientMock()
store := NewRedisStore(&redis_cache.Options{
Redis: client,
})
expected := testItem{
Num: 42,
Name: "MyName",
}
bytes, err := store.c.Marshal(expected)
assert.NoError(t, err)
mock.ExpectSet("mykey2", bytes, time.Second*20).SetVal("OK")
mock.ExpectGet("mykey2").SetVal(string(bytes))
err = store.Set("mykey2", expected, time.Second*20)
assert.NoError(t, err)
actual := testItem{}
err = store.Get("mykey2", &actual)
assert.NoError(t, err)
assert.Equal(t, expected, actual)
assert.NoError(t, mock.ExpectationsWereMet())
}
func TestRedisStore_Has(t *testing.T) {
client, mock := redismock.NewClientMock()
store := NewRedisStore(&redis_cache.Options{
Redis: client,
})
bytes, err := store.c.Marshal("value")
assert.NoError(t, err)
mock.ExpectSet("key1", bytes, time.Minute*15).SetVal("OK")
mock.ExpectGet("key1").SetVal(string(bytes))
mock.ExpectGet("key2").RedisNil()
err = store.Set("key1", "value", time.Minute*15)
assert.NoError(t, err)
assert.True(t, store.Has("key1"))
assert.False(t, store.Has("key2"))
assert.NoError(t, mock.ExpectationsWereMet())
}

15
app/cache/store.go vendored Normal file
View file

@ -0,0 +1,15 @@
package cache
import "time"
type Store interface {
// Set an item in the store.
Set(key string, value any, TTL time.Duration) error
// Get an item from the store.
// returns an error if key is not found or there is other problems.
Get(key string, value any) error
// Check if a key exist in the store.
Has(key string) bool
}

View file

@ -44,24 +44,29 @@ type ShipProcessor struct {
// Encoder used to encode messages
encode message.Encoder
// Keep track of the current block we have processed.
current_block uint32
// Function for saving state.
saver StateSaver
// Internal state
state State
// System contract ("eosio" per default)
syscontract eos.AccountName
}
// SpawnProcessor creates a new ShipProccessor that consumes the shipclient.Stream passed to it.
func SpawnProccessor(shipStream *shipclient.Stream, writer driver.Writer, abi *abi.AbiManager, codec message.Codec) *ShipProcessor {
func SpawnProccessor(shipStream *shipclient.Stream, loader StateLoader, saver StateSaver, writer driver.Writer, abi *abi.AbiManager, codec message.Codec) *ShipProcessor {
processor := &ShipProcessor{
abi: abi,
writer: writer,
shipStream: shipStream,
encode: logDecoratedEncoder(codec.Encoder),
syscontract: eos.AccountName("eosio"),
current_block: shipStream.StartBlock,
saver: saver,
abi: abi,
writer: writer,
shipStream: shipStream,
encode: logDecoratedEncoder(codec.Encoder),
syscontract: eos.AccountName("eosio"),
}
loader(&processor.state)
// Attach handlers
shipStream.BlockHandler = processor.processBlock
@ -125,15 +130,15 @@ func (processor *ShipProcessor) updateAbiFromAction(act *ship.Action) error {
// Get the current block.
func (processor *ShipProcessor) GetCurrentBlock() uint32 {
return processor.current_block
return processor.state.CurrentBlock
}
// Callback function called by shipclient.Stream when a new block arrives.
func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) {
processor.current_block = block.ThisBlock.BlockNum
processor.state.CurrentBlock = block.ThisBlock.BlockNum
if block.ThisBlock.BlockNum%100 == 0 {
log.Infof("Current: %d, Head: %d", processor.current_block, block.Head.BlockNum)
log.Infof("Current: %d, Head: %d", processor.state.CurrentBlock, block.Head.BlockNum)
}
if block.ThisBlock.BlockNum%10 == 0 {
@ -273,9 +278,14 @@ func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) {
if err != nil {
log.WithError(err).Error("Failed to send messages")
}
err = processor.saver(processor.state)
if err != nil {
log.WithError(err).Error("Failed to save state")
}
}
// Close closes the writer assciated with the processor.
// Close closes the writer associated with the processor.
func (processor *ShipProcessor) Close() error {
return processor.writer.Close()
}

15
app/state.go Normal file
View file

@ -0,0 +1,15 @@
package app
// State represents thalos runtime state
type State struct {
// Last processed block
CurrentBlock uint32
}
type (
// StateLoader is a function that loads a state.
StateLoader func(*State)
// StateSaver is a function that saves a state.
StateSaver func(State) error
)

View file

@ -21,9 +21,11 @@ import (
api_redis "github.com/eosswedenorg/thalos/api/redis"
"github.com/eosswedenorg/thalos/app"
"github.com/eosswedenorg/thalos/app/abi"
. "github.com/eosswedenorg/thalos/app/cache"
"github.com/eosswedenorg/thalos/app/config"
driver "github.com/eosswedenorg/thalos/app/driver/redis"
. "github.com/eosswedenorg/thalos/app/log"
redis_cache "github.com/go-redis/cache/v9"
"github.com/nikoksr/notify"
"github.com/nikoksr/notify/service/telegram"
"github.com/pborman/getopt/v2"
@ -45,6 +47,10 @@ var VersionString string = "dev"
var exit chan bool
var cache *Cache
var cacheStore Store
func readerLoop(processor *app.ShipProcessor) {
recon_cnt := 0
@ -171,6 +177,50 @@ func LogLevels() []string {
return list
}
func initAbiManger(api *eos.API, chain_id string) *abi.AbiManager {
cache := NewCache("thalos::cache::abi::"+chain_id, cacheStore)
return abi.NewAbiManager(cache, api)
}
func stateLoader(chainInfo *eos.InfoResp, current_block_no_cache bool) app.StateLoader {
return func(state *app.State) {
var source string
// Load state from cache.
err := cache.Get("state", &state)
// on error (cache miss) or if current_block_no_cache is set.
// set current block from config/api
if current_block_no_cache || err != nil {
// Set from config if we have a sane value.
if conf.Ship.StartBlockNum != shipclient.NULL_BLOCK_NUMBER {
source = "config"
state.CurrentBlock = conf.Ship.StartBlockNum
} else {
// Otherwise, set from api.
if conf.Ship.IrreversibleOnly {
source = "api (LIB)"
state.CurrentBlock = uint32(chainInfo.LastIrreversibleBlockNum)
} else {
source = "api (HEAD)"
state.CurrentBlock = uint32(chainInfo.HeadBlockNum)
}
}
} else {
source = "cache"
}
log.WithFields(log.Fields{
"block": state.CurrentBlock,
"source": source,
}).Info("Starting from block")
}
}
func stateSaver(state app.State) error {
return cache.Set("state", state, 0)
}
func main() {
var err error
var chainInfo *eos.InfoResp
@ -183,6 +233,7 @@ func main() {
pidFile := getopt.StringLong("pid", 'p', "", "Where to write process id", "file")
logFile := getopt.StringLong("log", 'l', "", "Path to log file", "file")
logLevel := getopt.EnumLong("level", 'L', LogLevels(), "info", "Log level to use")
skip_currentblock_cache := getopt.Bool('n', "Force the application to take start block from config/api")
getopt.Parse()
@ -281,6 +332,16 @@ func main() {
return
}
// Setup cache storage
cacheStore = NewRedisStore(&redis_cache.Options{
Redis: rdb,
// Cache 10k keys for 10 minutes.
LocalCache: redis_cache.NewTinyLFU(10000, 10*time.Minute),
})
// Setup general cache
cache = NewCache("thalos::cache::instance::"+conf.Name, cacheStore)
log.WithField("api", conf.Api).Info("Get chain info from api")
eosClient := eos.New(conf.Api)
chainInfo, err = eosClient.GetInfo(context.Background())
@ -289,14 +350,6 @@ func main() {
return
}
if conf.Ship.StartBlockNum == shipclient.NULL_BLOCK_NUMBER {
if conf.Ship.IrreversibleOnly {
conf.Ship.StartBlockNum = uint32(chainInfo.LastIrreversibleBlockNum)
} else {
conf.Ship.StartBlockNum = uint32(chainInfo.HeadBlockNum)
}
}
shClient = shipclient.NewStream(func(s *shipclient.Stream) {
s.StartBlock = conf.Ship.StartBlockNum
s.EndBlock = conf.Ship.EndBlockNum
@ -314,11 +367,13 @@ func main() {
processor := app.SpawnProccessor(
shClient,
stateLoader(chainInfo, *skip_currentblock_cache),
stateSaver,
driver.NewPublisher(context.Background(), rdb, api_redis.Namespace{
Prefix: conf.Redis.Prefix,
ChainID: chain_id,
}),
abi.NewAbiManager(rdb, eosClient, chain_id),
initAbiManger(eosClient, chain_id),
codec,
)