mirror of
https://github.com/eosswedenorg/thalos
synced 2026-07-04 12:03:41 +02:00
Improved code documentation.
This commit is contained in:
parent
96764ef651
commit
31c7ba6a4b
6 changed files with 45 additions and 9 deletions
|
|
@ -8,12 +8,14 @@ import (
|
||||||
redis_cache "github.com/go-redis/cache/v9"
|
redis_cache "github.com/go-redis/cache/v9"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Cache represents a abi cache in redis.
|
||||||
type Cache struct {
|
type Cache struct {
|
||||||
c *redis_cache.Cache
|
c *redis_cache.Cache
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
prefix string
|
prefix string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create a new cache
|
||||||
func NewCache(prefix string, options *redis_cache.Options) *Cache {
|
func NewCache(prefix string, options *redis_cache.Options) *Cache {
|
||||||
return &Cache{
|
return &Cache{
|
||||||
c: redis_cache.New(options),
|
c: redis_cache.New(options),
|
||||||
|
|
@ -22,12 +24,14 @@ func NewCache(prefix string, options *redis_cache.Options) *Cache {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get an ABI from the cache using the contract account name as the key.
|
||||||
func (cache *Cache) Get(account string) (*eos.ABI, error) {
|
func (cache *Cache) Get(account string) (*eos.ABI, error) {
|
||||||
var v eos.ABI
|
var v eos.ABI
|
||||||
err := cache.c.Get(cache.ctx, cache.key(account), &v)
|
err := cache.c.Get(cache.ctx, cache.key(account), &v)
|
||||||
return &v, err
|
return &v, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set an ABI in the cache.
|
||||||
func (cache *Cache) Set(account string, abi *eos.ABI, ttl time.Duration) error {
|
func (cache *Cache) Set(account string, abi *eos.ABI, ttl time.Duration) error {
|
||||||
return cache.c.Set(&redis_cache.Item{
|
return cache.c.Set(&redis_cache.Item{
|
||||||
Ctx: cache.ctx,
|
Ctx: cache.ctx,
|
||||||
|
|
|
||||||
|
|
@ -10,12 +10,14 @@ import (
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// AbiManager handles an ABI cache that fetches the ABI from an API on cache miss.
|
||||||
type AbiManager struct {
|
type AbiManager struct {
|
||||||
cache *Cache
|
cache *Cache
|
||||||
api *eos.API
|
api *eos.API
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create a new ABI Manager
|
||||||
func NewAbiManager(rdb *redis.Client, api *eos.API, id string) *AbiManager {
|
func NewAbiManager(rdb *redis.Client, api *eos.API, id string) *AbiManager {
|
||||||
// Init abi cache
|
// Init abi cache
|
||||||
cache := NewCache("thalos::cache::"+id+"::abi", &redis_cache.Options{
|
cache := NewCache("thalos::cache::"+id+"::abi", &redis_cache.Options{
|
||||||
|
|
@ -36,6 +38,8 @@ func (mgr *AbiManager) SetAbi(account eos.AccountName, abi *eos.ABI) error {
|
||||||
return mgr.cache.Set(string(account), abi, time.Hour)
|
return mgr.cache.Set(string(account), abi, time.Hour)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get an ABI from the cache, on cache miss it is fetched from the
|
||||||
|
// API, gets cached and then returned to the user
|
||||||
func (mgr *AbiManager) GetAbi(account eos.AccountName) (*eos.ABI, error) {
|
func (mgr *AbiManager) GetAbi(account eos.AccountName) (*eos.ABI, error) {
|
||||||
key := string(account)
|
key := string(account)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,8 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Rotating file represents a file that can be rotated when either the file
|
||||||
|
// becomes to large or to old, whatever comes first
|
||||||
type RotatingFile struct {
|
type RotatingFile struct {
|
||||||
fd *os.File
|
fd *os.File
|
||||||
size int64
|
size int64
|
||||||
|
|
@ -21,6 +23,7 @@ func open(filename string) (*os.File, error) {
|
||||||
return os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0o666)
|
return os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0o666)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Open a new rotating file.
|
||||||
func NewRotatingFile(filename string, maxSize int64, maxAge time.Duration) (*RotatingFile, error) {
|
func NewRotatingFile(filename string, maxSize int64, maxAge time.Duration) (*RotatingFile, error) {
|
||||||
if err := os.MkdirAll(path.Dir(filename), 0o766); err != nil && !os.IsExist(err) {
|
if err := os.MkdirAll(path.Dir(filename), 0o766); err != nil && !os.IsExist(err) {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -46,6 +49,7 @@ func NewRotatingFile(filename string, maxSize int64, maxAge time.Duration) (*Rot
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Open a new rotating file using a config struct.
|
||||||
func NewRotatingFileFromConfig(config Config, suffix string) (*RotatingFile, error) {
|
func NewRotatingFileFromConfig(config Config, suffix string) (*RotatingFile, error) {
|
||||||
if len(suffix) > 0 {
|
if len(suffix) > 0 {
|
||||||
suffix = "_" + suffix
|
suffix = "_" + suffix
|
||||||
|
|
@ -62,6 +66,7 @@ func (w *RotatingFile) newFilename(name string) string {
|
||||||
return fmt.Sprintf("%s-%s%s", name, time.Now().Format(w.format), ext)
|
return fmt.Sprintf("%s-%s%s", name, time.Now().Format(w.format), ext)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get the filename
|
||||||
func (w RotatingFile) GetFilename() string {
|
func (w RotatingFile) GetFilename() string {
|
||||||
return path.Base(w.fd.Name())
|
return path.Base(w.fd.Name())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,11 +7,19 @@ import (
|
||||||
"github.com/eosswedenorg/thalos/app/types"
|
"github.com/eosswedenorg/thalos/app/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Config represents configuration parameters for a log.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Filename string `yaml:"filename"`
|
// Filename where the log is stored.
|
||||||
Directory string `yaml:"directory"`
|
Filename string `yaml:"filename"`
|
||||||
MaxFileSize types.Size `yaml:"maxfilesize"`
|
|
||||||
MaxTime time.Duration `yaml:"maxtime"`
|
// Directory where the log files are stored.
|
||||||
|
Directory string `yaml:"directory"`
|
||||||
|
|
||||||
|
// Maximum filesize, the log is rotated when this size is exceeded.
|
||||||
|
MaxFileSize types.Size `yaml:"maxfilesize"`
|
||||||
|
|
||||||
|
// Maximum lifetime of the file before it is rotated.
|
||||||
|
MaxTime time.Duration `yaml:"maxtime"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c Config) GetFilename() string {
|
func (c Config) GetFilename() string {
|
||||||
|
|
|
||||||
|
|
@ -28,11 +28,20 @@ func logDecoratedEncoder(encoder message.Encoder) message.Encoder {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// A ShipProcessor will consume messages from a ship stream, convert the messages into
|
||||||
|
// thalos specfic ones, encode them and finally post them to an api.Writer
|
||||||
type ShipProcessor struct {
|
type ShipProcessor struct {
|
||||||
abi *abi.AbiManager
|
// The ship stream to process.
|
||||||
writer api.Writer
|
|
||||||
shipStream *shipclient.Stream
|
shipStream *shipclient.Stream
|
||||||
encode message.Encoder
|
|
||||||
|
// Abi manager used for cacheing
|
||||||
|
abi *abi.AbiManager
|
||||||
|
|
||||||
|
// Writer to send messages to.
|
||||||
|
writer api.Writer
|
||||||
|
|
||||||
|
// Encoder used to encode messages
|
||||||
|
encode message.Encoder
|
||||||
|
|
||||||
// Keep track of the current block we have processed.
|
// Keep track of the current block we have processed.
|
||||||
current_block uint32
|
current_block uint32
|
||||||
|
|
@ -41,6 +50,7 @@ type ShipProcessor struct {
|
||||||
syscontract eos.AccountName
|
syscontract eos.AccountName
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SpawnProcessor creates a new ShipProccessor that consumes the shipclient.Stream passed to it.
|
||||||
func SpawnProccessor(shipStream *shipclient.Stream, writer api.Writer, abi *abi.AbiManager, codec message.Codec) *ShipProcessor {
|
func SpawnProccessor(shipStream *shipclient.Stream, writer api.Writer, abi *abi.AbiManager, codec message.Codec) *ShipProcessor {
|
||||||
processor := &ShipProcessor{
|
processor := &ShipProcessor{
|
||||||
abi: abi,
|
abi: abi,
|
||||||
|
|
@ -84,6 +94,7 @@ func decode(abi *eos.ABI, act *ship.Action, v any) error {
|
||||||
return json.Unmarshal(jsondata, v)
|
return json.Unmarshal(jsondata, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// updateAbiFromAction updates the contract abi based on the ship.Action passed.
|
||||||
func (processor *ShipProcessor) updateAbiFromAction(act *ship.Action) error {
|
func (processor *ShipProcessor) updateAbiFromAction(act *ship.Action) error {
|
||||||
ABI, err := processor.abi.GetAbi(processor.syscontract)
|
ABI, err := processor.abi.GetAbi(processor.syscontract)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -111,10 +122,12 @@ func (processor *ShipProcessor) updateAbiFromAction(act *ship.Action) error {
|
||||||
return processor.abi.SetAbi(set_abi.Account, &contract_abi)
|
return processor.abi.SetAbi(set_abi.Account, &contract_abi)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get the current block.
|
||||||
func (processor *ShipProcessor) GetCurrentBlock() uint32 {
|
func (processor *ShipProcessor) GetCurrentBlock() uint32 {
|
||||||
return processor.current_block
|
return processor.current_block
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Callback function called by shipclient.Stream when a new block arrives.
|
||||||
func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) {
|
func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) {
|
||||||
processor.current_block = block.ThisBlock.BlockNum
|
processor.current_block = block.ThisBlock.BlockNum
|
||||||
|
|
||||||
|
|
@ -261,6 +274,7 @@ func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close closes the writer assciated with the processor.
|
||||||
func (processor *ShipProcessor) Close() error {
|
func (processor *ShipProcessor) Close() error {
|
||||||
return processor.writer.Close()
|
return processor.writer.Close()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,9 +6,10 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// Size is an alias of int64 that can handle sizes represented
|
// Size is an alias of int64 that can handle sizes represented
|
||||||
// in human readable strings like "200mb", "20 GB" etc
|
// in human readable strings like "200mb", "20 GB" etc.
|
||||||
|
|
||||||
type Size int64 // Size in bytes.
|
// The value is in bytes.
|
||||||
|
type Size int64
|
||||||
|
|
||||||
// Parse a string into number of bytes stored in a int64
|
// Parse a string into number of bytes stored in a int64
|
||||||
func (s *Size) Parse(value string) error {
|
func (s *Size) Parse(value string) error {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue