From 23d05e55125b29c1cc3214e50eeaa32994b59eec Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Tue, 31 Oct 2023 16:58:40 +0100 Subject: [PATCH 01/13] Adding app/cache/store.go --- app/cache/store.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 app/cache/store.go diff --git a/app/cache/store.go b/app/cache/store.go new file mode 100644 index 0000000..309333f --- /dev/null +++ b/app/cache/store.go @@ -0,0 +1,15 @@ +package cache + +import "time" + +type Store interface { + // Set an item in the store. + Set(key string, value any, TTL time.Duration) error + + // Get an item from the store. + // returns an error if key is not found or there is other problems. + Get(key string, value any) error + + // Check if a key exist in the store. + Has(key string) bool +} From ed1009062dd3fb390517fab383a14a6145c74514 Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Tue, 31 Oct 2023 16:59:08 +0100 Subject: [PATCH 02/13] Adding app/cache/memory_store.go --- app/cache/memory_store.go | 55 +++++++++++++++++++++ app/cache/memory_store_test.go | 87 ++++++++++++++++++++++++++++++++++ 2 files changed, 142 insertions(+) create mode 100644 app/cache/memory_store.go create mode 100644 app/cache/memory_store_test.go diff --git a/app/cache/memory_store.go b/app/cache/memory_store.go new file mode 100644 index 0000000..7c3bd9f --- /dev/null +++ b/app/cache/memory_store.go @@ -0,0 +1,55 @@ +package cache + +import ( + "fmt" + "reflect" + "time" +) + +var now = time.Now + +type memoryStoreItem struct { + value any + expired time.Time +} + +type MemoryStore struct { + data map[string]memoryStoreItem +} + +func NewMemoryStore() *MemoryStore { + return &MemoryStore{make(map[string]memoryStoreItem)} +} + +func (s *MemoryStore) Get(key string, value any) error { + if item, ok := s.data[key]; ok { + + if item.expired.Before(now()) { + delete(s.data, key) + return fmt.Errorf("key: %s does not exist", key) + } + + v := reflect.ValueOf(value) + if v.Kind() != reflect.Pointer { + return fmt.Errorf("value must be of pointer type, '%s' passed", v.Kind().String()) + } + + v.Elem().Set(reflect.ValueOf(item.value)) + + return nil + } + return fmt.Errorf("key: %s does not exist", key) +} + +func (s *MemoryStore) Has(key string) bool { + _, hit := s.data[key] + return hit +} + +func (s *MemoryStore) Set(key string, value any, ttl time.Duration) error { + s.data[key] = memoryStoreItem{ + value: value, + expired: now().Add(ttl), + } + return nil +} diff --git a/app/cache/memory_store_test.go b/app/cache/memory_store_test.go new file mode 100644 index 0000000..d6bc6bd --- /dev/null +++ b/app/cache/memory_store_test.go @@ -0,0 +1,87 @@ +package cache + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +type memoryTestItem struct { + String string + Num uint32 + Float float32 +} + +func TestMemoryStore_Set(t *testing.T) { + now = func() time.Time { return time.Unix(1581315270, 0) } + + item := memoryTestItem{ + String: "MyString", + Num: 23, + Float: 3.14, + } + + expected := map[string]memoryStoreItem{ + "key1": { + value: item, + expired: now().Add(time.Hour), + }, + } + + store := NewMemoryStore() + err := store.Set("key1", item, time.Hour) + assert.NoError(t, err) + + assert.Equal(t, expected, store.data) +} + +func TestMemoryStore_GetMiss(t *testing.T) { + store := NewMemoryStore() + + var v any + err := store.Get("Key2", &v) + assert.Error(t, err) +} + +func TestMemoryStore_GetHit(t *testing.T) { + expected := memoryTestItem{ + String: "MyString", + Num: 23, + Float: 3.14, + } + + store := NewMemoryStore() + err := store.Set("key1", expected, time.Hour) + assert.NoError(t, err) + + var actual memoryTestItem + err = store.Get("key1", &actual) + assert.NoError(t, err) + assert.Equal(t, expected, actual) +} + +func TestMemoryStore_GetNonPointer(t *testing.T) { + expected := memoryTestItem{ + String: "MyString", + Num: 23, + Float: 3.14, + } + + store := NewMemoryStore() + err := store.Set("key1", expected, time.Hour) + assert.NoError(t, err) + + var actual string + err = store.Get("key1", actual) + assert.EqualError(t, err, "value must be of pointer type, 'string' passed") +} + +func TestMemoryStore_Has(t *testing.T) { + store := NewMemoryStore() + err := store.Set("key1", "value", time.Hour) + assert.NoError(t, err) + + assert.True(t, store.Has("key1")) + assert.False(t, store.Has("key2")) +} From c6cb26d543a7e42290efa51463daf1571e6213bb Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Tue, 31 Oct 2023 16:59:26 +0100 Subject: [PATCH 03/13] Adding app/cache/redis_store.go --- app/cache/redis_store.go | 37 ++++++++++++ app/cache/redis_store_test.go | 103 ++++++++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+) create mode 100644 app/cache/redis_store.go create mode 100644 app/cache/redis_store_test.go diff --git a/app/cache/redis_store.go b/app/cache/redis_store.go new file mode 100644 index 0000000..5016a15 --- /dev/null +++ b/app/cache/redis_store.go @@ -0,0 +1,37 @@ +package cache + +import ( + "context" + "time" + + "github.com/go-redis/cache/v9" +) + +type RedisStore struct { + c *cache.Cache + ctx context.Context +} + +func NewRedisStore(options *cache.Options) *RedisStore { + return &RedisStore{ + c: cache.New(options), + ctx: context.Background(), + } +} + +func (s *RedisStore) Get(key string, value interface{}) error { + return s.c.Get(s.ctx, key, value) +} + +func (s *RedisStore) Has(key string) bool { + return s.c.Exists(s.ctx, key) +} + +func (s *RedisStore) Set(key string, value any, ttl time.Duration) error { + return s.c.Set(&cache.Item{ + Ctx: s.ctx, + Key: key, + Value: value, + TTL: ttl, + }) +} diff --git a/app/cache/redis_store_test.go b/app/cache/redis_store_test.go new file mode 100644 index 0000000..34e3f34 --- /dev/null +++ b/app/cache/redis_store_test.go @@ -0,0 +1,103 @@ +package cache + +import ( + "testing" + "time" + + "github.com/go-redis/redismock/v9" + + redis_cache "github.com/go-redis/cache/v9" + "github.com/stretchr/testify/assert" +) + +type testItem struct { + Num uint32 + Name string +} + +func TestRedisStore_Set(t *testing.T) { + client, mock := redismock.NewClientMock() + + store := NewRedisStore(&redis_cache.Options{ + Redis: client, + }) + + expected := testItem{ + Num: 24, + Name: "Some Name", + } + + bytes, err := store.c.Marshal(expected) + assert.NoError(t, err) + + mock.ExpectSet("mykey", bytes, time.Minute).SetVal("OK") + + err = store.Set("mykey", expected, time.Minute) + assert.NoError(t, err) + + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestRedisStore_GetMiss(t *testing.T) { + client, mock := redismock.NewClientMock() + + store := NewRedisStore(&redis_cache.Options{ + Redis: client, + }) + + mock.ExpectGet("mykey").SetErr(redis_cache.ErrCacheMiss) + + expected := testItem{} + err := store.Get("mykey", &expected) + assert.ErrorIs(t, err, redis_cache.ErrCacheMiss) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestRedisStore_GetHit(t *testing.T) { + client, mock := redismock.NewClientMock() + + store := NewRedisStore(&redis_cache.Options{ + Redis: client, + }) + + expected := testItem{ + Num: 42, + Name: "MyName", + } + + bytes, err := store.c.Marshal(expected) + assert.NoError(t, err) + + mock.ExpectSet("mykey2", bytes, time.Second*20).SetVal("OK") + mock.ExpectGet("mykey2").SetVal(string(bytes)) + + err = store.Set("mykey2", expected, time.Second*20) + assert.NoError(t, err) + + actual := testItem{} + err = store.Get("mykey2", &actual) + assert.NoError(t, err) + assert.Equal(t, expected, actual) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestRedisStore_Has(t *testing.T) { + client, mock := redismock.NewClientMock() + + store := NewRedisStore(&redis_cache.Options{ + Redis: client, + }) + + bytes, err := store.c.Marshal("value") + assert.NoError(t, err) + + mock.ExpectSet("key1", bytes, time.Minute*15).SetVal("OK") + mock.ExpectGet("key1").SetVal(string(bytes)) + mock.ExpectGet("key2").RedisNil() + + err = store.Set("key1", "value", time.Minute*15) + assert.NoError(t, err) + assert.True(t, store.Has("key1")) + assert.False(t, store.Has("key2")) + assert.NoError(t, mock.ExpectationsWereMet()) +} From 2eb62db117aeec4a09774e2602925ba9dae4d1bd Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Wed, 1 Nov 2023 21:23:48 +0100 Subject: [PATCH 04/13] Adding app/cache/cache.go --- app/cache/cache.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 app/cache/cache.go diff --git a/app/cache/cache.go b/app/cache/cache.go new file mode 100644 index 0000000..290019b --- /dev/null +++ b/app/cache/cache.go @@ -0,0 +1,30 @@ +package cache + +import ( + "time" +) + +type Cache struct { + store Store + prefix string +} + +// Create a new cache +func NewCache(prefix string, store Store) *Cache { + return &Cache{ + store: store, + prefix: prefix, + } +} + +func (cache *Cache) Get(key string, value any) error { + return cache.store.Get(cache.key(key), value) +} + +func (cache *Cache) Set(key string, value any, ttl time.Duration) error { + return cache.store.Set(cache.key(key), value, ttl) +} + +func (cache *Cache) key(key string) string { + return cache.prefix + "::" + key +} From 424ae2fc40eb915a33259448d2a6387416f94974 Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Wed, 1 Nov 2023 21:24:59 +0100 Subject: [PATCH 05/13] app/abi: rework manager to use new cache struct. --- app/abi/cache.go | 46 ----------- app/abi/cache_test.go | 158 ------------------------------------ app/abi/manager.go | 28 +++---- app/abi/manager_test.go | 173 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 182 insertions(+), 223 deletions(-) delete mode 100644 app/abi/cache.go delete mode 100644 app/abi/cache_test.go create mode 100644 app/abi/manager_test.go diff --git a/app/abi/cache.go b/app/abi/cache.go deleted file mode 100644 index 3b54084..0000000 --- a/app/abi/cache.go +++ /dev/null @@ -1,46 +0,0 @@ -package abi - -import ( - "context" - "time" - - eos "github.com/eoscanada/eos-go" - redis_cache "github.com/go-redis/cache/v9" -) - -// Cache represents a abi cache in redis. -type Cache struct { - c *redis_cache.Cache - ctx context.Context - prefix string -} - -// Create a new cache -func NewCache(prefix string, options *redis_cache.Options) *Cache { - return &Cache{ - c: redis_cache.New(options), - ctx: context.Background(), - prefix: prefix, - } -} - -// Get an ABI from the cache using the contract account name as the key. -func (cache *Cache) Get(account string) (*eos.ABI, error) { - var v eos.ABI - err := cache.c.Get(cache.ctx, cache.key(account), &v) - return &v, err -} - -// Set an ABI in the cache. -func (cache *Cache) Set(account string, abi *eos.ABI, ttl time.Duration) error { - return cache.c.Set(&redis_cache.Item{ - Ctx: cache.ctx, - Key: cache.key(account), - Value: *abi, - TTL: ttl, - }) -} - -func (cache *Cache) key(account string) string { - return cache.prefix + "::" + account -} diff --git a/app/abi/cache_test.go b/app/abi/cache_test.go deleted file mode 100644 index da74e74..0000000 --- a/app/abi/cache_test.go +++ /dev/null @@ -1,158 +0,0 @@ -package abi - -import ( - "strings" - "testing" - "time" - - eos "github.com/eoscanada/eos-go" - redis_cache "github.com/go-redis/cache/v9" - "github.com/go-redis/redismock/v9" - - "github.com/stretchr/testify/assert" -) - -var abiString = ` -{ - "version": "eosio::abi/1.0", - "types": [{ - "new_type_name": "new_type_name_1", - "type": "name" - }], - "structs": [ - { - "name": "struct_name_1", - "base": "struct_name_2", - "fields": [ - {"name":"struct_1_field_1", "type":"new_type_name_1"}, - {"name":"struct_1_field_2", "type":"struct_name_3"}, - {"name":"struct_1_field_3", "type":"string?"}, - {"name":"struct_1_field_4", "type":"string?"}, - {"name":"struct_1_field_5", "type":"struct_name_4[]"} - ] - },{ - "name": "struct_name_2", - "base": "", - "fields": [ - {"name":"struct_2_field_1", "type":"string"} - ] - },{ - "name": "struct_name_3", - "base": "", - "fields": [ - {"name":"struct_3_field_1", "type":"string"} - ] - },{ - "name": "struct_name_4", - "base": "", - "fields": [ - {"name":"struct_4_field_1", "type":"string"} - ] - } - ], - "actions": [{ - "name": "action_name_1", - "type": "struct_name_1", - "ricardian_contract": "" - }], - "tables": [{ - "name": "table_name_1", - "index_type": "i64", - "key_names": [ - "key_name_1", - "key_name_2" - ], - "key_types": [ - "string", - "int" - ], - "type": "struct_name_1" - } - ] -} -` - -func TestGetSet(t *testing.T) { - client, mock := redismock.NewClientMock() - - c := NewCache("thalos::cache::test", &redis_cache.Options{ - Redis: client, - // Cache 10k keys for 1 minute. - LocalCache: redis_cache.NewTinyLFU(10000, time.Minute), - }) - - abi, err := eos.NewABI(strings.NewReader(abiString)) - assert.NoError(t, err) - - bytes, _ := c.c.Marshal(*abi) - - mock.ExpectSet("thalos::cache::test::testaccount", bytes, time.Minute).SetVal("OK") - - err = c.Set("testaccount", abi, time.Minute) - assert.NoError(t, err) - - c_abi, err := c.Get("testaccount") - assert.NoError(t, err) - - assert.Equal(t, c_abi.Version, "eosio::abi/1.0") - - // Types - assert.Equal(t, c_abi.Types[0].NewTypeName, "new_type_name_1") - assert.Equal(t, c_abi.Types[0].Type, "name") - - // Structs - assert.Equal(t, c_abi.Structs[0].Name, "struct_name_1") - assert.Equal(t, c_abi.Structs[0].Base, "struct_name_2") - assert.Equal(t, c_abi.Structs[0].Fields[0].Name, "struct_1_field_1") - assert.Equal(t, c_abi.Structs[0].Fields[0].Type, "new_type_name_1") - assert.Equal(t, c_abi.Structs[0].Fields[1].Name, "struct_1_field_2") - assert.Equal(t, c_abi.Structs[0].Fields[1].Type, "struct_name_3") - assert.Equal(t, c_abi.Structs[0].Fields[2].Name, "struct_1_field_3") - assert.Equal(t, c_abi.Structs[0].Fields[2].Type, "string?") - assert.Equal(t, c_abi.Structs[0].Fields[3].Name, "struct_1_field_4") - assert.Equal(t, c_abi.Structs[0].Fields[3].Type, "string?") - assert.Equal(t, c_abi.Structs[0].Fields[4].Name, "struct_1_field_5") - assert.Equal(t, c_abi.Structs[0].Fields[4].Type, "struct_name_4[]") - - assert.Equal(t, c_abi.Structs[1].Name, "struct_name_2") - assert.Equal(t, c_abi.Structs[1].Base, "") - assert.Equal(t, c_abi.Structs[1].Fields[0].Name, "struct_2_field_1") - assert.Equal(t, c_abi.Structs[1].Fields[0].Type, "string") - - assert.Equal(t, c_abi.Structs[2].Name, "struct_name_3") - assert.Equal(t, c_abi.Structs[2].Base, "") - assert.Equal(t, c_abi.Structs[2].Fields[0].Name, "struct_3_field_1") - assert.Equal(t, c_abi.Structs[2].Fields[0].Type, "string") - - assert.Equal(t, c_abi.Structs[3].Name, "struct_name_4") - assert.Equal(t, c_abi.Structs[3].Base, "") - assert.Equal(t, c_abi.Structs[3].Fields[0].Name, "struct_4_field_1") - assert.Equal(t, c_abi.Structs[3].Fields[0].Type, "string") - - // Actions - assert.Equal(t, c_abi.Actions[0].Name, eos.ActN("action_name_1")) - assert.Equal(t, c_abi.Actions[0].Type, "struct_name_1") - assert.Equal(t, c_abi.Actions[0].RicardianContract, "") - - // Tables - assert.Equal(t, c_abi.Tables[0].Name, eos.TableName("table_name_1")) - assert.Equal(t, c_abi.Tables[0].Type, "struct_name_1") - assert.Equal(t, c_abi.Tables[0].IndexType, "i64") - assert.Equal(t, c_abi.Tables[0].KeyNames[0], "key_name_1") - assert.Equal(t, c_abi.Tables[0].KeyNames[1], "key_name_2") - assert.Equal(t, c_abi.Tables[0].KeyTypes[0], "string") - assert.Equal(t, c_abi.Tables[0].KeyTypes[1], "int") -} - -func TestCacheMiss(t *testing.T) { - client, _ := redismock.NewClientMock() - - c := NewCache("thalos::cache::test", &redis_cache.Options{ - Redis: client, - // Cache 10k keys for 1 minute. - LocalCache: redis_cache.NewTinyLFU(10000, time.Minute), - }) - - _, err := c.Get("nonexist") - assert.Error(t, err) -} diff --git a/app/abi/manager.go b/app/abi/manager.go index 79df1bd..d8f3682 100644 --- a/app/abi/manager.go +++ b/app/abi/manager.go @@ -6,26 +6,18 @@ import ( "time" eos "github.com/eoscanada/eos-go" - redis_cache "github.com/go-redis/cache/v9" - "github.com/redis/go-redis/v9" + "github.com/eosswedenorg/thalos/app/cache" ) // AbiManager handles an ABI cache that fetches the ABI from an API on cache miss. type AbiManager struct { - cache *Cache + cache *cache.Cache api *eos.API ctx context.Context } // Create a new ABI Manager -func NewAbiManager(rdb *redis.Client, api *eos.API, id string) *AbiManager { - // Init abi cache - cache := NewCache("thalos::cache::"+id+"::abi", &redis_cache.Options{ - Redis: rdb, - // Cache 10k keys for 10 minutes. - LocalCache: redis_cache.NewTinyLFU(10000, 10*time.Minute), - }) - +func NewAbiManager(cache *cache.Cache, api *eos.API) *AbiManager { return &AbiManager{ cache: cache, api: api, @@ -35,26 +27,24 @@ func NewAbiManager(rdb *redis.Client, api *eos.API, id string) *AbiManager { // Set or update an ABI in the cache. func (mgr *AbiManager) SetAbi(account eos.AccountName, abi *eos.ABI) error { - return mgr.cache.Set(string(account), abi, time.Hour) + return mgr.cache.Set(string(account), *abi, time.Hour) } // Get an ABI from the cache, on cache miss it is fetched from the // API, gets cached and then returned to the user func (mgr *AbiManager) GetAbi(account eos.AccountName) (*eos.ABI, error) { - key := string(account) - - abi, err := mgr.cache.Get(key) - if err != nil { + var abi eos.ABI + if err := mgr.cache.Get(string(account), &abi); err != nil { resp, err := mgr.api.GetABI(mgr.ctx, account) if err != nil { return nil, fmt.Errorf("api: %s", err) } - abi = &resp.ABI + abi = resp.ABI - err = mgr.SetAbi(account, abi) + err = mgr.SetAbi(account, &abi) if err != nil { return nil, fmt.Errorf("cache: %s", err) } } - return abi, nil + return &abi, nil } diff --git a/app/abi/manager_test.go b/app/abi/manager_test.go new file mode 100644 index 0000000..661c8ec --- /dev/null +++ b/app/abi/manager_test.go @@ -0,0 +1,173 @@ +package abi + +import ( + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + + eos "github.com/eoscanada/eos-go" + + "github.com/eosswedenorg/thalos/app/cache" + "github.com/stretchr/testify/assert" +) + +var abiString = ` +{ + "version": "eosio::abi/1.0", + "types": [{ + "new_type_name": "new_type_name_1", + "type": "name" + }], + "structs": [ + { + "name": "struct_name_1", + "base": "struct_name_2", + "fields": [ + {"name":"struct_1_field_1", "type":"new_type_name_1"}, + {"name":"struct_1_field_2", "type":"struct_name_3"}, + {"name":"struct_1_field_3", "type":"string?"}, + {"name":"struct_1_field_4", "type":"string?"}, + {"name":"struct_1_field_5", "type":"struct_name_4[]"} + ] + },{ + "name": "struct_name_2", + "base": "", + "fields": [ + {"name":"struct_2_field_1", "type":"string"} + ] + },{ + "name": "struct_name_3", + "base": "", + "fields": [ + {"name":"struct_3_field_1", "type":"string"} + ] + },{ + "name": "struct_name_4", + "base": "", + "fields": [ + {"name":"struct_4_field_1", "type":"string"} + ] + } + ], + "actions": [{ + "name": "action_name_1", + "type": "struct_name_1", + "ricardian_contract": "" + }], + "tables": [{ + "name": "table_name_1", + "index_type": "i64", + "key_names": [ + "key_name_1", + "key_name_2" + ], + "key_types": [ + "string", + "int" + ], + "type": "struct_name_1" + } + ] +} +` + +func assert_abi(t *testing.T, abi *eos.ABI) { + assert.Equal(t, abi.Version, "eosio::abi/1.0") + + // Types + assert.Equal(t, abi.Types[0].NewTypeName, "new_type_name_1") + assert.Equal(t, abi.Types[0].Type, "name") + + // Structs + assert.Equal(t, abi.Structs[0].Name, "struct_name_1") + assert.Equal(t, abi.Structs[0].Base, "struct_name_2") + assert.Equal(t, abi.Structs[0].Fields[0].Name, "struct_1_field_1") + assert.Equal(t, abi.Structs[0].Fields[0].Type, "new_type_name_1") + assert.Equal(t, abi.Structs[0].Fields[1].Name, "struct_1_field_2") + assert.Equal(t, abi.Structs[0].Fields[1].Type, "struct_name_3") + assert.Equal(t, abi.Structs[0].Fields[2].Name, "struct_1_field_3") + assert.Equal(t, abi.Structs[0].Fields[2].Type, "string?") + assert.Equal(t, abi.Structs[0].Fields[3].Name, "struct_1_field_4") + assert.Equal(t, abi.Structs[0].Fields[3].Type, "string?") + assert.Equal(t, abi.Structs[0].Fields[4].Name, "struct_1_field_5") + assert.Equal(t, abi.Structs[0].Fields[4].Type, "struct_name_4[]") + + assert.Equal(t, abi.Structs[1].Name, "struct_name_2") + assert.Equal(t, abi.Structs[1].Base, "") + assert.Equal(t, abi.Structs[1].Fields[0].Name, "struct_2_field_1") + assert.Equal(t, abi.Structs[1].Fields[0].Type, "string") + + assert.Equal(t, abi.Structs[2].Name, "struct_name_3") + assert.Equal(t, abi.Structs[2].Base, "") + assert.Equal(t, abi.Structs[2].Fields[0].Name, "struct_3_field_1") + assert.Equal(t, abi.Structs[2].Fields[0].Type, "string") + + assert.Equal(t, abi.Structs[3].Name, "struct_name_4") + assert.Equal(t, abi.Structs[3].Base, "") + assert.Equal(t, abi.Structs[3].Fields[0].Name, "struct_4_field_1") + assert.Equal(t, abi.Structs[3].Fields[0].Type, "string") + + // Actions + assert.Equal(t, abi.Actions[0].Name, eos.ActN("action_name_1")) + assert.Equal(t, abi.Actions[0].Type, "struct_name_1") + assert.Equal(t, abi.Actions[0].RicardianContract, "") + + // Tables + assert.Equal(t, abi.Tables[0].Name, eos.TableName("table_name_1")) + assert.Equal(t, abi.Tables[0].Type, "struct_name_1") + assert.Equal(t, abi.Tables[0].IndexType, "i64") + assert.Equal(t, abi.Tables[0].KeyNames[0], "key_name_1") + assert.Equal(t, abi.Tables[0].KeyNames[1], "key_name_2") + assert.Equal(t, abi.Tables[0].KeyTypes[0], "string") + assert.Equal(t, abi.Tables[0].KeyTypes[1], "int") +} + +func mockAPI(handler http.HandlerFunc) (*eos.API, *httptest.Server) { + server := httptest.NewServer(handler) + + return &eos.API{ + HttpClient: server.Client(), + BaseURL: strings.TrimRight(server.URL, "/"), + Compress: eos.CompressionZlib, + Header: make(http.Header), + }, server +} + +func TestManager_GetAbiFromCache(t *testing.T) { + cache := cache.NewCache("thalos::cache::abi::test", cache.NewMemoryStore()) + + api, _ := mockAPI(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + })) + + mgr := NewAbiManager(cache, api) + + abi, err := eos.NewABI(strings.NewReader(abiString)) + assert.NoError(t, err) + + err = mgr.SetAbi("testaccount", abi) + assert.NoError(t, err) + + c_abi, err := mgr.GetAbi("testaccount") + assert.NoError(t, err) + assert_abi(t, c_abi) +} + +func TestManager_GetAbiFromAPI(t *testing.T) { + cache := cache.NewCache("thalos::cache::abi::test", cache.NewMemoryStore()) + + api, _ := mockAPI(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body := fmt.Sprintf(`{"account_name": "testaccount", "abi": %s}`, abiString) + + _, err := w.Write([]byte(body)) + assert.NoError(t, err) + })) + + mgr := NewAbiManager(cache, api) + + c_abi, err := mgr.GetAbi("testaccount") + assert.NoError(t, err) + + assert_abi(t, c_abi) +} From 2acae14ff8f75cf252f0ca0a1e74b4a6a61011a2 Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Wed, 1 Nov 2023 21:25:54 +0100 Subject: [PATCH 06/13] cmd/thalos/main.go: use new cache struct and abi manager. --- cmd/thalos/main.go | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/cmd/thalos/main.go b/cmd/thalos/main.go index 020681b..d590c02 100644 --- a/cmd/thalos/main.go +++ b/cmd/thalos/main.go @@ -21,8 +21,10 @@ import ( api_redis "github.com/eosswedenorg/thalos/api/redis" "github.com/eosswedenorg/thalos/app" "github.com/eosswedenorg/thalos/app/abi" + . "github.com/eosswedenorg/thalos/app/cache" "github.com/eosswedenorg/thalos/app/config" . "github.com/eosswedenorg/thalos/app/log" + redis_cache "github.com/go-redis/cache/v9" "github.com/nikoksr/notify" "github.com/nikoksr/notify/service/telegram" "github.com/pborman/getopt/v2" @@ -44,6 +46,10 @@ var VersionString string = "dev" var exit chan bool +var cache *Cache + +var cacheStore Store + func readerLoop(processor *app.ShipProcessor) { running = true recon_cnt := 0 @@ -171,6 +177,11 @@ func LogLevels() []string { return list } +func initAbiManger(api *eos.API, chain_id string) *abi.AbiManager { + cache := NewCache("thalos::cache::abi::"+chain_id, cacheStore) + return abi.NewAbiManager(cache, api) +} + func main() { var err error var chainInfo *eos.InfoResp @@ -281,6 +292,16 @@ func main() { return } + // Setup cache storage + cacheStore = NewRedisStore(&redis_cache.Options{ + Redis: rdb, + // Cache 10k keys for 10 minutes. + LocalCache: redis_cache.NewTinyLFU(10000, 10*time.Minute), + }) + + // Setup general cache + cache = NewCache("thalos::cache::instance::"+conf.Name, cacheStore) + log.WithField("api", conf.Api).Info("Get chain info from api") eosClient := eos.New(conf.Api) chainInfo, err = eosClient.GetInfo(context.Background()) @@ -318,7 +339,7 @@ func main() { Prefix: conf.Redis.Prefix, ChainID: chain_id, }), - abi.NewAbiManager(rdb, eosClient, chain_id), + initAbiManger(eosClient, chain_id), codec, ) From 64459eca10b58ea6932102634a41c53747bfdf0d Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Wed, 1 Nov 2023 21:46:00 +0100 Subject: [PATCH 07/13] app/ship_processor.go: typo fix. --- app/ship_processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/ship_processor.go b/app/ship_processor.go index 7d2fe39..1e5abb1 100644 --- a/app/ship_processor.go +++ b/app/ship_processor.go @@ -274,7 +274,7 @@ func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) { } } -// Close closes the writer assciated with the processor. +// Close closes the writer associated with the processor. func (processor *ShipProcessor) Close() error { return processor.writer.Close() } From 98a49538e2e515adac3c5762ee829cdecbfa3c27 Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Wed, 1 Nov 2023 21:46:15 +0100 Subject: [PATCH 08/13] Adding app/state.go --- app/state.go | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 app/state.go diff --git a/app/state.go b/app/state.go new file mode 100644 index 0000000..b0c358e --- /dev/null +++ b/app/state.go @@ -0,0 +1,7 @@ +package app + +// State represents thalos runtime state +type State struct { + // Last processed block + CurrentBlock uint32 +} From 310f96219f09ba772fa5090077690dc3ec32092f Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Wed, 1 Nov 2023 21:47:21 +0100 Subject: [PATCH 09/13] app/ship_processor.go: remove current_block and use state.CurrentBlock instead. --- app/ship_processor.go | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/app/ship_processor.go b/app/ship_processor.go index 1e5abb1..c2fde70 100644 --- a/app/ship_processor.go +++ b/app/ship_processor.go @@ -43,8 +43,8 @@ type ShipProcessor struct { // Encoder used to encode messages encode message.Encoder - // Keep track of the current block we have processed. - current_block uint32 + // Internal state + state State // System contract ("eosio" per default) syscontract eos.AccountName @@ -53,12 +53,14 @@ type ShipProcessor struct { // SpawnProcessor creates a new ShipProccessor that consumes the shipclient.Stream passed to it. func SpawnProccessor(shipStream *shipclient.Stream, writer api.Writer, abi *abi.AbiManager, codec message.Codec) *ShipProcessor { processor := &ShipProcessor{ - abi: abi, - writer: writer, - shipStream: shipStream, - encode: logDecoratedEncoder(codec.Encoder), - syscontract: eos.AccountName("eosio"), - current_block: shipStream.StartBlock, + abi: abi, + writer: writer, + shipStream: shipStream, + encode: logDecoratedEncoder(codec.Encoder), + syscontract: eos.AccountName("eosio"), + state: State{ + CurrentBlock: shipStream.StartBlock, + }, } // Attach handlers @@ -124,15 +126,15 @@ func (processor *ShipProcessor) updateAbiFromAction(act *ship.Action) error { // Get the current block. func (processor *ShipProcessor) GetCurrentBlock() uint32 { - return processor.current_block + return processor.state.CurrentBlock } // Callback function called by shipclient.Stream when a new block arrives. func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) { - processor.current_block = block.ThisBlock.BlockNum + processor.state.CurrentBlock = block.ThisBlock.BlockNum if block.ThisBlock.BlockNum%100 == 0 { - log.Infof("Current: %d, Head: %d", processor.current_block, block.Head.BlockNum) + log.Infof("Current: %d, Head: %d", processor.state.CurrentBlock, block.Head.BlockNum) } if block.ThisBlock.BlockNum%10 == 0 { From 1ae3da425c2e0f0e5b32db7d10a5510a3a2a652f Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Sat, 4 Nov 2023 13:21:30 +0100 Subject: [PATCH 10/13] app/state.go: Adding StateLoader and StateSaver function types. --- app/state.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/app/state.go b/app/state.go index b0c358e..2edb323 100644 --- a/app/state.go +++ b/app/state.go @@ -5,3 +5,11 @@ type State struct { // Last processed block CurrentBlock uint32 } + +type ( + // StateLoader is a function that loads a state. + StateLoader func(*State) + + // StateSaver is a function that saves a state. + StateSaver func(State) error +) From 9946bd59e11282ca23ebdc012364f70f6083e359 Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Sat, 4 Nov 2023 13:24:15 +0100 Subject: [PATCH 11/13] app/ship_processor.go: implement StateLoader and StateSaver. --- app/ship_processor.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/app/ship_processor.go b/app/ship_processor.go index c2fde70..8db05a1 100644 --- a/app/ship_processor.go +++ b/app/ship_processor.go @@ -43,6 +43,9 @@ type ShipProcessor struct { // Encoder used to encode messages encode message.Encoder + // Function for saving state. + saver StateSaver + // Internal state state State @@ -51,18 +54,18 @@ type ShipProcessor struct { } // SpawnProcessor creates a new ShipProccessor that consumes the shipclient.Stream passed to it. -func SpawnProccessor(shipStream *shipclient.Stream, writer api.Writer, abi *abi.AbiManager, codec message.Codec) *ShipProcessor { +func SpawnProccessor(shipStream *shipclient.Stream, loader StateLoader, saver StateSaver, writer api.Writer, abi *abi.AbiManager, codec message.Codec) *ShipProcessor { processor := &ShipProcessor{ + saver: saver, abi: abi, writer: writer, shipStream: shipStream, encode: logDecoratedEncoder(codec.Encoder), syscontract: eos.AccountName("eosio"), - state: State{ - CurrentBlock: shipStream.StartBlock, - }, } + loader(&processor.state) + // Attach handlers shipStream.BlockHandler = processor.processBlock @@ -274,6 +277,11 @@ func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) { if err != nil { log.WithError(err).Error("Failed to send messages") } + + err = processor.saver(processor.state) + if err != nil { + log.WithError(err).Error("Failed to save state") + } } // Close closes the writer associated with the processor. From fa48a79610abc3cc54b71e2dd10a847036251bed Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Sat, 4 Nov 2023 13:24:51 +0100 Subject: [PATCH 12/13] cmd/thalos/main.go: adding stateLoader and stateSaver functions for passing to processor. --- cmd/thalos/main.go | 48 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 40 insertions(+), 8 deletions(-) diff --git a/cmd/thalos/main.go b/cmd/thalos/main.go index d590c02..df9c814 100644 --- a/cmd/thalos/main.go +++ b/cmd/thalos/main.go @@ -182,6 +182,44 @@ func initAbiManger(api *eos.API, chain_id string) *abi.AbiManager { return abi.NewAbiManager(cache, api) } +func stateLoader(chainInfo *eos.InfoResp) app.StateLoader { + return func(state *app.State) { + var source string + + // Load state from cache. + err := cache.Get("state", &state) + + // on error (cache miss) set current block from config/api + if err != nil { + // Set from config if we have a sane value. + if conf.Ship.StartBlockNum != shipclient.NULL_BLOCK_NUMBER { + source = "config" + state.CurrentBlock = conf.Ship.StartBlockNum + } else { + // Otherwise, set from api. + if conf.Ship.IrreversibleOnly { + source = "api (LIB)" + state.CurrentBlock = uint32(chainInfo.LastIrreversibleBlockNum) + } else { + source = "api (HEAD)" + state.CurrentBlock = uint32(chainInfo.HeadBlockNum) + } + } + } else { + source = "cache" + } + + log.WithFields(log.Fields{ + "block": state.CurrentBlock, + "source": source, + }).Info("Starting from block") + } +} + +func stateSaver(state app.State) error { + return cache.Set("state", state, 0) +} + func main() { var err error var chainInfo *eos.InfoResp @@ -310,14 +348,6 @@ func main() { return } - if conf.Ship.StartBlockNum == shipclient.NULL_BLOCK_NUMBER { - if conf.Ship.IrreversibleOnly { - conf.Ship.StartBlockNum = uint32(chainInfo.LastIrreversibleBlockNum) - } else { - conf.Ship.StartBlockNum = uint32(chainInfo.HeadBlockNum) - } - } - shClient = shipclient.NewStream(func(s *shipclient.Stream) { s.StartBlock = conf.Ship.StartBlockNum s.EndBlock = conf.Ship.EndBlockNum @@ -335,6 +365,8 @@ func main() { processor := app.SpawnProccessor( shClient, + stateLoader(chainInfo), + stateSaver, api_redis.NewPublisher(context.Background(), rdb, api_redis.Namespace{ Prefix: conf.Redis.Prefix, ChainID: chain_id, From 1daf38bd3d92cc5c8fa3f0b505b2ef96c7d2667d Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Sat, 4 Nov 2023 13:32:32 +0100 Subject: [PATCH 13/13] cmd/thalos/main.go: adding "-n" flag to force current block from config/api. --- cmd/thalos/main.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/cmd/thalos/main.go b/cmd/thalos/main.go index df9c814..60d7de7 100644 --- a/cmd/thalos/main.go +++ b/cmd/thalos/main.go @@ -182,15 +182,16 @@ func initAbiManger(api *eos.API, chain_id string) *abi.AbiManager { return abi.NewAbiManager(cache, api) } -func stateLoader(chainInfo *eos.InfoResp) app.StateLoader { +func stateLoader(chainInfo *eos.InfoResp, current_block_no_cache bool) app.StateLoader { return func(state *app.State) { var source string // Load state from cache. err := cache.Get("state", &state) - // on error (cache miss) set current block from config/api - if err != nil { + // on error (cache miss) or if current_block_no_cache is set. + // set current block from config/api + if current_block_no_cache || err != nil { // Set from config if we have a sane value. if conf.Ship.StartBlockNum != shipclient.NULL_BLOCK_NUMBER { source = "config" @@ -232,6 +233,7 @@ func main() { pidFile := getopt.StringLong("pid", 'p', "", "Where to write process id", "file") logFile := getopt.StringLong("log", 'l', "", "Path to log file", "file") logLevel := getopt.EnumLong("level", 'L', LogLevels(), "info", "Log level to use") + skip_currentblock_cache := getopt.Bool('n', "Force the application to take start block from config/api") getopt.Parse() @@ -365,7 +367,7 @@ func main() { processor := app.SpawnProccessor( shClient, - stateLoader(chainInfo), + stateLoader(chainInfo, *skip_currentblock_cache), stateSaver, api_redis.NewPublisher(context.Background(), rdb, api_redis.Namespace{ Prefix: conf.Redis.Prefix,