diff --git a/api/redis/subscriber.go b/api/redis/subscriber.go index 1a1c91a..2387360 100644 --- a/api/redis/subscriber.go +++ b/api/redis/subscriber.go @@ -2,6 +2,7 @@ package redis import ( "context" + "io" "sync" "time" @@ -82,6 +83,10 @@ func (s *Subscriber) Read(channel api.Channel) ([]byte, error) { // Subscribe and insert it. err = s.sub.Subscribe(s.ctx, key) if err != nil { + // Closed redis client is considered an EOF. + if err == redis.ErrClosed { + err = io.EOF + } return nil, err } @@ -92,7 +97,12 @@ func (s *Subscriber) Read(channel api.Channel) ([]byte, error) { s.mu.Unlock() } - return <-ch, nil + data := <-ch + // Zero length data is considered an EOF + if len(data) == 0 { + return nil, io.EOF + } + return data, nil } func (s *Subscriber) Close() error {