mirror of
https://github.com/eosswedenorg/thalos
synced 2026-07-04 12:03:41 +02:00
Minor fixes.
This commit is contained in:
parent
4c843f16bf
commit
aeac190c94
6 changed files with 17 additions and 18 deletions
|
|
@ -109,19 +109,19 @@ func (c *Client) hbHandler(payload []byte) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) sub(channel Channel) error {
|
func (c *Client) sub(channel Channel) error {
|
||||||
var handler handler
|
var h handler
|
||||||
|
|
||||||
switch channel.Type() {
|
switch channel.Type() {
|
||||||
case RollbackChannel.Type():
|
case RollbackChannel.Type():
|
||||||
handler = c.rollbackHandler
|
h = c.rollbackHandler
|
||||||
case TransactionChannel.Type():
|
case TransactionChannel.Type():
|
||||||
handler = c.transactionHandler
|
h = c.transactionHandler
|
||||||
case HeartbeatChannel.Type():
|
case HeartbeatChannel.Type():
|
||||||
handler = c.hbHandler
|
h = c.hbHandler
|
||||||
case ActionChannel{}.Channel().Type():
|
case ActionChannel{}.Channel().Type():
|
||||||
handler = c.actHandler
|
h = c.actHandler
|
||||||
case TableDeltaChannel{}.Channel().Type():
|
case TableDeltaChannel{}.Channel().Type():
|
||||||
handler = c.tableDeltaHandler
|
h = c.tableDeltaHandler
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("invalid channel type. %s", channel.Type())
|
return fmt.Errorf("invalid channel type. %s", channel.Type())
|
||||||
}
|
}
|
||||||
|
|
@ -130,7 +130,7 @@ func (c *Client) sub(channel Channel) error {
|
||||||
c.wg.Add(1)
|
c.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer c.wg.Done()
|
defer c.wg.Done()
|
||||||
c.worker(channel, handler)
|
c.worker(channel, h)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,6 @@ import (
|
||||||
api_redis "github.com/eosswedenorg/thalos/api/redis"
|
api_redis "github.com/eosswedenorg/thalos/api/redis"
|
||||||
"github.com/eosswedenorg/thalos/internal/abi"
|
"github.com/eosswedenorg/thalos/internal/abi"
|
||||||
"github.com/eosswedenorg/thalos/internal/cache"
|
"github.com/eosswedenorg/thalos/internal/cache"
|
||||||
. "github.com/eosswedenorg/thalos/internal/cache"
|
|
||||||
"github.com/eosswedenorg/thalos/internal/config"
|
"github.com/eosswedenorg/thalos/internal/config"
|
||||||
driver "github.com/eosswedenorg/thalos/internal/driver/redis"
|
driver "github.com/eosswedenorg/thalos/internal/driver/redis"
|
||||||
. "github.com/eosswedenorg/thalos/internal/log"
|
. "github.com/eosswedenorg/thalos/internal/log"
|
||||||
|
|
@ -157,7 +156,7 @@ func LogLevels() []string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func initAbiManager(api *eos.API, store cache.Store, chain_id string) *abi.AbiManager {
|
func initAbiManager(api *eos.API, store cache.Store, chain_id string) *abi.AbiManager {
|
||||||
cache := NewCache("thalos::cache::abi::"+chain_id, store)
|
cache := cache.NewCache("thalos::cache::abi::"+chain_id, store)
|
||||||
return abi.NewAbiManager(cache, api)
|
return abi.NewAbiManager(cache, api)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -373,14 +372,14 @@ func serverCmd(cmd *cobra.Command, args []string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Setup cache storage
|
// Setup cache storage
|
||||||
cacheStore := NewRedisStore(&redis_cache.Options{
|
cacheStore := cache.NewRedisStore(&redis_cache.Options{
|
||||||
Redis: rdb,
|
Redis: rdb,
|
||||||
// Cache 10k keys for 10 minutes.
|
// Cache 10k keys for 10 minutes.
|
||||||
LocalCache: redis_cache.NewTinyLFU(10000, 10*time.Minute),
|
LocalCache: redis_cache.NewTinyLFU(10000, 10*time.Minute),
|
||||||
})
|
})
|
||||||
|
|
||||||
// Setup general cache
|
// Setup general cache
|
||||||
cache := NewCache("thalos::cache::instance::"+conf.Name, cacheStore)
|
cache := cache.NewCache("thalos::cache::instance::"+conf.Name, cacheStore)
|
||||||
|
|
||||||
eosClient := eos.New(conf.Api)
|
eosClient := eos.New(conf.Api)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ var benchCmd = &cobra.Command{
|
||||||
Use: "bench",
|
Use: "bench",
|
||||||
Short: "Run a benchmark against a thalos node",
|
Short: "Run a benchmark against a thalos node",
|
||||||
Run: func(cmd *cobra.Command, args []string) {
|
Run: func(cmd *cobra.Command, args []string) {
|
||||||
var counter int = 0
|
counter := 0
|
||||||
interval, _ := cmd.Flags().GetDuration("interval")
|
interval, _ := cmd.Flags().GetDuration("interval")
|
||||||
|
|
||||||
url, _ := cmd.Flags().GetString("redis-url")
|
url, _ := cmd.Flags().GetString("redis-url")
|
||||||
|
|
|
||||||
|
|
@ -96,7 +96,7 @@ var RedisACLCmd = &cobra.Command{
|
||||||
Short: "create a users.acl file",
|
Short: "create a users.acl file",
|
||||||
Run: func(cmd *cobra.Command, args []string) {
|
Run: func(cmd *cobra.Command, args []string) {
|
||||||
var err error
|
var err error
|
||||||
var out *os.File = os.Stdout
|
out := os.Stdout
|
||||||
|
|
||||||
rnd = rand.New(rand.NewSource(time.Now().UnixNano()))
|
rnd = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -81,7 +81,7 @@ var validateCmd = &cobra.Command{
|
||||||
log.WithError(msg).Error("Error when reading stream")
|
log.WithError(msg).Error("Error when reading stream")
|
||||||
case message.ActionTrace:
|
case message.ActionTrace:
|
||||||
if block_num > 0 {
|
if block_num > 0 {
|
||||||
var diff int32 = int32(msg.BlockNum - block_num)
|
diff := int32(msg.BlockNum - block_num)
|
||||||
if diff < 0 || diff > 1 {
|
if diff < 0 || diff > 1 {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"current_block": block_num,
|
"current_block": block_num,
|
||||||
|
|
|
||||||
|
|
@ -120,7 +120,7 @@ func (processor *ShipProcessor) updateAbiFromAction(act *ship.Action) error {
|
||||||
Abi string
|
Abi string
|
||||||
Account eos.AccountName
|
Account eos.AccountName
|
||||||
}{}
|
}{}
|
||||||
if err := decode(ABI, act, &set_abi); err != nil {
|
if err = decode(ABI, act, &set_abi); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -184,7 +184,7 @@ func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) {
|
||||||
transaction := message.TransactionTrace{
|
transaction := message.TransactionTrace{
|
||||||
ID: trace.ID.String(),
|
ID: trace.ID.String(),
|
||||||
BlockNum: block.Block.BlockNumber(),
|
BlockNum: block.Block.BlockNumber(),
|
||||||
Timestamp: block.Block.Timestamp.Time.UTC(),
|
Timestamp: block.Block.Timestamp.UTC(),
|
||||||
Status: trace.Status.String(),
|
Status: trace.Status.String(),
|
||||||
CPUUsageUS: trace.CPUUsageUS,
|
CPUUsageUS: trace.CPUUsageUS,
|
||||||
NetUsage: trace.NetUsage,
|
NetUsage: trace.NetUsage,
|
||||||
|
|
@ -230,7 +230,7 @@ func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) {
|
||||||
act := message.ActionTrace{
|
act := message.ActionTrace{
|
||||||
TxID: trace.ID.String(),
|
TxID: trace.ID.String(),
|
||||||
BlockNum: block.Block.BlockNumber(),
|
BlockNum: block.Block.BlockNumber(),
|
||||||
Timestamp: block.Block.Timestamp.Time.UTC(),
|
Timestamp: block.Block.Timestamp.UTC(),
|
||||||
Name: act_trace.Act.Name.String(),
|
Name: act_trace.Act.Name.String(),
|
||||||
Contract: act_trace.Act.Account.String(),
|
Contract: act_trace.Act.Account.String(),
|
||||||
Receiver: act_trace.Receiver.String(),
|
Receiver: act_trace.Receiver.String(),
|
||||||
|
|
@ -331,7 +331,7 @@ func (processor *ShipProcessor) processBlock(block *ship.GetBlocksResultV0) {
|
||||||
|
|
||||||
message := message.TableDelta{
|
message := message.TableDelta{
|
||||||
BlockNum: block.Block.BlockNumber(),
|
BlockNum: block.Block.BlockNumber(),
|
||||||
Timestamp: block.Block.Timestamp.Time.UTC(),
|
Timestamp: block.Block.Timestamp.UTC(),
|
||||||
Name: delta.Name,
|
Name: delta.Name,
|
||||||
Rows: rows,
|
Rows: rows,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue