mirror of
https://github.com/eosswedenorg/thalos
synced 2026-06-19 04:50:02 +02:00
tools: cleanup and refactor to make code more readable.
This commit is contained in:
parent
ea5b2b8fc2
commit
8b8867394f
6 changed files with 341 additions and 328 deletions
|
|
@ -18,101 +18,108 @@ import (
|
|||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var benchCmd = &cobra.Command{
|
||||
Use: "bench",
|
||||
Short: "Run a benchmark against a thalos node",
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
counter := 0
|
||||
interval, _ := cmd.Flags().GetDuration("interval")
|
||||
func CreateBenchCmd() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "bench",
|
||||
Short: "Run a benchmark against a thalos node",
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
counter := 0
|
||||
interval, _ := cmd.Flags().GetDuration("interval")
|
||||
|
||||
url, _ := cmd.Flags().GetString("redis-url")
|
||||
user, _ := cmd.Flags().GetString("redis-user")
|
||||
pw, _ := cmd.Flags().GetString("redis-pw")
|
||||
prefix, _ := cmd.Flags().GetString("prefix")
|
||||
chain_id, _ := cmd.Flags().GetString("chain_id")
|
||||
db, _ := cmd.Flags().GetInt("redis-db")
|
||||
url, _ := cmd.Flags().GetString("redis-url")
|
||||
user, _ := cmd.Flags().GetString("redis-user")
|
||||
pw, _ := cmd.Flags().GetString("redis-pw")
|
||||
prefix, _ := cmd.Flags().GetString("prefix")
|
||||
chain_id, _ := cmd.Flags().GetString("chain_id")
|
||||
db, _ := cmd.Flags().GetInt("redis-db")
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"url": url,
|
||||
"prefix": prefix,
|
||||
"chain_id": chain_id,
|
||||
"database": db,
|
||||
}).Info("Connecting to redis")
|
||||
log.WithFields(log.Fields{
|
||||
"url": url,
|
||||
"prefix": prefix,
|
||||
"chain_id": chain_id,
|
||||
"database": db,
|
||||
}).Info("Connecting to redis")
|
||||
|
||||
// Create redis client
|
||||
rdb := redis.NewClient(&redis.Options{
|
||||
Addr: url,
|
||||
Username: user,
|
||||
Password: pw,
|
||||
DB: db,
|
||||
})
|
||||
// Create redis client
|
||||
rdb := redis.NewClient(&redis.Options{
|
||||
Addr: url,
|
||||
Username: user,
|
||||
Password: pw,
|
||||
DB: db,
|
||||
})
|
||||
|
||||
if err := rdb.Ping(context.Background()).Err(); err != nil {
|
||||
log.WithError(err).Fatal("Failed to connect to redis")
|
||||
return
|
||||
}
|
||||
if err := rdb.Ping(context.Background()).Err(); err != nil {
|
||||
log.WithError(err).Fatal("Failed to connect to redis")
|
||||
return
|
||||
}
|
||||
|
||||
log.Println("Connected to redis")
|
||||
log.Println("Connected to redis")
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"interval": interval,
|
||||
}).Info("Starting benchmark")
|
||||
log.WithFields(log.Fields{
|
||||
"interval": interval,
|
||||
}).Info("Starting benchmark")
|
||||
|
||||
sub := api_redis.NewSubscriber(context.Background(), rdb, api_redis.Namespace{
|
||||
Prefix: prefix,
|
||||
ChainID: chain_id,
|
||||
})
|
||||
sub := api_redis.NewSubscriber(context.Background(), rdb, api_redis.Namespace{
|
||||
Prefix: prefix,
|
||||
ChainID: chain_id,
|
||||
})
|
||||
|
||||
codec, err := message.GetCodec("json")
|
||||
if err != nil {
|
||||
log.WithError(err).Fatal("Failed to get codec")
|
||||
return
|
||||
}
|
||||
codec, err := message.GetCodec("json")
|
||||
if err != nil {
|
||||
log.WithError(err).Fatal("Failed to get codec")
|
||||
return
|
||||
}
|
||||
|
||||
client := api.NewClient(sub, codec.Decoder)
|
||||
client := api.NewClient(sub, codec.Decoder)
|
||||
|
||||
// Subscribe to all actions
|
||||
if err = client.Subscribe(api.ActionChannel{}.Channel()); err != nil {
|
||||
log.WithError(err).Fatal("Failed to subscribe to channels")
|
||||
return
|
||||
}
|
||||
// Subscribe to all actions
|
||||
if err = client.Subscribe(api.ActionChannel{}.Channel()); err != nil {
|
||||
log.WithError(err).Fatal("Failed to subscribe to channels")
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
for t := range client.Channel() {
|
||||
switch err := t.(type) {
|
||||
case message.ActionTrace:
|
||||
counter++
|
||||
case error:
|
||||
log.WithError(err).Error("Error when reading stream")
|
||||
go func() {
|
||||
for t := range client.Channel() {
|
||||
switch err := t.(type) {
|
||||
case message.ActionTrace:
|
||||
counter++
|
||||
case error:
|
||||
log.WithError(err).Error("Error when reading stream")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
t := time.Now()
|
||||
sig := make(chan os.Signal, 1)
|
||||
signal.Notify(sig, os.Interrupt)
|
||||
|
||||
// Read stuff.
|
||||
for {
|
||||
select {
|
||||
case <-sig:
|
||||
fmt.Println("Got interrupt")
|
||||
client.Close()
|
||||
return
|
||||
case now := <-time.After(interval):
|
||||
elapsed := now.Sub(t)
|
||||
t = now
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"num_messages": counter,
|
||||
"elapsed": elapsed,
|
||||
"msg_per_sec": float64(counter) / elapsed.Seconds(),
|
||||
"msg_per_ms": float64(counter) / float64(elapsed.Milliseconds()),
|
||||
"msg_per_min": float64(counter) / elapsed.Minutes(),
|
||||
}).Info("Benchmark results")
|
||||
|
||||
counter = 0
|
||||
}
|
||||
}
|
||||
}()
|
||||
},
|
||||
}
|
||||
|
||||
t := time.Now()
|
||||
sig := make(chan os.Signal, 1)
|
||||
signal.Notify(sig, os.Interrupt)
|
||||
cmd.Flags().AddFlagSet(RedisFlags())
|
||||
cmd.Flags().DurationP("interval", "i", time.Minute, "How often the benchmark results should be displayed.")
|
||||
|
||||
// Read stuff.
|
||||
for {
|
||||
select {
|
||||
case <-sig:
|
||||
fmt.Println("Got interrupt")
|
||||
client.Close()
|
||||
return
|
||||
case now := <-time.After(interval):
|
||||
elapsed := now.Sub(t)
|
||||
t = now
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"num_messages": counter,
|
||||
"elapsed": elapsed,
|
||||
"msg_per_sec": float64(counter) / elapsed.Seconds(),
|
||||
"msg_per_ms": float64(counter) / float64(elapsed.Milliseconds()),
|
||||
"msg_per_min": float64(counter) / elapsed.Minutes(),
|
||||
}).Info("Benchmark results")
|
||||
|
||||
counter = 0
|
||||
}
|
||||
}
|
||||
},
|
||||
return cmd
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue