1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-06-23 10:13:42 +02:00

rename app folder to internal.

This commit is contained in:
Henrik Hautakoski 2024-02-14 13:00:33 +01:00
parent afb90af1db
commit 9974bfe3fd
28 changed files with 23 additions and 23 deletions

60
internal/abi/manager.go Normal file
View file

@ -0,0 +1,60 @@
package abi
import (
"context"
"fmt"
"time"
eos "github.com/eoscanada/eos-go"
"github.com/eosswedenorg/thalos/internal/cache"
)
// AbiManager handles an ABI cache that fetches the ABI from an API on cache miss.
type AbiManager struct {
cache *cache.Cache
api *eos.API
ctx context.Context
}
// Create a new ABI Manager
func NewAbiManager(cache *cache.Cache, api *eos.API) *AbiManager {
return &AbiManager{
cache: cache,
api: api,
ctx: context.Background(),
}
}
// Set or update an ABI in the cache.
func (mgr *AbiManager) SetAbi(account eos.AccountName, abi *eos.ABI) error {
ctx, cancel := context.WithTimeout(mgr.ctx, time.Millisecond*500)
defer cancel()
return mgr.cache.Set(ctx, string(account), *abi, time.Hour)
}
// Get an ABI from the cache, on cache miss it is fetched from the
// API, gets cached and then returned to the user
func (mgr *AbiManager) GetAbi(account eos.AccountName) (*eos.ABI, error) {
var abi eos.ABI
if err := mgr.cacheGet(account, &abi); err != nil {
ctx, cancel := context.WithTimeout(mgr.ctx, time.Second)
defer cancel()
resp, err := mgr.api.GetABI(ctx, account)
if err != nil {
return nil, fmt.Errorf("api: %s", err)
}
abi = resp.ABI
err = mgr.SetAbi(account, &abi)
if err != nil {
return nil, fmt.Errorf("cache: %s", err)
}
}
return &abi, nil
}
func (mgr *AbiManager) cacheGet(account eos.AccountName, value any) error {
ctx, cancel := context.WithTimeout(mgr.ctx, time.Millisecond*500)
defer cancel()
return mgr.cache.Get(ctx, string(account), value)
}

View file

@ -0,0 +1,173 @@
package abi
import (
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
eos "github.com/eoscanada/eos-go"
"github.com/eosswedenorg/thalos/internal/cache"
"github.com/stretchr/testify/assert"
)
var abiString = `
{
"version": "eosio::abi/1.0",
"types": [{
"new_type_name": "new_type_name_1",
"type": "name"
}],
"structs": [
{
"name": "struct_name_1",
"base": "struct_name_2",
"fields": [
{"name":"struct_1_field_1", "type":"new_type_name_1"},
{"name":"struct_1_field_2", "type":"struct_name_3"},
{"name":"struct_1_field_3", "type":"string?"},
{"name":"struct_1_field_4", "type":"string?"},
{"name":"struct_1_field_5", "type":"struct_name_4[]"}
]
},{
"name": "struct_name_2",
"base": "",
"fields": [
{"name":"struct_2_field_1", "type":"string"}
]
},{
"name": "struct_name_3",
"base": "",
"fields": [
{"name":"struct_3_field_1", "type":"string"}
]
},{
"name": "struct_name_4",
"base": "",
"fields": [
{"name":"struct_4_field_1", "type":"string"}
]
}
],
"actions": [{
"name": "action_name_1",
"type": "struct_name_1",
"ricardian_contract": ""
}],
"tables": [{
"name": "table_name_1",
"index_type": "i64",
"key_names": [
"key_name_1",
"key_name_2"
],
"key_types": [
"string",
"int"
],
"type": "struct_name_1"
}
]
}
`
func assert_abi(t *testing.T, abi *eos.ABI) {
assert.Equal(t, abi.Version, "eosio::abi/1.0")
// Types
assert.Equal(t, abi.Types[0].NewTypeName, "new_type_name_1")
assert.Equal(t, abi.Types[0].Type, "name")
// Structs
assert.Equal(t, abi.Structs[0].Name, "struct_name_1")
assert.Equal(t, abi.Structs[0].Base, "struct_name_2")
assert.Equal(t, abi.Structs[0].Fields[0].Name, "struct_1_field_1")
assert.Equal(t, abi.Structs[0].Fields[0].Type, "new_type_name_1")
assert.Equal(t, abi.Structs[0].Fields[1].Name, "struct_1_field_2")
assert.Equal(t, abi.Structs[0].Fields[1].Type, "struct_name_3")
assert.Equal(t, abi.Structs[0].Fields[2].Name, "struct_1_field_3")
assert.Equal(t, abi.Structs[0].Fields[2].Type, "string?")
assert.Equal(t, abi.Structs[0].Fields[3].Name, "struct_1_field_4")
assert.Equal(t, abi.Structs[0].Fields[3].Type, "string?")
assert.Equal(t, abi.Structs[0].Fields[4].Name, "struct_1_field_5")
assert.Equal(t, abi.Structs[0].Fields[4].Type, "struct_name_4[]")
assert.Equal(t, abi.Structs[1].Name, "struct_name_2")
assert.Equal(t, abi.Structs[1].Base, "")
assert.Equal(t, abi.Structs[1].Fields[0].Name, "struct_2_field_1")
assert.Equal(t, abi.Structs[1].Fields[0].Type, "string")
assert.Equal(t, abi.Structs[2].Name, "struct_name_3")
assert.Equal(t, abi.Structs[2].Base, "")
assert.Equal(t, abi.Structs[2].Fields[0].Name, "struct_3_field_1")
assert.Equal(t, abi.Structs[2].Fields[0].Type, "string")
assert.Equal(t, abi.Structs[3].Name, "struct_name_4")
assert.Equal(t, abi.Structs[3].Base, "")
assert.Equal(t, abi.Structs[3].Fields[0].Name, "struct_4_field_1")
assert.Equal(t, abi.Structs[3].Fields[0].Type, "string")
// Actions
assert.Equal(t, abi.Actions[0].Name, eos.ActN("action_name_1"))
assert.Equal(t, abi.Actions[0].Type, "struct_name_1")
assert.Equal(t, abi.Actions[0].RicardianContract, "")
// Tables
assert.Equal(t, abi.Tables[0].Name, eos.TableName("table_name_1"))
assert.Equal(t, abi.Tables[0].Type, "struct_name_1")
assert.Equal(t, abi.Tables[0].IndexType, "i64")
assert.Equal(t, abi.Tables[0].KeyNames[0], "key_name_1")
assert.Equal(t, abi.Tables[0].KeyNames[1], "key_name_2")
assert.Equal(t, abi.Tables[0].KeyTypes[0], "string")
assert.Equal(t, abi.Tables[0].KeyTypes[1], "int")
}
func mockAPI(handler http.HandlerFunc) (*eos.API, *httptest.Server) {
server := httptest.NewServer(handler)
return &eos.API{
HttpClient: server.Client(),
BaseURL: strings.TrimRight(server.URL, "/"),
Compress: eos.CompressionZlib,
Header: make(http.Header),
}, server
}
func TestManager_GetAbiFromCache(t *testing.T) {
cache := cache.NewCache("thalos::cache::abi::test", cache.NewMemoryStore())
api, _ := mockAPI(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
}))
mgr := NewAbiManager(cache, api)
abi, err := eos.NewABI(strings.NewReader(abiString))
assert.NoError(t, err)
err = mgr.SetAbi("testaccount", abi)
assert.NoError(t, err)
c_abi, err := mgr.GetAbi("testaccount")
assert.NoError(t, err)
assert_abi(t, c_abi)
}
func TestManager_GetAbiFromAPI(t *testing.T) {
cache := cache.NewCache("thalos::cache::abi::test", cache.NewMemoryStore())
api, _ := mockAPI(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body := fmt.Sprintf(`{"account_name": "testaccount", "abi": %s}`, abiString)
_, err := w.Write([]byte(body))
assert.NoError(t, err)
}))
mgr := NewAbiManager(cache, api)
c_abi, err := mgr.GetAbi("testaccount")
assert.NoError(t, err)
assert_abi(t, c_abi)
}

