mirror of
https://github.com/eosswedenorg/thalos
synced 2026-07-02 11:43:40 +02:00
api/redis/subscriber.go: adding some comments.
This commit is contained in:
parent
35a9706954
commit
728b03422f
1 changed files with 8 additions and 2 deletions
|
|
@ -11,8 +11,10 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Subscriber struct {
|
type Subscriber struct {
|
||||||
sub *redis.PubSub
|
sub *redis.PubSub
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|
||||||
|
// Mutex for channels map.
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
channels map[string]chan []byte
|
channels map[string]chan []byte
|
||||||
|
|
@ -94,11 +96,15 @@ func (s *Subscriber) Read(channel api.Channel) ([]byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Subscriber) Close() error {
|
func (s *Subscriber) Close() error {
|
||||||
|
// Close redis pubsub.
|
||||||
err := s.sub.Close()
|
err := s.sub.Close()
|
||||||
|
|
||||||
|
// Close all go channels, this will make Read() unblock.
|
||||||
for _, ch := range s.channels {
|
for _, ch := range s.channels {
|
||||||
close(ch)
|
close(ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Clear the channel map of old channels.
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
s.channels = make(map[string]chan []byte)
|
s.channels = make(map[string]chan []byte)
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue