diff --git a/api/redis/subscriber.go b/api/redis/subscriber.go index 2387360..07218ba 100644 --- a/api/redis/subscriber.go +++ b/api/redis/subscriber.go @@ -48,16 +48,6 @@ func NewSubscriber(ctx context.Context, client *redis.Client, ns Namespace, opti return sub } -// forward forwards a message to the channel. -// as writes to a unbuffered channel will block until it's read. -// We run select on it and discard the message if no read happends during timeout -func forward(msg redis.Message, ch chan<- []byte, timeout time.Duration) { - select { - case <-time.After(timeout): - case ch <- []byte(msg.Payload): - } -} - // worker reads messages from redis pubsub and forwards them to // correct channels. func (s *Subscriber) worker() { @@ -65,7 +55,10 @@ func (s *Subscriber) worker() { // Route message to correct channel. s.mu.RLock() if ch, ok := s.channels[msg.Channel]; ok { - go forward(*msg, ch, s.timeout) + select { + case <-time.After(s.timeout): + case ch <- []byte(msg.Payload): + } } s.mu.RUnlock() }