31
internal/cache/cache.go vendored Normal file
View file

@ -0,0 +1,31 @@
package cache
import (
"context"
"time"
)
type Cache struct {
store Store
prefix string
}
// Create a new cache
func NewCache(prefix string, store Store) *Cache {
return &Cache{
store: store,
prefix: prefix,
}
}
func (cache *Cache) Get(ctx context.Context, key string, value any) error {
return cache.store.Get(ctx, cache.key(key), value)
}
func (cache *Cache) Set(ctx context.Context, key string, value any, ttl time.Duration) error {
return cache.store.Set(ctx, cache.key(key), value, ttl)
}
func (cache *Cache) key(key string) string {
return cache.prefix + "::" + key
}

61
internal/cache/memory_store.go vendored Normal file
View file

@ -0,0 +1,61 @@
package cache
import (
"context"
"fmt"
"reflect"
"time"
)
// Store time function in a variable.
// Makes it easy to travel in time when testing.
var now = time.Now
type memoryStoreItem struct {
// Actual value stored.
value any
// Cache expiration time.
expired time.Time
}
type MemoryStore struct {
data map[string]memoryStoreItem
}
func NewMemoryStore() *MemoryStore {
return &MemoryStore{make(map[string]memoryStoreItem)}
}
func (s *MemoryStore) Get(ctx context.Context, key string, value any) error {
if item, ok := s.data[key]; ok {
if item.expired.Before(now()) {
delete(s.data, key)
return fmt.Errorf("key: %s does not exist", key)
}
v := reflect.ValueOf(value)
if v.Kind() != reflect.Pointer {
return fmt.Errorf("value must be of pointer type, '%s' passed", v.Kind().String())
}
v.Elem().Set(reflect.ValueOf(item.value))
return nil
}
return fmt.Errorf("key: %s does not exist", key)
}
func (s *MemoryStore) Has(ctx context.Context, key string) bool {
_, hit := s.data[key]
return hit
}
func (s *MemoryStore) Set(ctx context.Context, key string, value any, ttl time.Duration) error {
s.data[key] = memoryStoreItem{
value: value,
expired: now().Add(ttl),
}
return nil
}

88
internal/cache/memory_store_test.go vendored Normal file
View file

@ -0,0 +1,88 @@
package cache
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
type memoryTestItem struct {
String string
Num uint32
Float float32
}
func TestMemoryStore_Set(t *testing.T) {
now = func() time.Time { return time.Unix(1581315270, 0) }
item := memoryTestItem{
String: "MyString",
Num: 23,
Float: 3.14,
}
expected := map[string]memoryStoreItem{
"key1": {
value: item,
expired: now().Add(time.Hour),
},
}
store := NewMemoryStore()
err := store.Set(context.Background(), "key1", item, time.Hour)
assert.NoError(t, err)
assert.Equal(t, expected, store.data)
}
func TestMemoryStore_GetMiss(t *testing.T) {
store := NewMemoryStore()
var v any
err := store.Get(context.Background(), "Key2", &v)
assert.Error(t, err)
}
func TestMemoryStore_GetHit(t *testing.T) {
expected := memoryTestItem{
String: "MyString",
Num: 23,
Float: 3.14,
}
store := NewMemoryStore()
err := store.Set(context.Background(), "key1", expected, time.Hour)
assert.NoError(t, err)
var actual memoryTestItem
err = store.Get(context.Background(), "key1", &actual)
assert.NoError(t, err)
assert.Equal(t, expected, actual)
}
func TestMemoryStore_GetNonPointer(t *testing.T) {
expected := memoryTestItem{
String: "MyString",
Num: 23,
Float: 3.14,
}
store := NewMemoryStore()
err := store.Set(context.Background(), "key1", expected, time.Hour)
assert.NoError(t, err)
var actual string
err = store.Get(context.Background(), "key1", actual)
assert.EqualError(t, err, "value must be of pointer type, 'string' passed")
}
func TestMemoryStore_Has(t *testing.T) {
store := NewMemoryStore()
err := store.Set(context.Background(), "key1", "value", time.Hour)
assert.NoError(t, err)
assert.True(t, store.Has(context.Background(), "key1"))
assert.False(t, store.Has(context.Background(), "key2"))
}

35
internal/cache/redis_store.go vendored Normal file
View file

@ -0,0 +1,35 @@
package cache
import (
"context"
"time"
"github.com/go-redis/cache/v9"
)
type RedisStore struct {
c *cache.Cache
}
func NewRedisStore(options *cache.Options) *RedisStore {
return &RedisStore{
c: cache.New(options),
}
}
func (s *RedisStore) Get(ctx context.Context, key string, value interface{}) error {
return s.c.Get(ctx, key, value)
}
func (s *RedisStore) Has(ctx context.Context, key string) bool {
return s.c.Exists(ctx, key)
}
func (s *RedisStore) Set(ctx context.Context, key string, value any, ttl time.Duration) error {
return s.c.Set(&cache.Item{
Ctx: ctx,
Key: key,
Value: value,
TTL: ttl,
})
}

104
internal/cache/redis_store_test.go vendored Normal file
View file

@ -0,0 +1,104 @@
package cache
import (
"context"
"testing"
"time"
"github.com/go-redis/redismock/v9"
redis_cache "github.com/go-redis/cache/v9"
"github.com/stretchr/testify/assert"
)
type testItem struct {
Num uint32
Name string
}
func TestRedisStore_Set(t *testing.T) {
client, mock := redismock.NewClientMock()
store := NewRedisStore(&redis_cache.Options{
Redis: client,
})
expected := testItem{
Num: 24,
Name: "Some Name",
}
bytes, err := store.c.Marshal(expected)
assert.NoError(t, err)
mock.ExpectSet("mykey", bytes, time.Minute).SetVal("OK")
err = store.Set(context.Background(), "mykey", expected, time.Minute)
assert.NoError(t, err)
assert.NoError(t, mock.ExpectationsWereMet())
}
func TestRedisStore_GetMiss(t *testing.T) {
client, mock := redismock.NewClientMock()
store := NewRedisStore(&redis_cache.Options{
Redis: client,
})
mock.ExpectGet("mykey").SetErr(redis_cache.ErrCacheMiss)
expected := testItem{}
err := store.Get(context.Background(), "mykey", &expected)
assert.ErrorIs(t, err, redis_cache.ErrCacheMiss)
assert.NoError(t, mock.ExpectationsWereMet())
}
func TestRedisStore_GetHit(t *testing.T) {
client, mock := redismock.NewClientMock()
store := NewRedisStore(&redis_cache.Options{
Redis: client,
})
expected := testItem{
Num: 42,
Name: "MyName",
}
bytes, err := store.c.Marshal(expected)
assert.NoError(t, err)
mock.ExpectSet("mykey2", bytes, time.Second*20).SetVal("OK")
mock.ExpectGet("mykey2").SetVal(string(bytes))
err = store.Set(context.Background(), "mykey2", expected, time.Second*20)
assert.NoError(t, err)
actual := testItem{}
err = store.Get(context.Background(), "mykey2", &actual)
assert.NoError(t, err)
assert.Equal(t, expected, actual)
assert.NoError(t, mock.ExpectationsWereMet())
}
func TestRedisStore_Has(t *testing.T) {
client, mock := redismock.NewClientMock()
store := NewRedisStore(&redis_cache.Options{
Redis: client,
})
bytes, err := store.c.Marshal("value")
assert.NoError(t, err)
mock.ExpectSet("key1", bytes, time.Minute*15).SetVal("OK")
mock.ExpectGet("key1").SetVal(string(bytes))
mock.ExpectGet("key2").RedisNil()
err = store.Set(context.Background(), "key1", "value", time.Minute*15)
assert.NoError(t, err)
assert.True(t, store.Has(context.Background(), "key1"))
assert.False(t, store.Has(context.Background(), "key2"))
assert.NoError(t, mock.ExpectationsWereMet())
}

19
internal/cache/store.go vendored Normal file
View file

@ -0,0 +1,19 @@
package cache
import (
"context"
"time"
)
// Interface to a cache storage.
type Store interface {
// Set an item in the store.
Set(ctx context.Context, key string, value any, TTL time.Duration) error
// Get an item from the store.
// returns an error if key is not found or there is other problems.
Get(ctx context.Context, key string, value any) error
// Check if a key exist in the store.
Has(ctx context.Context, key string) bool
}

18
internal/config/cli.go Normal file
View file

@ -0,0 +1,18 @@
package config
import (
"path"
"github.com/urfave/cli/v2"
)
// Read cli flag values into the config
func (cfg *Config) ReadCliFlags(ctx *cli.Context) error {
logFile := ctx.Path("log")
if len(logFile) > 0 {
cfg.Log.Directory = path.Dir(logFile)
cfg.Log.Filename = path.Base(logFile)
}
return nil
}

65
internal/config/config.go Normal file
View file

@ -0,0 +1,65 @@
package config
import (
"time"
"github.com/eosswedenorg/thalos/internal/log"
shipclient "github.com/eosswedenorg-go/antelope-ship-client"
)
type RedisConfig struct {
Addr string `yaml:"addr"`
User string `yaml:"user"`
Password string `yaml:"password"`
DB int `yaml:"db"`
Prefix string `yaml:"prefix"`
}
type TelegramConfig struct {
Id string `yaml:"id"`
Channel int64 `yaml:"channel"`
}
type ShipConfig struct {
Url string `yaml:"url"`
IrreversibleOnly bool `yaml:"irreversible_only"`
MaxMessagesInFlight uint32 `yaml:"max_messages_in_flight"`
StartBlockNum uint32 `yaml:"start_block_num"`
EndBlockNum uint32 `yaml:"end_block_num"`
Chain string `yaml:"chain"`
}
type Config struct {
Name string `yaml:"name"`
Ship ShipConfig `yaml:"ship"`
Api string `yaml:"api"`
Log log.Config `yaml:"log"`
Redis RedisConfig `yaml:"redis"`
MessageCodec string `yaml:"message_codec"`
Telegram TelegramConfig `yaml:"telegram"`
}
// Create a new Config object with default values
func New() Config {
return Config{
MessageCodec: "json",
Log: log.Config{
MaxFileSize: 10 * 1000 * 1000, // 10 mb
MaxTime: time.Hour * 24,
},
Ship: ShipConfig{
StartBlockNum: shipclient.NULL_BLOCK_NUMBER,
EndBlockNum: shipclient.NULL_BLOCK_NUMBER,
MaxMessagesInFlight: 10,
IrreversibleOnly: false,
},
Redis: RedisConfig{
Addr: "localhost:6379",
Prefix: "ship",
},
}
}

View file

@ -0,0 +1,148 @@
package config
import (
"testing"
"time"
"github.com/eosswedenorg/thalos/internal/log"
"github.com/stretchr/testify/require"
shipclient "github.com/eosswedenorg-go/antelope-ship-client"
)
func TestNew(t *testing.T) {
expected := Config{
MessageCodec: "json",
Log: log.Config{
MaxFileSize: 10 * 1000 * 1000, // 10 mb
MaxTime: time.Hour * 24,
},
Ship: ShipConfig{
StartBlockNum: shipclient.NULL_BLOCK_NUMBER,
EndBlockNum: shipclient.NULL_BLOCK_NUMBER,
MaxMessagesInFlight: 10,
IrreversibleOnly: false,
},
Redis: RedisConfig{
Addr: "localhost:6379",
Password: "",
DB: 0,
Prefix: "ship",
},
}
require.Equal(t, New(), expected)
}
func TestReadYAML(t *testing.T) {
expected := Config{
Name: "ship-reader-1",
Api: "http://127.0.0.1:8080",
MessageCodec: "mojibake",
Log: log.Config{
Filename: "some_file.log",
Directory: "/path/to/whatever",
MaxFileSize: 200,
MaxTime: 30 * time.Minute,
},
Ship: ShipConfig{
Url: "127.0.0.1:8089",
StartBlockNum: 23671836,
EndBlockNum: 23872222,
IrreversibleOnly: true,
MaxMessagesInFlight: 1337,
},
Telegram: TelegramConfig{
Id: "110201543:AAHdqTcvCH1vGWJxfSeofSAs0K5PALDsaw",
Channel: -123456789,
},
Redis: RedisConfig{
Addr: "localhost:6379",
User: "myuser",
Password: "passwd",
DB: 4,
Prefix: "some::ship",
},
}
cfg := Config{}
err := cfg.ReadYAML([]byte(`
name: "ship-reader-1"
api: "http://127.0.0.1:8080"
message_codec: "mojibake"
log:
filename: some_file.log
directory: /path/to/whatever
maxtime: 30m
maxfilesize: 200b
ship:
url: "127.0.0.1:8089"
irreversible_only: true
max_messages_in_flight: 1337
start_block_num: 23671836
end_block_num: 23872222
telegram:
id: "110201543:AAHdqTcvCH1vGWJxfSeofSAs0K5PALDsaw"
channel: -123456789
redis:
addr: "localhost:6379"
user: "myuser"
password: "passwd"
db: 4
prefix: "some::ship"
`))
require.NoError(t, err)
require.Equal(t, cfg, expected)
}
func TestReadYAMLShorthandShipUrl(t *testing.T) {
expected := Config{
Name: "ship-reader-1",
Api: "http://127.0.0.1:8080",
MessageCodec: "json",
Log: log.Config{
MaxFileSize: 10 * 1000 * 1000, // 10 mb
MaxTime: time.Hour * 24,
},
Ship: ShipConfig{
Url: "127.0.0.1:8089",
StartBlockNum: shipclient.NULL_BLOCK_NUMBER,
EndBlockNum: shipclient.NULL_BLOCK_NUMBER,
MaxMessagesInFlight: 10,
IrreversibleOnly: false,
},
Telegram: TelegramConfig{
Id: "110201543:AAHdqTcvCH1vGWJxfSeofSAs0K5PALDsaw",
Channel: -123456789,
},
Redis: RedisConfig{
Addr: "localhost:6379",
Password: "passwd",
DB: 4,
Prefix: "some::ship",
},
}
cfg := New()
err := cfg.ReadYAML([]byte(`
name: "ship-reader-1"
api: "http://127.0.0.1:8080"
ship: "127.0.0.1:8089"
telegram:
id: "110201543:AAHdqTcvCH1vGWJxfSeofSAs0K5PALDsaw"
channel: -123456789
redis:
addr: "localhost:6379"
password: "passwd"
db: 4
prefix: "some::ship"
`))
require.NoError(t, err)
require.Equal(t, cfg, expected)
}

15
internal/config/file.go Normal file
View file

@ -0,0 +1,15 @@
package config
import (
"os"
)
// Read values from file
func (cfg *Config) ReadFile(filename string) error {
bytes, err := os.ReadFile(filename)
if err != nil {
return err
}
return cfg.ReadYAML(bytes)
}

26
internal/config/yaml.go Normal file
View file

@ -0,0 +1,26 @@
package config
import (
"gopkg.in/yaml.v3"
)
func (ship *ShipConfig) UnmarshalYAML(value *yaml.Node) error {
var err error
if value.Kind == yaml.ScalarNode {
ship.Url = value.Value
} else {
type ShipConfigRaw ShipConfig
raw := ShipConfigRaw(*ship)
if err = value.Decode(&raw); err == nil {
*ship = ShipConfig(raw)
}
}
return err
}
// Read YAML config data
func (cfg *Config) ReadYAML(data []byte) error {
return yaml.Unmarshal(data, cfg)
}

View file

@ -0,0 +1,37 @@
package redis
import (
"context"
"github.com/eosswedenorg/thalos/api"
. "github.com/eosswedenorg/thalos/api/redis"
"github.com/redis/go-redis/v9"
)
type Publisher struct {
pipeline redis.Pipeliner
ctx context.Context
ns Namespace
}
func NewPublisher(ctx context.Context, client *redis.Client, ns Namespace) *Publisher {
return &Publisher{
pipeline: client.Pipeline(),
ctx: ctx,
ns: ns,
}
}
func (r *Publisher) Write(channel api.Channel, payload []byte) error {
return r.pipeline.Publish(r.ctx, r.ns.NewKey(channel).String(), payload).Err()
}
func (r *Publisher) Flush() error {
_, err := r.pipeline.Exec(r.ctx)
return err
}
func (r *Publisher) Close() error {
return r.Flush()
}

View file

@ -0,0 +1,28 @@
package redis
import (
"context"
"testing"
"github.com/eosswedenorg/thalos/api"
. "github.com/eosswedenorg/thalos/api/redis"
"github.com/go-redis/redismock/v9"
"github.com/stretchr/testify/assert"
)
func TestPublisher_Write(t *testing.T) {
client, mock := redismock.NewClientMock()
pub := NewPublisher(context.Background(), client, Namespace{ChainID: "id"})
mock.MatchExpectationsInOrder(true)
mock.ExpectPublish("ship::id::test", []byte("some string")).SetVal(0)
mock.ExpectPublish("ship::id::test2", []byte("some other string")).SetVal(0)
assert.NoError(t, pub.Write(api.Channel{"test"}, []byte("some string")))
assert.NoError(t, pub.Write(api.Channel{"test2"}, []byte("some other string")))
assert.NoError(t, pub.Flush())
assert.NoError(t, mock.ExpectationsWereMet())
}

21
internal/driver/writer.go Normal file
View file

@ -0,0 +1,21 @@
package driver
import "github.com/eosswedenorg/thalos/api"
// Writer interface defines the required methods
// to send messages over an channel.
//
// This is a low-level interface typically implemented by backend drivers
type Writer interface {
// Write writes a message over a channel.
// The message may or may not be buffered depending on the implementation.
Write(channel api.Channel, payload []byte) error
// Flush writes any buffered messages to the channel.
// If the implementation does not support buffering. this is a noop.
Flush() error
// Close closes the writer
// Any blocked Flush or Write operations will be unblocked.
Close() error
}

View file

@ -0,0 +1,48 @@
package log
import (
"io"
log "github.com/sirupsen/logrus"
)
type HookWriter struct {
Writer io.Writer
LogLevels []log.Level
}
func (hook *HookWriter) Fire(entry *log.Entry) error {
line, err := entry.String()
if err != nil {
return err
}
_, err = hook.Writer.Write([]byte(line))
return err
}
func (hook *HookWriter) Levels() []log.Level {
return hook.LogLevels
}
func MakeStdHook(writer io.Writer) *HookWriter {
return &HookWriter{
Writer: writer,
LogLevels: []log.Level{
log.InfoLevel,
log.DebugLevel,
},
}
}
func MakeErrorHook(writer io.Writer) *HookWriter {
return &HookWriter{
Writer: writer,
LogLevels: []log.Level{
log.ErrorLevel,
log.WarnLevel,
log.FatalLevel,
log.PanicLevel,
log.TraceLevel,
},
}
}

View file

@ -0,0 +1,127 @@
package log
import (
"fmt"
"io"
"os"
"path"
"time"
)
// Rotating file represents a file that can be rotated when either the file
// becomes to large or to old, whatever comes first
type RotatingFile struct {
fd *os.File
size int64
maxSize int64
ts time.Time
maxAge time.Duration
format string
}
func open(filename string) (*os.File, error) {
return os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0o666)
}
// Open a new rotating file.
func NewRotatingFile(filename string, maxSize int64, maxAge time.Duration) (*RotatingFile, error) {
if err := os.MkdirAll(path.Dir(filename), 0o766); err != nil && !os.IsExist(err) {
return nil, err
}
fd, err := open(filename)
if err != nil {
return nil, err
}
stat, err := fd.Stat()
if err != nil {
return nil, err
}
return &RotatingFile{
fd: fd,
size: stat.Size(),
maxSize: maxSize,
ts: time.Now(),
maxAge: maxAge,
format: "2006-01-02_150405",
}, nil
}
// Open a new rotating file using a config struct.
func NewRotatingFileFromConfig(config Config, suffix string) (*RotatingFile, error) {
if len(suffix) > 0 {
suffix = "_" + suffix
}
return NewRotatingFile(config.GetFilePath()+suffix+".log", int64(config.MaxFileSize), config.MaxTime)
}
func (w *RotatingFile) newFilename(name string) string {
ext := path.Ext(name)
if len(ext) > 0 {
name = name[:len(name)-len(ext)]
}
return fmt.Sprintf("%s-%s%s", name, time.Now().Format(w.format), ext)
}
// Get the filename
func (w RotatingFile) GetFilename() string {
return path.Base(w.fd.Name())
}
// Rotate the file.
func (w *RotatingFile) Rotate() error {
dst, err := os.OpenFile(w.newFilename(w.fd.Name()), os.O_CREATE|os.O_WRONLY, 0o666)
if err != nil {
return err
}
defer dst.Close()
// Seek to the beginning of file
if _, err = w.fd.Seek(0, io.SeekStart); err != nil {
return err
}
// And copy the contents to the new file.
if _, err = io.Copy(dst, w.fd); err != nil {
return err
}
// Then truncate the log.
if err = w.fd.Truncate(0); err != nil {
return err
}
w.size = 0
w.ts = time.Now()
return nil
}
// Implement io.Writer interface
func (w *RotatingFile) Write(p []byte) (int, error) {
n, err := w.fd.Write(p)
if err != nil {
return n, err
}
w.size += int64(n)
// Check if we should rotate
if w.size >= w.maxSize || time.Since(w.ts) >= w.maxAge {
if err := w.Rotate(); err != nil {
return n, err
}
}
return n, nil
}
// Implement io.Closer interface
func (w *RotatingFile) Close() error {
err := w.fd.Close()
w.fd = nil
return err
}

