1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-06-18 04:40:03 +02:00
thalos/transport/redis_stream.go
2022-11-28 15:22:39 +01:00

57 lines
1.4 KiB
Go

package transport
import (
"fmt"
"strings"
"eosio-ship-trace-reader/redis"
)
type RedisStream struct {
name string
// Length of the stream, if items are added when the stream is full, old items will be evicted
// until the stream's length is equal to this value.
length int64
// map of namespaces and their indexes.
// each namespace is it's own stream.
indexes map[string]uint32
}
func NewRedisStream(name string, length int64) (RedisStream) {
return RedisStream{
name: name,
length: length,
indexes: make(map[string]uint32),
}
}
func (this RedisStream) Send(namespace string, id uint32, message interface{}) error {
stream := strings.Join([]string{"ship.stream", this.name, namespace}, ".")
index := this.nextIndex(namespace)
data := map[string]interface{}{
"block": id,
"data": message,
}
if err := redis.XAdd(stream, fmt.Sprintf("%d-%d", id, index), this.length, data).Err(); err != nil {
return fmt.Errorf("Failed to add to redis stream '%s': %s", stream, err)
}
return nil
}
func (this RedisStream) Commit() error {
// reset indexes on flush.
this.indexes = make(map[string]uint32)
return nil
}
func (this RedisStream) nextIndex(namespace string) uint32 {
idx := this.indexes[namespace]
this.indexes[namespace] = idx + 1
return idx
}