diff --git a/api/redis/subscriber.go b/api/redis/subscriber.go index 9a8e8fe..1a1c91a 100644 --- a/api/redis/subscriber.go +++ b/api/redis/subscriber.go @@ -11,8 +11,10 @@ import ( ) type Subscriber struct { - sub *redis.PubSub - ctx context.Context + sub *redis.PubSub + ctx context.Context + + // Mutex for channels map. mu sync.RWMutex timeout time.Duration channels map[string]chan []byte @@ -94,11 +96,15 @@ func (s *Subscriber) Read(channel api.Channel) ([]byte, error) { } func (s *Subscriber) Close() error { + // Close redis pubsub. err := s.sub.Close() + // Close all go channels, this will make Read() unblock. for _, ch := range s.channels { close(ch) } + + // Clear the channel map of old channels. s.mu.Lock() s.channels = make(map[string]chan []byte) s.mu.Unlock()