1
0
Fork 0
mirror of https://github.com/eosswedenorg/thalos synced 2026-07-04 12:03:41 +02:00

api/redis/subscriber.go: in Read() return io.EOF if redis connection is closed or channel returns a empty byte slice.

This commit is contained in:
Henrik Hautakoski 2024-02-07 17:26:24 +01:00
parent 816d405d31
commit b854c1dfa7

View file

@ -2,6 +2,7 @@ package redis
import ( import (
"context" "context"
"io"
"sync" "sync"
"time" "time"
@ -82,6 +83,10 @@ func (s *Subscriber) Read(channel api.Channel) ([]byte, error) {
// Subscribe and insert it. // Subscribe and insert it.
err = s.sub.Subscribe(s.ctx, key) err = s.sub.Subscribe(s.ctx, key)
if err != nil { if err != nil {
// Closed redis client is considered an EOF.
if err == redis.ErrClosed {
err = io.EOF
}
return nil, err return nil, err
} }
@ -92,7 +97,12 @@ func (s *Subscriber) Read(channel api.Channel) ([]byte, error) {
s.mu.Unlock() s.mu.Unlock()
} }
return <-ch, nil data := <-ch
// Zero length data is considered an EOF
if len(data) == 0 {
return nil, io.EOF
}
return data, nil
} }
func (s *Subscriber) Close() error { func (s *Subscriber) Close() error {