35
internal/log/config.go Normal file
View file

@ -0,0 +1,35 @@
package log
import (
"path"
"time"
"github.com/eosswedenorg/thalos/internal/types"
)
// Config represents configuration parameters for a log.
type Config struct {
// Filename where the log is stored.
Filename string `yaml:"filename"`
// Directory where the log files are stored.
Directory string `yaml:"directory"`
// Maximum filesize, the log is rotated when this size is exceeded.
MaxFileSize types.Size `yaml:"maxfilesize"`
// Maximum lifetime of the file before it is rotated.
MaxTime time.Duration `yaml:"maxtime"`
}
func (c Config) GetFilename() string {
return path.Base(c.Filename)
}
func (c Config) GetDirectory() string {
return path.Clean(c.Directory)
}
func (c Config) GetFilePath() string {
return path.Join(c.GetDirectory(), c.GetFilename())
}

View file

@ -0,0 +1,84 @@
package log
import (
"testing"
)
func TestConfig_GetDirectory(t *testing.T) {
tests := []struct {
name string
directory string
want string
}{
{"empty", "", "."},
{"root", "/", "/"},
{"one", "dir", "dir"},
{"path", "/path/to/some/directory", "/path/to/some/directory"},
{"relative", "relative/directory", "relative/directory"},
{"backtrace", "/path/./to/some/../directory", "/path/to/directory"},
{"multislash", "//path/to///directory//", "/path/to/directory"},
{"everything", "path/to/..//./from/directory//", "path/from/directory"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := Config{
Directory: tt.directory,
}
if got := c.GetDirectory(); got != tt.want {
t.Errorf("Config.GetDirectory() = %v, want %v", got, tt.want)
}
})
}
}
func TestConfig_GetFilename(t *testing.T) {
tests := []struct {
name string
filename string
want string
}{
{"empty", "", "."},
{"name", "some_file.txt", "some_file.txt"},
{"path", "/path/to/my.log", "my.log"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := Config{
Filename: tt.filename,
}
if got := c.GetFilename(); got != tt.want {
t.Errorf("Config.GetFilename() = %v, want %v", got, tt.want)
}
})
}
}
func TestConfig_GetFilePath(t *testing.T) {
tests := []struct {
name string
filename string
directory string
want string
}{
{"empty", "", "", "."},
{"directory", "", "dir", "dir"},
{"filename", "filename", "", "filename"},
{"both", "filename", "dir", "dir/filename"},
{"root", "filename", "/", "/filename"},
{"abs", "filename", "/path/to/logs", "/path/to/logs/filename"},
{"relative", "filename", "/srv/../log", "/log/filename"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := Config{
Filename: tt.filename,
Directory: tt.directory,
}
if got := c.GetFilePath(); got != tt.want {
t.Errorf("Config.GetFilePath() = %v, want %v", got, tt.want)
}
})
}
}

15
internal/log/init.go Normal file
View file

@ -0,0 +1,15 @@
package log
import (
log "github.com/sirupsen/logrus"
)
func init() {
// Initialize logger
formatter := log.TextFormatter{
FullTimestamp: true,
TimestampFormat: "2006-01-02 15:04:05.0000",
}
log.SetFormatter(&formatter)
}

View file

@ -0,0 +1,362 @@
package server
import (
"encoding/hex"
"encoding/json"
"github.com/eosswedenorg/thalos/api"
"github.com/eosswedenorg/thalos/api/message"
"github.com/eosswedenorg/thalos/internal/abi"
"github.com/eosswedenorg/thalos/internal/driver"
log "github.com/sirupsen/logrus"
"github.com/eoscanada/eos-go"
"github.com/eoscanada/eos-go/ship"
shipclient "github.com/eosswedenorg-go/antelope-ship-client"
)
// logDecoratedEncoder decorates a message.Encoder and logs any error.
func logDecoratedEncoder(encoder message.Encoder) message.Encoder {
return func(v interface{}) ([]byte, error) {
payload, err := encoder(v)
if err != nil {
log.WithError(err).
WithField("v", v).
Warn("Failed to encode message")
}
return payload, err
}
}
// A ShipProcessor will consume messages from a ship stream, convert the messages into
// thalos specfic ones, encode them and finally post them to an api.Writer
type ShipProcessor struct {
// The ship stream to process.
shipStream *shipclient.Stream
// Abi manager used for cacheing
abi *abi.AbiManager
// Writer to send messages to.
writer driver.Writer
// Encoder used to encode messages
encode message.Encoder
// Function for saving state.
saver StateSaver
// Internal state
state State
// System contract ("eosio" per default)
syscontract eos.AccountName
// ABI Returned from SHIP
shipABI *eos.ABI
}
// SpawnProcessor creates a new ShipProccessor that consumes the shipclient.Stream passed to it.
func SpawnProccessor(shipStream *shipclient.Stream, loader StateLoader, saver StateSaver, writer driver.Writer, abi *abi.AbiManager, codec message.Codec) *ShipProcessor {
processor := &ShipProcessor{
saver: saver,
abi: abi,
writer: writer,
shipStream: shipStream,
encode: logDecoratedEncoder(codec.Encoder),
syscontract: eos.AccountName("eosio"),
}
loader(&processor.state)
// Attach handlers
shipStream.BlockHandler = processor.processBlock
shipStream.InitHandler = processor.initHandler
// Needed because if nil, traces/table deltas will not be included in the response from ship.
shipStream.TraceHandler = func([]*ship.TransactionTraceV0) {}
shipStream.TableDeltaHandler = func([]*ship.TableDeltaV0) {}
return processor
}
func (processor *ShipProcessor) initHandler(abi *eos.ABI) {
processor.shipABI = abi
}
func (processor *ShipProcessor) queueMessage(channel api.Channel, payload []byte) bool {
err := processor.writer.Write(channel, payload)
if err != nil {
log.WithError(err).Errorf("Failed to post to channel '%s'", channel)
return false
}
return true
}
func (processor *ShipProcessor) encodeQueue(channel api.Channel, v interface{}) bool {
if payload, err := processor.encode(v); err == nil {
return processor.queueMessage(channel, payload)
}
return false
}
func decode(abi *eos.ABI, act *ship.Action, v any) error {
jsondata, err := abi.DecodeAction(act.Data, act.Name)
if err != nil {
return err
}
return json.Unmarshal(jsondata, v)
}
// updateAbiFromAction updates the contract abi based on the ship.Action passed.
func (processor *ShipProcessor) updateAbiFromAction(act *ship.Action) error {
ABI, err := processor.abi.GetAbi(processor.syscontract)
if err != nil {
return err
}
set_abi := struct {
Abi string
Account eos.AccountName
}{}
if err := decode(ABI, act, &set_abi); err != nil {
return err
}
binary_abi, err := hex.DecodeString(set_abi.Abi)
if err != nil {
return err
}
contract_abi := eos.ABI{}
if err = eos.UnmarshalBinary(binary_abi, &contract_abi); err != nil {
return err
}
return processor.abi.SetAbi(set_abi.Account, &contract_abi)
}
// Get the current block.
func (processor *ShipProcessor) GetCurrentBlock() uint32 {
return processor.state.CurrentBlock
}
// Callback function called by shipclient.Stream when a new block arrives.
func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) {
// Check to see if we have a microfork and post a message to
// the rollback channel in that case.
if processor.state.CurrentBlock > 0 && block.ThisBlock.BlockNum < processor.state.CurrentBlock {
log.WithField("old_block", processor.state.CurrentBlock).
WithField("new_block", block.ThisBlock.BlockNum).
Warn("Fork detected, old_block is greater than new_block")
processor.encodeQueue(api.RollbackChannel, message.RollbackMessage{
OldBlockNum: processor.state.CurrentBlock,
NewBlockNum: block.ThisBlock.BlockNum,
})
}
processor.state.CurrentBlock = block.ThisBlock.BlockNum
if block.ThisBlock.BlockNum%100 == 0 {
log.Infof("Current: %d, Head: %d", processor.state.CurrentBlock, block.Head.BlockNum)
}
if block.ThisBlock.BlockNum%10 == 0 {
hb := message.HeartBeat{
BlockNum: block.ThisBlock.BlockNum,
LastIrreversibleBlockNum: block.LastIrreversible.BlockNum,
HeadBlockNum: block.Head.BlockNum,
}
processor.encodeQueue(api.HeartbeatChannel, hb)
}
mainLogger := log.WithField("block", block.ThisBlock.BlockNum).Dup()
// Process traces
if block.Traces != nil && len(block.Traces.Elem) > 0 {
for _, trace := range block.Traces.AsTransactionTracesV0() {
logger := mainLogger.WithField("type", "trace").WithField("tx_id", trace.ID.String()).Dup()
transaction := message.TransactionTrace{
ID: trace.ID.String(),
BlockNum: block.Block.BlockNumber(),
Timestamp: block.Block.Timestamp.Time.UTC(),
Status: trace.Status.String(),
CPUUsageUS: trace.CPUUsageUS,
NetUsage: trace.NetUsage,
NetUsageWords: uint32(trace.NetUsageWords),
Elapsed: int64(trace.Elapsed),
Scheduled: trace.Scheduled,
Except: trace.Except,
Error: trace.ErrorCode,
}
// Actions
for _, actionTraceVar := range trace.ActionTraces {
var act_trace *ship.ActionTraceV1
if trace_v0, ok := actionTraceVar.Impl.(*ship.ActionTraceV0); ok {
// convert to v1
act_trace = &ship.ActionTraceV1{
ActionOrdinal: trace_v0.ActionOrdinal,
CreatorActionOrdinal: trace_v0.CreatorActionOrdinal,
Receipt: trace_v0.Receipt,
Receiver: trace_v0.Receiver,
Act: trace_v0.Act,
ContextFree: trace_v0.ContextFree,
Elapsed: trace_v0.Elapsed,
Console: trace_v0.Console,
AccountRamDeltas: trace_v0.AccountRamDeltas,
Except: trace_v0.Except,
ErrorCode: trace_v0.ErrorCode,
ReturnValue: []byte{},
}
} else {
act_trace = actionTraceVar.Impl.(*ship.ActionTraceV1)
}
// Check if actions updates an abi.
if act_trace.Act.Account == processor.syscontract && act_trace.Act.Name == eos.ActionName("setabi") {
err := processor.updateAbiFromAction(act_trace.Act)
if err != nil {
logger.WithError(err).Warn("Failed to update abi")
}
}
act := message.ActionTrace{
TxID: trace.ID.String(),
BlockNum: block.Block.BlockNumber(),
Timestamp: block.Block.Timestamp.Time.UTC(),
Name: act_trace.Act.Name.String(),
Contract: act_trace.Act.Account.String(),
Receiver: act_trace.Receiver.String(),
}
if act_trace.Receipt != nil {
receipt := act_trace.Receipt.Impl.(*ship.ActionReceiptV0)
act.Receipt = &message.ActionReceipt{
Receiver: receipt.Receiver.String(),
ActDigest: receipt.ActDigest.String(),
GlobalSequence: receipt.GlobalSequence,
RecvSequence: receipt.RecvSequence,
CodeSequence: uint32(receipt.CodeSequence),
ABISequence: uint32(receipt.ABISequence),
}
for _, auth := range receipt.AuthSequence {
act.Receipt.AuthSequence = append(act.Receipt.AuthSequence, message.AccountAuthSequence{
Account: auth.Account.String(),
Sequence: auth.Sequence,
})
}
}
for _, auth := range act_trace.Act.Authorization {
act.Authorization = append(act.Authorization, message.PermissionLevel{
Actor: auth.Actor.String(),
Permission: auth.Permission.String(),
})
}
ABI, err := processor.abi.GetAbi(act_trace.Act.Account)
if err == nil {
if err = decode(ABI, act_trace.Act, &act.Data); err != nil {
logger.WithFields(log.Fields{
"contract": act_trace.Act.Account,
"action": act_trace.Act.Name,
}).WithError(err).Warn("Failed to decode action")
}
} else {
logger.WithField("contract", act_trace.Act.Account).
WithError(err).Error("Failed to get abi for contract")
}
payload, err := processor.encode(act)
if err != nil {
continue
}
transaction.ActionTraces = append(transaction.ActionTraces, act)
channels := []api.Channel{
api.ActionChannel{}.Channel(),
api.ActionChannel{Name: act.Name}.Channel(),
api.ActionChannel{Contract: act.Contract}.Channel(),
api.ActionChannel{Name: act.Name, Contract: act.Contract}.Channel(),
}
for _, channel := range channels {
processor.queueMessage(channel, payload)
}
}
processor.encodeQueue(api.TransactionChannel, transaction)
}
}
// Process deltas
for _, delta := range block.Deltas.AsTableDeltasV0() {
logger := mainLogger.WithField("type", "table_delta").WithField("table", delta.Name).Dup()
rows := []message.TableDeltaRow{}
for _, row := range delta.Rows {
msg := message.TableDeltaRow{
Present: row.Present,
RawData: row.Data,
}
if processor.shipABI != nil {
v, err := processor.shipABI.DecodeTableRowTyped(delta.Name, row.Data)
if err == nil {
err = json.Unmarshal(v, &msg.Data)
if err != nil {
logger.WithError(err).Error("Failed to decode json")
}
} else {
logger.Error("Failed to decode table delta")
}
} else {
logger.Warn("No SHIP ABI present")
}
rows = append(rows, msg)
}
message := message.TableDelta{
BlockNum: block.Block.BlockNumber(),
Timestamp: block.Block.Timestamp.Time.UTC(),
Name: delta.Name,
Rows: rows,
}
channels := []api.Channel{
api.TableDeltaChannel{}.Channel(),
api.TableDeltaChannel{Name: delta.Name}.Channel(),
}
for _, channel := range channels {
processor.encodeQueue(channel, message)
}
}
err := processor.writer.Flush()
if err != nil {
log.WithError(err).Error("Failed to send messages")
}
err = processor.saver(processor.state)
if err != nil {
log.WithError(err).Error("Failed to save state")
}
}
// Close closes the writer associated with the processor.
func (processor *ShipProcessor) Close() error {
return processor.writer.Close()
}

15
internal/server/state.go Normal file
View file

@ -0,0 +1,15 @@
package server
// State represents thalos runtime state
type State struct {
// Last processed block
CurrentBlock uint32
}
type (
// StateLoader is a function that loads a state.
StateLoader func(*State)
// StateSaver is a function that saves a state.
StateSaver func(State) error
)

36
internal/types/size.go Normal file
View file

@ -0,0 +1,36 @@
package types
import (
"github.com/docker/go-units"
"gopkg.in/yaml.v3"
)
// Size is an alias of int64 that can handle sizes represented
// in human readable strings like "200mb", "20 GB" etc.
// The value is in bytes.
type Size int64
// Parse a string into number of bytes stored in a int64
func (s *Size) Parse(value string) error {
// Empty strings are not an error, they represents zero bytes.
if len(value) < 1 {
*s = 0
return nil
}
v, err := units.FromHumanSize(value)
if err != nil {
return err
}
*s = Size(v)
return nil
}
func (s Size) String() string {
return units.HumanSize(float64(s))
}
func (s *Size) UnmarshalYAML(value *yaml.Node) error {
return s.Parse(value.Value)
}

View file

@ -0,0 +1,34 @@
package types
import "testing"
func TestSize_Parse(t *testing.T) {
tests := []struct {
name string
value string
expected int64
wantErr bool
}{
{"Empty", "", 0, false},
{"NoDigit", "abcdefg", 0, true},
{"Negative", "-10MB", 0, true},
{"Invalid prefix", "100WAX", 0, true},
{"Multiple spaces between prefix and value", "100 gb", 0, true},
{"100kb", "100kb", 100 * 1000, false},
{"10MB", "10 MB", 10 * 1000 * 1000, false},
{"2gb", "2gb", 2 * 1000 * 1000 * 1000, false},
{"4Tb", "4 Tb", 4 * 1000 * 1000 * 1000 * 1000, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := Size(0)
if err := s.Parse(tt.value); (err != nil) != tt.wantErr {
t.Errorf("Size.Parse() error = %v, wantErr %v", err, tt.wantErr)
}
if int64(s) != tt.expected {
t.Errorf("Size = %v, expected %v", s, tt.expected)
}
})
}
}