mirror of
https://github.com/eosswedenorg/thalos
synced 2026-06-16 04:24:56 +02:00
api/redis/subscriber.go: in worker() no need to spawn a goroutine when sending to the channel.
This commit is contained in:
parent
b854c1dfa7
commit
d789b6a294
1 changed files with 4 additions and 11 deletions
|
|
@ -48,16 +48,6 @@ func NewSubscriber(ctx context.Context, client *redis.Client, ns Namespace, opti
|
|||
return sub
|
||||
}
|
||||
|
||||
// forward forwards a message to the channel.
|
||||
// as writes to a unbuffered channel will block until it's read.
|
||||
// We run select on it and discard the message if no read happends during timeout
|
||||
func forward(msg redis.Message, ch chan<- []byte, timeout time.Duration) {
|
||||
select {
|
||||
case <-time.After(timeout):
|
||||
case ch <- []byte(msg.Payload):
|
||||
}
|
||||
}
|
||||
|
||||
// worker reads messages from redis pubsub and forwards them to
|
||||
// correct channels.
|
||||
func (s *Subscriber) worker() {
|
||||
|
|
@ -65,7 +55,10 @@ func (s *Subscriber) worker() {
|
|||
// Route message to correct channel.
|
||||
s.mu.RLock()
|
||||
if ch, ok := s.channels[msg.Channel]; ok {
|
||||
go forward(*msg, ch, s.timeout)
|
||||
select {
|
||||
case <-time.After(s.timeout):
|
||||
case ch <- []byte(msg.Payload):
|
||||
}
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue