diff --git a/app/abi/cache.go b/app/abi/cache.go deleted file mode 100644 index 3b54084..0000000 --- a/app/abi/cache.go +++ /dev/null @@ -1,46 +0,0 @@ -package abi - -import ( - "context" - "time" - - eos "github.com/eoscanada/eos-go" - redis_cache "github.com/go-redis/cache/v9" -) - -// Cache represents a abi cache in redis. -type Cache struct { - c *redis_cache.Cache - ctx context.Context - prefix string -} - -// Create a new cache -func NewCache(prefix string, options *redis_cache.Options) *Cache { - return &Cache{ - c: redis_cache.New(options), - ctx: context.Background(), - prefix: prefix, - } -} - -// Get an ABI from the cache using the contract account name as the key. -func (cache *Cache) Get(account string) (*eos.ABI, error) { - var v eos.ABI - err := cache.c.Get(cache.ctx, cache.key(account), &v) - return &v, err -} - -// Set an ABI in the cache. -func (cache *Cache) Set(account string, abi *eos.ABI, ttl time.Duration) error { - return cache.c.Set(&redis_cache.Item{ - Ctx: cache.ctx, - Key: cache.key(account), - Value: *abi, - TTL: ttl, - }) -} - -func (cache *Cache) key(account string) string { - return cache.prefix + "::" + account -} diff --git a/app/abi/cache_test.go b/app/abi/cache_test.go deleted file mode 100644 index da74e74..0000000 --- a/app/abi/cache_test.go +++ /dev/null @@ -1,158 +0,0 @@ -package abi - -import ( - "strings" - "testing" - "time" - - eos "github.com/eoscanada/eos-go" - redis_cache "github.com/go-redis/cache/v9" - "github.com/go-redis/redismock/v9" - - "github.com/stretchr/testify/assert" -) - -var abiString = ` -{ - "version": "eosio::abi/1.0", - "types": [{ - "new_type_name": "new_type_name_1", - "type": "name" - }], - "structs": [ - { - "name": "struct_name_1", - "base": "struct_name_2", - "fields": [ - {"name":"struct_1_field_1", "type":"new_type_name_1"}, - {"name":"struct_1_field_2", "type":"struct_name_3"}, - {"name":"struct_1_field_3", "type":"string?"}, - {"name":"struct_1_field_4", "type":"string?"}, - {"name":"struct_1_field_5", "type":"struct_name_4[]"} - ] - },{ - "name": "struct_name_2", - "base": "", - "fields": [ - {"name":"struct_2_field_1", "type":"string"} - ] - },{ - "name": "struct_name_3", - "base": "", - "fields": [ - {"name":"struct_3_field_1", "type":"string"} - ] - },{ - "name": "struct_name_4", - "base": "", - "fields": [ - {"name":"struct_4_field_1", "type":"string"} - ] - } - ], - "actions": [{ - "name": "action_name_1", - "type": "struct_name_1", - "ricardian_contract": "" - }], - "tables": [{ - "name": "table_name_1", - "index_type": "i64", - "key_names": [ - "key_name_1", - "key_name_2" - ], - "key_types": [ - "string", - "int" - ], - "type": "struct_name_1" - } - ] -} -` - -func TestGetSet(t *testing.T) { - client, mock := redismock.NewClientMock() - - c := NewCache("thalos::cache::test", &redis_cache.Options{ - Redis: client, - // Cache 10k keys for 1 minute. - LocalCache: redis_cache.NewTinyLFU(10000, time.Minute), - }) - - abi, err := eos.NewABI(strings.NewReader(abiString)) - assert.NoError(t, err) - - bytes, _ := c.c.Marshal(*abi) - - mock.ExpectSet("thalos::cache::test::testaccount", bytes, time.Minute).SetVal("OK") - - err = c.Set("testaccount", abi, time.Minute) - assert.NoError(t, err) - - c_abi, err := c.Get("testaccount") - assert.NoError(t, err) - - assert.Equal(t, c_abi.Version, "eosio::abi/1.0") - - // Types - assert.Equal(t, c_abi.Types[0].NewTypeName, "new_type_name_1") - assert.Equal(t, c_abi.Types[0].Type, "name") - - // Structs - assert.Equal(t, c_abi.Structs[0].Name, "struct_name_1") - assert.Equal(t, c_abi.Structs[0].Base, "struct_name_2") - assert.Equal(t, c_abi.Structs[0].Fields[0].Name, "struct_1_field_1") - assert.Equal(t, c_abi.Structs[0].Fields[0].Type, "new_type_name_1") - assert.Equal(t, c_abi.Structs[0].Fields[1].Name, "struct_1_field_2") - assert.Equal(t, c_abi.Structs[0].Fields[1].Type, "struct_name_3") - assert.Equal(t, c_abi.Structs[0].Fields[2].Name, "struct_1_field_3") - assert.Equal(t, c_abi.Structs[0].Fields[2].Type, "string?") - assert.Equal(t, c_abi.Structs[0].Fields[3].Name, "struct_1_field_4") - assert.Equal(t, c_abi.Structs[0].Fields[3].Type, "string?") - assert.Equal(t, c_abi.Structs[0].Fields[4].Name, "struct_1_field_5") - assert.Equal(t, c_abi.Structs[0].Fields[4].Type, "struct_name_4[]") - - assert.Equal(t, c_abi.Structs[1].Name, "struct_name_2") - assert.Equal(t, c_abi.Structs[1].Base, "") - assert.Equal(t, c_abi.Structs[1].Fields[0].Name, "struct_2_field_1") - assert.Equal(t, c_abi.Structs[1].Fields[0].Type, "string") - - assert.Equal(t, c_abi.Structs[2].Name, "struct_name_3") - assert.Equal(t, c_abi.Structs[2].Base, "") - assert.Equal(t, c_abi.Structs[2].Fields[0].Name, "struct_3_field_1") - assert.Equal(t, c_abi.Structs[2].Fields[0].Type, "string") - - assert.Equal(t, c_abi.Structs[3].Name, "struct_name_4") - assert.Equal(t, c_abi.Structs[3].Base, "") - assert.Equal(t, c_abi.Structs[3].Fields[0].Name, "struct_4_field_1") - assert.Equal(t, c_abi.Structs[3].Fields[0].Type, "string") - - // Actions - assert.Equal(t, c_abi.Actions[0].Name, eos.ActN("action_name_1")) - assert.Equal(t, c_abi.Actions[0].Type, "struct_name_1") - assert.Equal(t, c_abi.Actions[0].RicardianContract, "") - - // Tables - assert.Equal(t, c_abi.Tables[0].Name, eos.TableName("table_name_1")) - assert.Equal(t, c_abi.Tables[0].Type, "struct_name_1") - assert.Equal(t, c_abi.Tables[0].IndexType, "i64") - assert.Equal(t, c_abi.Tables[0].KeyNames[0], "key_name_1") - assert.Equal(t, c_abi.Tables[0].KeyNames[1], "key_name_2") - assert.Equal(t, c_abi.Tables[0].KeyTypes[0], "string") - assert.Equal(t, c_abi.Tables[0].KeyTypes[1], "int") -} - -func TestCacheMiss(t *testing.T) { - client, _ := redismock.NewClientMock() - - c := NewCache("thalos::cache::test", &redis_cache.Options{ - Redis: client, - // Cache 10k keys for 1 minute. - LocalCache: redis_cache.NewTinyLFU(10000, time.Minute), - }) - - _, err := c.Get("nonexist") - assert.Error(t, err) -} diff --git a/app/abi/manager.go b/app/abi/manager.go index 09e9256..ba0fa5a 100644 --- a/app/abi/manager.go +++ b/app/abi/manager.go @@ -6,26 +6,18 @@ import ( "time" eos "github.com/eoscanada/eos-go" - redis_cache "github.com/go-redis/cache/v9" - "github.com/redis/go-redis/v9" + "github.com/eosswedenorg/thalos/app/cache" ) // AbiManager handles an ABI cache that fetches the ABI from an API on cache miss. type AbiManager struct { - cache *Cache + cache *cache.Cache api *eos.API ctx context.Context } // Create a new ABI Manager -func NewAbiManager(rdb *redis.Client, api *eos.API, id string) *AbiManager { - // Init abi cache - cache := NewCache("thalos::cache::"+id+"::abi", &redis_cache.Options{ - Redis: rdb, - // Cache 10k keys for 10 minutes. - LocalCache: redis_cache.NewTinyLFU(10000, 10*time.Minute), - }) - +func NewAbiManager(cache *cache.Cache, api *eos.API) *AbiManager { return &AbiManager{ cache: cache, api: api, @@ -35,28 +27,26 @@ func NewAbiManager(rdb *redis.Client, api *eos.API, id string) *AbiManager { // Set or update an ABI in the cache. func (mgr *AbiManager) SetAbi(account eos.AccountName, abi *eos.ABI) error { - return mgr.cache.Set(string(account), abi, time.Hour) + return mgr.cache.Set(string(account), *abi, time.Hour) } // Get an ABI from the cache, on cache miss it is fetched from the // API, gets cached and then returned to the user func (mgr *AbiManager) GetAbi(account eos.AccountName) (*eos.ABI, error) { - key := string(account) - - abi, err := mgr.cache.Get(key) - if err != nil { + var abi eos.ABI + if err := mgr.cache.Get(string(account), &abi); err != nil { ctx, cancel := context.WithTimeout(mgr.ctx, time.Second) defer cancel() resp, err := mgr.api.GetABI(ctx, account) if err != nil { return nil, fmt.Errorf("api: %s", err) } - abi = &resp.ABI + abi = resp.ABI - err = mgr.SetAbi(account, abi) + err = mgr.SetAbi(account, &abi) if err != nil { return nil, fmt.Errorf("cache: %s", err) } } - return abi, nil + return &abi, nil } diff --git a/app/abi/manager_test.go b/app/abi/manager_test.go new file mode 100644 index 0000000..661c8ec --- /dev/null +++ b/app/abi/manager_test.go @@ -0,0 +1,173 @@ +package abi + +import ( + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + + eos "github.com/eoscanada/eos-go" + + "github.com/eosswedenorg/thalos/app/cache" + "github.com/stretchr/testify/assert" +) + +var abiString = ` +{ + "version": "eosio::abi/1.0", + "types": [{ + "new_type_name": "new_type_name_1", + "type": "name" + }], + "structs": [ + { + "name": "struct_name_1", + "base": "struct_name_2", + "fields": [ + {"name":"struct_1_field_1", "type":"new_type_name_1"}, + {"name":"struct_1_field_2", "type":"struct_name_3"}, + {"name":"struct_1_field_3", "type":"string?"}, + {"name":"struct_1_field_4", "type":"string?"}, + {"name":"struct_1_field_5", "type":"struct_name_4[]"} + ] + },{ + "name": "struct_name_2", + "base": "", + "fields": [ + {"name":"struct_2_field_1", "type":"string"} + ] + },{ + "name": "struct_name_3", + "base": "", + "fields": [ + {"name":"struct_3_field_1", "type":"string"} + ] + },{ + "name": "struct_name_4", + "base": "", + "fields": [ + {"name":"struct_4_field_1", "type":"string"} + ] + } + ], + "actions": [{ + "name": "action_name_1", + "type": "struct_name_1", + "ricardian_contract": "" + }], + "tables": [{ + "name": "table_name_1", + "index_type": "i64", + "key_names": [ + "key_name_1", + "key_name_2" + ], + "key_types": [ + "string", + "int" + ], + "type": "struct_name_1" + } + ] +} +` + +func assert_abi(t *testing.T, abi *eos.ABI) { + assert.Equal(t, abi.Version, "eosio::abi/1.0") + + // Types + assert.Equal(t, abi.Types[0].NewTypeName, "new_type_name_1") + assert.Equal(t, abi.Types[0].Type, "name") + + // Structs + assert.Equal(t, abi.Structs[0].Name, "struct_name_1") + assert.Equal(t, abi.Structs[0].Base, "struct_name_2") + assert.Equal(t, abi.Structs[0].Fields[0].Name, "struct_1_field_1") + assert.Equal(t, abi.Structs[0].Fields[0].Type, "new_type_name_1") + assert.Equal(t, abi.Structs[0].Fields[1].Name, "struct_1_field_2") + assert.Equal(t, abi.Structs[0].Fields[1].Type, "struct_name_3") + assert.Equal(t, abi.Structs[0].Fields[2].Name, "struct_1_field_3") + assert.Equal(t, abi.Structs[0].Fields[2].Type, "string?") + assert.Equal(t, abi.Structs[0].Fields[3].Name, "struct_1_field_4") + assert.Equal(t, abi.Structs[0].Fields[3].Type, "string?") + assert.Equal(t, abi.Structs[0].Fields[4].Name, "struct_1_field_5") + assert.Equal(t, abi.Structs[0].Fields[4].Type, "struct_name_4[]") + + assert.Equal(t, abi.Structs[1].Name, "struct_name_2") + assert.Equal(t, abi.Structs[1].Base, "") + assert.Equal(t, abi.Structs[1].Fields[0].Name, "struct_2_field_1") + assert.Equal(t, abi.Structs[1].Fields[0].Type, "string") + + assert.Equal(t, abi.Structs[2].Name, "struct_name_3") + assert.Equal(t, abi.Structs[2].Base, "") + assert.Equal(t, abi.Structs[2].Fields[0].Name, "struct_3_field_1") + assert.Equal(t, abi.Structs[2].Fields[0].Type, "string") + + assert.Equal(t, abi.Structs[3].Name, "struct_name_4") + assert.Equal(t, abi.Structs[3].Base, "") + assert.Equal(t, abi.Structs[3].Fields[0].Name, "struct_4_field_1") + assert.Equal(t, abi.Structs[3].Fields[0].Type, "string") + + // Actions + assert.Equal(t, abi.Actions[0].Name, eos.ActN("action_name_1")) + assert.Equal(t, abi.Actions[0].Type, "struct_name_1") + assert.Equal(t, abi.Actions[0].RicardianContract, "") + + // Tables + assert.Equal(t, abi.Tables[0].Name, eos.TableName("table_name_1")) + assert.Equal(t, abi.Tables[0].Type, "struct_name_1") + assert.Equal(t, abi.Tables[0].IndexType, "i64") + assert.Equal(t, abi.Tables[0].KeyNames[0], "key_name_1") + assert.Equal(t, abi.Tables[0].KeyNames[1], "key_name_2") + assert.Equal(t, abi.Tables[0].KeyTypes[0], "string") + assert.Equal(t, abi.Tables[0].KeyTypes[1], "int") +} + +func mockAPI(handler http.HandlerFunc) (*eos.API, *httptest.Server) { + server := httptest.NewServer(handler) + + return &eos.API{ + HttpClient: server.Client(), + BaseURL: strings.TrimRight(server.URL, "/"), + Compress: eos.CompressionZlib, + Header: make(http.Header), + }, server +} + +func TestManager_GetAbiFromCache(t *testing.T) { + cache := cache.NewCache("thalos::cache::abi::test", cache.NewMemoryStore()) + + api, _ := mockAPI(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + })) + + mgr := NewAbiManager(cache, api) + + abi, err := eos.NewABI(strings.NewReader(abiString)) + assert.NoError(t, err) + + err = mgr.SetAbi("testaccount", abi) + assert.NoError(t, err) + + c_abi, err := mgr.GetAbi("testaccount") + assert.NoError(t, err) + assert_abi(t, c_abi) +} + +func TestManager_GetAbiFromAPI(t *testing.T) { + cache := cache.NewCache("thalos::cache::abi::test", cache.NewMemoryStore()) + + api, _ := mockAPI(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body := fmt.Sprintf(`{"account_name": "testaccount", "abi": %s}`, abiString) + + _, err := w.Write([]byte(body)) + assert.NoError(t, err) + })) + + mgr := NewAbiManager(cache, api) + + c_abi, err := mgr.GetAbi("testaccount") + assert.NoError(t, err) + + assert_abi(t, c_abi) +} diff --git a/app/cache/cache.go b/app/cache/cache.go new file mode 100644 index 0000000..290019b --- /dev/null +++ b/app/cache/cache.go @@ -0,0 +1,30 @@ +package cache + +import ( + "time" +) + +type Cache struct { + store Store + prefix string +} + +// Create a new cache +func NewCache(prefix string, store Store) *Cache { + return &Cache{ + store: store, + prefix: prefix, + } +} + +func (cache *Cache) Get(key string, value any) error { + return cache.store.Get(cache.key(key), value) +} + +func (cache *Cache) Set(key string, value any, ttl time.Duration) error { + return cache.store.Set(cache.key(key), value, ttl) +} + +func (cache *Cache) key(key string) string { + return cache.prefix + "::" + key +} diff --git a/app/cache/memory_store.go b/app/cache/memory_store.go new file mode 100644 index 0000000..7c3bd9f --- /dev/null +++ b/app/cache/memory_store.go @@ -0,0 +1,55 @@ +package cache + +import ( + "fmt" + "reflect" + "time" +) + +var now = time.Now + +type memoryStoreItem struct { + value any + expired time.Time +} + +type MemoryStore struct { + data map[string]memoryStoreItem +} + +func NewMemoryStore() *MemoryStore { + return &MemoryStore{make(map[string]memoryStoreItem)} +} + +func (s *MemoryStore) Get(key string, value any) error { + if item, ok := s.data[key]; ok { + + if item.expired.Before(now()) { + delete(s.data, key) + return fmt.Errorf("key: %s does not exist", key) + } + + v := reflect.ValueOf(value) + if v.Kind() != reflect.Pointer { + return fmt.Errorf("value must be of pointer type, '%s' passed", v.Kind().String()) + } + + v.Elem().Set(reflect.ValueOf(item.value)) + + return nil + } + return fmt.Errorf("key: %s does not exist", key) +} + +func (s *MemoryStore) Has(key string) bool { + _, hit := s.data[key] + return hit +} + +func (s *MemoryStore) Set(key string, value any, ttl time.Duration) error { + s.data[key] = memoryStoreItem{ + value: value, + expired: now().Add(ttl), + } + return nil +} diff --git a/app/cache/memory_store_test.go b/app/cache/memory_store_test.go new file mode 100644 index 0000000..d6bc6bd --- /dev/null +++ b/app/cache/memory_store_test.go @@ -0,0 +1,87 @@ +package cache + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +type memoryTestItem struct { + String string + Num uint32 + Float float32 +} + +func TestMemoryStore_Set(t *testing.T) { + now = func() time.Time { return time.Unix(1581315270, 0) } + + item := memoryTestItem{ + String: "MyString", + Num: 23, + Float: 3.14, + } + + expected := map[string]memoryStoreItem{ + "key1": { + value: item, + expired: now().Add(time.Hour), + }, + } + + store := NewMemoryStore() + err := store.Set("key1", item, time.Hour) + assert.NoError(t, err) + + assert.Equal(t, expected, store.data) +} + +func TestMemoryStore_GetMiss(t *testing.T) { + store := NewMemoryStore() + + var v any + err := store.Get("Key2", &v) + assert.Error(t, err) +} + +func TestMemoryStore_GetHit(t *testing.T) { + expected := memoryTestItem{ + String: "MyString", + Num: 23, + Float: 3.14, + } + + store := NewMemoryStore() + err := store.Set("key1", expected, time.Hour) + assert.NoError(t, err) + + var actual memoryTestItem + err = store.Get("key1", &actual) + assert.NoError(t, err) + assert.Equal(t, expected, actual) +} + +func TestMemoryStore_GetNonPointer(t *testing.T) { + expected := memoryTestItem{ + String: "MyString", + Num: 23, + Float: 3.14, + } + + store := NewMemoryStore() + err := store.Set("key1", expected, time.Hour) + assert.NoError(t, err) + + var actual string + err = store.Get("key1", actual) + assert.EqualError(t, err, "value must be of pointer type, 'string' passed") +} + +func TestMemoryStore_Has(t *testing.T) { + store := NewMemoryStore() + err := store.Set("key1", "value", time.Hour) + assert.NoError(t, err) + + assert.True(t, store.Has("key1")) + assert.False(t, store.Has("key2")) +} diff --git a/app/cache/redis_store.go b/app/cache/redis_store.go new file mode 100644 index 0000000..5016a15 --- /dev/null +++ b/app/cache/redis_store.go @@ -0,0 +1,37 @@ +package cache + +import ( + "context" + "time" + + "github.com/go-redis/cache/v9" +) + +type RedisStore struct { + c *cache.Cache + ctx context.Context +} + +func NewRedisStore(options *cache.Options) *RedisStore { + return &RedisStore{ + c: cache.New(options), + ctx: context.Background(), + } +} + +func (s *RedisStore) Get(key string, value interface{}) error { + return s.c.Get(s.ctx, key, value) +} + +func (s *RedisStore) Has(key string) bool { + return s.c.Exists(s.ctx, key) +} + +func (s *RedisStore) Set(key string, value any, ttl time.Duration) error { + return s.c.Set(&cache.Item{ + Ctx: s.ctx, + Key: key, + Value: value, + TTL: ttl, + }) +} diff --git a/app/cache/redis_store_test.go b/app/cache/redis_store_test.go new file mode 100644 index 0000000..34e3f34 --- /dev/null +++ b/app/cache/redis_store_test.go @@ -0,0 +1,103 @@ +package cache + +import ( + "testing" + "time" + + "github.com/go-redis/redismock/v9" + + redis_cache "github.com/go-redis/cache/v9" + "github.com/stretchr/testify/assert" +) + +type testItem struct { + Num uint32 + Name string +} + +func TestRedisStore_Set(t *testing.T) { + client, mock := redismock.NewClientMock() + + store := NewRedisStore(&redis_cache.Options{ + Redis: client, + }) + + expected := testItem{ + Num: 24, + Name: "Some Name", + } + + bytes, err := store.c.Marshal(expected) + assert.NoError(t, err) + + mock.ExpectSet("mykey", bytes, time.Minute).SetVal("OK") + + err = store.Set("mykey", expected, time.Minute) + assert.NoError(t, err) + + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestRedisStore_GetMiss(t *testing.T) { + client, mock := redismock.NewClientMock() + + store := NewRedisStore(&redis_cache.Options{ + Redis: client, + }) + + mock.ExpectGet("mykey").SetErr(redis_cache.ErrCacheMiss) + + expected := testItem{} + err := store.Get("mykey", &expected) + assert.ErrorIs(t, err, redis_cache.ErrCacheMiss) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestRedisStore_GetHit(t *testing.T) { + client, mock := redismock.NewClientMock() + + store := NewRedisStore(&redis_cache.Options{ + Redis: client, + }) + + expected := testItem{ + Num: 42, + Name: "MyName", + } + + bytes, err := store.c.Marshal(expected) + assert.NoError(t, err) + + mock.ExpectSet("mykey2", bytes, time.Second*20).SetVal("OK") + mock.ExpectGet("mykey2").SetVal(string(bytes)) + + err = store.Set("mykey2", expected, time.Second*20) + assert.NoError(t, err) + + actual := testItem{} + err = store.Get("mykey2", &actual) + assert.NoError(t, err) + assert.Equal(t, expected, actual) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestRedisStore_Has(t *testing.T) { + client, mock := redismock.NewClientMock() + + store := NewRedisStore(&redis_cache.Options{ + Redis: client, + }) + + bytes, err := store.c.Marshal("value") + assert.NoError(t, err) + + mock.ExpectSet("key1", bytes, time.Minute*15).SetVal("OK") + mock.ExpectGet("key1").SetVal(string(bytes)) + mock.ExpectGet("key2").RedisNil() + + err = store.Set("key1", "value", time.Minute*15) + assert.NoError(t, err) + assert.True(t, store.Has("key1")) + assert.False(t, store.Has("key2")) + assert.NoError(t, mock.ExpectationsWereMet()) +} diff --git a/app/cache/store.go b/app/cache/store.go new file mode 100644 index 0000000..309333f --- /dev/null +++ b/app/cache/store.go @@ -0,0 +1,15 @@ +package cache + +import "time" + +type Store interface { + // Set an item in the store. + Set(key string, value any, TTL time.Duration) error + + // Get an item from the store. + // returns an error if key is not found or there is other problems. + Get(key string, value any) error + + // Check if a key exist in the store. + Has(key string) bool +} diff --git a/app/ship_processor.go b/app/ship_processor.go index a0d8fc4..6f88894 100644 --- a/app/ship_processor.go +++ b/app/ship_processor.go @@ -44,24 +44,29 @@ type ShipProcessor struct { // Encoder used to encode messages encode message.Encoder - // Keep track of the current block we have processed. - current_block uint32 + // Function for saving state. + saver StateSaver + + // Internal state + state State // System contract ("eosio" per default) syscontract eos.AccountName } // SpawnProcessor creates a new ShipProccessor that consumes the shipclient.Stream passed to it. -func SpawnProccessor(shipStream *shipclient.Stream, writer driver.Writer, abi *abi.AbiManager, codec message.Codec) *ShipProcessor { +func SpawnProccessor(shipStream *shipclient.Stream, loader StateLoader, saver StateSaver, writer driver.Writer, abi *abi.AbiManager, codec message.Codec) *ShipProcessor { processor := &ShipProcessor{ - abi: abi, - writer: writer, - shipStream: shipStream, - encode: logDecoratedEncoder(codec.Encoder), - syscontract: eos.AccountName("eosio"), - current_block: shipStream.StartBlock, + saver: saver, + abi: abi, + writer: writer, + shipStream: shipStream, + encode: logDecoratedEncoder(codec.Encoder), + syscontract: eos.AccountName("eosio"), } + loader(&processor.state) + // Attach handlers shipStream.BlockHandler = processor.processBlock @@ -125,15 +130,15 @@ func (processor *ShipProcessor) updateAbiFromAction(act *ship.Action) error { // Get the current block. func (processor *ShipProcessor) GetCurrentBlock() uint32 { - return processor.current_block + return processor.state.CurrentBlock } // Callback function called by shipclient.Stream when a new block arrives. func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) { - processor.current_block = block.ThisBlock.BlockNum + processor.state.CurrentBlock = block.ThisBlock.BlockNum if block.ThisBlock.BlockNum%100 == 0 { - log.Infof("Current: %d, Head: %d", processor.current_block, block.Head.BlockNum) + log.Infof("Current: %d, Head: %d", processor.state.CurrentBlock, block.Head.BlockNum) } if block.ThisBlock.BlockNum%10 == 0 { @@ -273,9 +278,14 @@ func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) { if err != nil { log.WithError(err).Error("Failed to send messages") } + + err = processor.saver(processor.state) + if err != nil { + log.WithError(err).Error("Failed to save state") + } } -// Close closes the writer assciated with the processor. +// Close closes the writer associated with the processor. func (processor *ShipProcessor) Close() error { return processor.writer.Close() } diff --git a/app/state.go b/app/state.go new file mode 100644 index 0000000..2edb323 --- /dev/null +++ b/app/state.go @@ -0,0 +1,15 @@ +package app + +// State represents thalos runtime state +type State struct { + // Last processed block + CurrentBlock uint32 +} + +type ( + // StateLoader is a function that loads a state. + StateLoader func(*State) + + // StateSaver is a function that saves a state. + StateSaver func(State) error +) diff --git a/cmd/thalos/main.go b/cmd/thalos/main.go index 4278365..ef41f25 100644 --- a/cmd/thalos/main.go +++ b/cmd/thalos/main.go @@ -21,9 +21,11 @@ import ( api_redis "github.com/eosswedenorg/thalos/api/redis" "github.com/eosswedenorg/thalos/app" "github.com/eosswedenorg/thalos/app/abi" + . "github.com/eosswedenorg/thalos/app/cache" "github.com/eosswedenorg/thalos/app/config" driver "github.com/eosswedenorg/thalos/app/driver/redis" . "github.com/eosswedenorg/thalos/app/log" + redis_cache "github.com/go-redis/cache/v9" "github.com/nikoksr/notify" "github.com/nikoksr/notify/service/telegram" "github.com/pborman/getopt/v2" @@ -45,6 +47,10 @@ var VersionString string = "dev" var exit chan bool +var cache *Cache + +var cacheStore Store + func readerLoop(processor *app.ShipProcessor) { recon_cnt := 0 @@ -171,6 +177,50 @@ func LogLevels() []string { return list } +func initAbiManger(api *eos.API, chain_id string) *abi.AbiManager { + cache := NewCache("thalos::cache::abi::"+chain_id, cacheStore) + return abi.NewAbiManager(cache, api) +} + +func stateLoader(chainInfo *eos.InfoResp, current_block_no_cache bool) app.StateLoader { + return func(state *app.State) { + var source string + + // Load state from cache. + err := cache.Get("state", &state) + + // on error (cache miss) or if current_block_no_cache is set. + // set current block from config/api + if current_block_no_cache || err != nil { + // Set from config if we have a sane value. + if conf.Ship.StartBlockNum != shipclient.NULL_BLOCK_NUMBER { + source = "config" + state.CurrentBlock = conf.Ship.StartBlockNum + } else { + // Otherwise, set from api. + if conf.Ship.IrreversibleOnly { + source = "api (LIB)" + state.CurrentBlock = uint32(chainInfo.LastIrreversibleBlockNum) + } else { + source = "api (HEAD)" + state.CurrentBlock = uint32(chainInfo.HeadBlockNum) + } + } + } else { + source = "cache" + } + + log.WithFields(log.Fields{ + "block": state.CurrentBlock, + "source": source, + }).Info("Starting from block") + } +} + +func stateSaver(state app.State) error { + return cache.Set("state", state, 0) +} + func main() { var err error var chainInfo *eos.InfoResp @@ -183,6 +233,7 @@ func main() { pidFile := getopt.StringLong("pid", 'p', "", "Where to write process id", "file") logFile := getopt.StringLong("log", 'l', "", "Path to log file", "file") logLevel := getopt.EnumLong("level", 'L', LogLevels(), "info", "Log level to use") + skip_currentblock_cache := getopt.Bool('n', "Force the application to take start block from config/api") getopt.Parse() @@ -281,6 +332,16 @@ func main() { return } + // Setup cache storage + cacheStore = NewRedisStore(&redis_cache.Options{ + Redis: rdb, + // Cache 10k keys for 10 minutes. + LocalCache: redis_cache.NewTinyLFU(10000, 10*time.Minute), + }) + + // Setup general cache + cache = NewCache("thalos::cache::instance::"+conf.Name, cacheStore) + log.WithField("api", conf.Api).Info("Get chain info from api") eosClient := eos.New(conf.Api) chainInfo, err = eosClient.GetInfo(context.Background()) @@ -289,14 +350,6 @@ func main() { return } - if conf.Ship.StartBlockNum == shipclient.NULL_BLOCK_NUMBER { - if conf.Ship.IrreversibleOnly { - conf.Ship.StartBlockNum = uint32(chainInfo.LastIrreversibleBlockNum) - } else { - conf.Ship.StartBlockNum = uint32(chainInfo.HeadBlockNum) - } - } - shClient = shipclient.NewStream(func(s *shipclient.Stream) { s.StartBlock = conf.Ship.StartBlockNum s.EndBlock = conf.Ship.EndBlockNum @@ -314,11 +367,13 @@ func main() { processor := app.SpawnProccessor( shClient, + stateLoader(chainInfo, *skip_currentblock_cache), + stateSaver, driver.NewPublisher(context.Background(), rdb, api_redis.Namespace{ Prefix: conf.Redis.Prefix, ChainID: chain_id, }), - abi.NewAbiManager(rdb, eosClient, chain_id), + initAbiManger(eosClient, chain_id), codec, )