mirror of
https://github.com/eosswedenorg/thalos
synced 2026-06-16 04:24:56 +02:00
typo fixes and added comments
This commit is contained in:
parent
48ca5e4f92
commit
c769b6a700
7 changed files with 11 additions and 9 deletions
|
|
@ -11,7 +11,7 @@ import (
|
|||
|
||||
type handler func([]byte)
|
||||
|
||||
// Client reads and decodes messages from a reader and posts thems to a go channel
|
||||
// Client reads and decodes messages from a reader and posts them to a go channel
|
||||
type Client struct {
|
||||
reader Reader
|
||||
decoder message.Decoder
|
||||
|
|
@ -35,6 +35,7 @@ func (c *Client) Channel() <-chan any {
|
|||
return c.channel
|
||||
}
|
||||
|
||||
// Helper method to post a message to a channel with timeout.
|
||||
func (c *Client) post(msg any) {
|
||||
select {
|
||||
case <-time.After(time.Second):
|
||||
|
|
@ -46,7 +47,7 @@ func (c *Client) worker(channel Channel, h handler) {
|
|||
for {
|
||||
payload, err := c.reader.Read(channel)
|
||||
if err != nil {
|
||||
// Dont report EOF as an error because it is used
|
||||
// Don't report EOF as an error because it is used
|
||||
// by readers to signal an graceful end of input.
|
||||
if err != io.EOF {
|
||||
c.post(err)
|
||||
|
|
@ -59,7 +60,7 @@ func (c *Client) worker(channel Channel, h handler) {
|
|||
}
|
||||
|
||||
// Helper method to decode a message and post and error on the channel if it fails.
|
||||
// Returns true if successfull. false otherwise
|
||||
// Returns true if successful. False otherwise
|
||||
func (c *Client) decode(payload []byte, msg any) bool {
|
||||
if err := c.decoder(payload, msg); err != nil {
|
||||
c.post(err)
|
||||
|
|
@ -152,7 +153,7 @@ func (c *Client) Run() {
|
|||
|
||||
func (c *Client) Close() error {
|
||||
err := c.reader.Close()
|
||||
// Wait for all goroutines before closing channel.
|
||||
// Wait for all goroutines to finish before closing channel.
|
||||
c.wg.Wait()
|
||||
close(c.channel)
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ func createCodec() message.Codec {
|
|||
handle.MapType = reflect.TypeOf(map[string]any(nil))
|
||||
handle.Canonical = true
|
||||
|
||||
// Wierd name but this is needed for the newest version of msgpack
|
||||
// Weird name but this is needed for the newest version of msgpack
|
||||
// that has support for time and string datatypes etc.
|
||||
handle.WriteExt = true
|
||||
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ package api
|
|||
// This is a low-level interface typically implemented by backend drivers
|
||||
type Reader interface {
|
||||
// Read a message from a channel.
|
||||
// Read may block until a message is ready or an error occured.
|
||||
// Read may block until a message is ready or an error occurred.
|
||||
//
|
||||
// io.EOF is returned from a reader when there is no more data to be read.
|
||||
// If Read returns io.EOF all subsequent calls must also return io.EOF
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ const (
|
|||
//
|
||||
// Contains a prefix and chain_id to guard keys against collision.
|
||||
// Prefix should be sufficient to not collide with other application using the same redis database.
|
||||
// chain_id should be ok to not let multiple reader with different chains to write to the same channels.
|
||||
// chain_id should be fine to not let multiple reader with different chains to write to the same channels.
|
||||
|
||||
type Namespace struct {
|
||||
Prefix string
|
||||
|
|
|
|||
|
|
@ -48,7 +48,7 @@ func NewSubscriber(ctx context.Context, client *redis.Client, ns Namespace, opti
|
|||
return sub
|
||||
}
|
||||
|
||||
// worker reads messages from redis pubsub and forwards them to
|
||||
// worker reads messages from Redis pubsub and forwards them to
|
||||
// correct channels.
|
||||
func (s *Subscriber) worker() {
|
||||
for msg := range s.sub.Channel() {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue