diff --git a/redis/redis_pubsub.go b/redis/redis_pubsub.go index 8845a27..283e271 100644 --- a/redis/redis_pubsub.go +++ b/redis/redis_pubsub.go @@ -16,111 +16,112 @@ import ( // Rather, published messages are characterized into channels, without knowledge // of what (if any) subscribers there may be. type PubSubSource struct { - ctx context.Context redisClient *redis.Client channel string out chan any } +var _ streams.Source = (*PubSubSource)(nil) + // NewPubSubSource returns a new PubSubSource instance. // // The given redisClient is subscribed to the provided channel. -// The replies to subscription and unsubscribing operations are sent in the form of messages -// so that the client reads a coherent stream of messages where the first element -// indicates the type of message. -func NewPubSubSource(ctx context.Context, redisClient *redis.Client, channel string) (*PubSubSource, error) { +// The replies to subscription and unsubscribing operations are sent in the form +// of messages so that the client reads a coherent stream of messages where the +// first element indicates the type of message. +func NewPubSubSource(ctx context.Context, redisClient *redis.Client, + channel string) (*PubSubSource, error) { pubsub := redisClient.Subscribe(ctx, channel) - // Wait for a confirmation that subscription is created before publishing anything + // wait for a confirmation that subscription is created before + // publishing anything _, err := pubsub.Receive(ctx) if err != nil { return nil, err } source := &PubSubSource{ - ctx: ctx, redisClient: redisClient, channel: channel, out: make(chan any), } + go source.init(ctx, pubsub.Channel()) - go source.init(pubsub.Channel()) return source, nil } -// init starts the main loop -func (ps *PubSubSource) init(ch <-chan *redis.Message) { +func (ps *PubSubSource) init(ctx context.Context, ch <-chan *redis.Message) { loop: for { select { - case <-ps.ctx.Done(): + case <-ctx.Done(): break loop - + // route incoming messages downstream case msg := <-ch: ps.out <- msg } } - - log.Printf("Closing Redis Pub/Sub consumer") + log.Printf("Closing Redis PubSubSource connector") close(ps.out) - ps.redisClient.Close() + if err := ps.redisClient.Close(); err != nil { + log.Printf("Error in Close: %s", err) + } } -// Via streams data through the given flow -func (ps *PubSubSource) Via(_flow streams.Flow) streams.Flow { - flow.DoStream(ps, _flow) - return _flow +// Via streams data to a specified operator and returns it. +func (ps *PubSubSource) Via(operator streams.Flow) streams.Flow { + flow.DoStream(ps, operator) + return operator } -// Out returns an output channel for sending data +// Out returns the output channel of the PubSubSource connector. func (ps *PubSubSource) Out() <-chan any { return ps.out } // PubSubSink represents a Redis Pub/Sub sink connector. type PubSubSink struct { - ctx context.Context redisClient *redis.Client channel string in chan any } +var _ streams.Sink = (*PubSubSink)(nil) + // NewPubSubSink returns a new PubSubSink instance. // -// The incoming messages will be published to the given target channel using the -// provided redis.Client. -func NewPubSubSink(ctx context.Context, redisClient *redis.Client, channel string) *PubSubSink { +// The incoming messages will be published to the given target channel using +// the provided redis.Client. +func NewPubSubSink(ctx context.Context, redisClient *redis.Client, + channel string) *PubSubSink { sink := &PubSubSink{ - ctx: ctx, redisClient: redisClient, channel: channel, in: make(chan any), } + go sink.init(ctx) - go sink.init() return sink } -// init starts the main loop -func (ps *PubSubSink) init() { +func (ps *PubSubSink) init(ctx context.Context) { for msg := range ps.in { - switch m := msg.(type) { + switch message := msg.(type) { case string: - err := ps.redisClient.Publish(ps.ctx, ps.channel, m).Err() - if err != nil { - log.Printf("Error in redisClient.Publish: %s", err) + if err := ps.redisClient.Publish(ctx, ps.channel, message).Err(); err != nil { + log.Printf("Error in Publish: %s", err) } - default: - log.Printf("Unsupported message type %v", m) + log.Printf("Unsupported message type: %T", message) } } - - log.Printf("Closing Redis Pub/Sub producer") - ps.redisClient.Close() + log.Printf("Closing Redis PubSubSink connector") + if err := ps.redisClient.Close(); err != nil { + log.Printf("Error in Close: %s", err) + } } -// In returns an input channel for receiving data +// In returns the input channel of the PubSubSink connector. func (ps *PubSubSink) In() chan<- any { return ps.in } diff --git a/redis/redis_stream.go b/redis/redis_stream.go index 9124855..feba0b1 100644 --- a/redis/redis_stream.go +++ b/redis/redis_stream.go @@ -3,6 +3,7 @@ package redis import ( "context" "log" + "strings" "github.com/redis/go-redis/v9" "github.com/reugn/go-streams" @@ -16,13 +17,14 @@ import ( // append-only log. These include random access in O(1) time and complex // consumption strategies, such as consumer groups. type StreamSource struct { - ctx context.Context redisClient *redis.Client readGroupArgs *redis.XReadGroupArgs groupCreateArgs *XGroupCreateArgs out chan any } +var _ streams.Source = (*StreamSource)(nil) + // XGroupCreateArgs represents the arguments for creating a consumer group. // // Use the special StartID "$" to fetch only the new elements arriving in the stream. @@ -38,9 +40,11 @@ type XGroupCreateArgs struct { // NewStreamSource returns a new StreamSource instance. // Pass in nil for the groupCreateArgs parameter if the consumer group already exists. func NewStreamSource(ctx context.Context, redisClient *redis.Client, - readGroupArgs *redis.XReadGroupArgs, groupCreateArgs *XGroupCreateArgs) (*StreamSource, error) { + readGroupArgs *redis.XReadGroupArgs, groupCreateArgs *XGroupCreateArgs, +) (*StreamSource, error) { if groupCreateArgs != nil { - // Create a new consumer group uniquely identified by for the stream stored at . + // Create a new consumer group uniquely identified by for the stream + // stored at . // By default, the XGROUP CREATE command expects that the target stream exists, // and returns an error when it doesn't. var err error @@ -63,115 +67,116 @@ func NewStreamSource(ctx context.Context, redisClient *redis.Client, } source := &StreamSource{ - ctx: ctx, redisClient: redisClient, readGroupArgs: readGroupArgs, groupCreateArgs: groupCreateArgs, out: make(chan any), } + go source.init(ctx) - go source.init() return source, nil } -// init starts the main loop -func (rs *StreamSource) init() { +func (rs *StreamSource) init(ctx context.Context) { loop: for { select { - case <-rs.ctx.Done(): + case <-ctx.Done(): break loop - default: // The XREADGROUP command is a special version of the XREAD command with // support for consumer groups. - entries, err := rs.redisClient.XReadGroup(rs.ctx, rs.readGroupArgs).Result() + entries, err := rs.redisClient.XReadGroup(ctx, rs.readGroupArgs).Result() if err != nil { - log.Printf("Error in redisClient.XReadGroup: %s", err) + log.Printf("Error in XReadGroup: %s", err) + if strings.HasPrefix(err.Error(), "NOGROUP") { + break loop + } } - - for _, e := range entries { - for _, msg := range e.Messages { + // route incoming messages downstream + for _, stream := range entries { + for _, msg := range stream.Messages { rs.out <- &msg } } } } - - log.Printf("Closing Redis stream consumer") + log.Printf("Closing Redis StreamSource connector") close(rs.out) - rs.redisClient.Close() + if err := rs.redisClient.Close(); err != nil { + log.Printf("Error in Close: %s", err) + } } -// Via streams data through the given flow -func (rs *StreamSource) Via(_flow streams.Flow) streams.Flow { - flow.DoStream(rs, _flow) - return _flow +// Via streams data to a specified operator and returns it. +func (rs *StreamSource) Via(operator streams.Flow) streams.Flow { + flow.DoStream(rs, operator) + return operator } -// Out returns an output channel for sending data +// Out returns the output channel of the StreamSource connector. func (rs *StreamSource) Out() <-chan any { return rs.out } // StreamSink represents a Redis stream sink connector. type StreamSink struct { - ctx context.Context redisClient *redis.Client stream string in chan any } +var _ streams.Sink = (*StreamSink)(nil) + // NewStreamSink returns a new StreamSink instance. // // The incoming messages will be streamed to the given target stream using the // provided redis.Client. -func NewStreamSink(ctx context.Context, redisClient *redis.Client, stream string) *StreamSink { +func NewStreamSink(ctx context.Context, redisClient *redis.Client, + stream string) *StreamSink { sink := &StreamSink{ - ctx: ctx, redisClient: redisClient, stream: stream, in: make(chan any), } + go sink.init(ctx) - go sink.init() return sink } -// init starts the main loop -func (rs *StreamSink) init() { +func (rs *StreamSink) init(ctx context.Context) { for msg := range rs.in { - switch m := msg.(type) { + switch message := msg.(type) { case *redis.XMessage: - rs.xAdd(&redis.XAddArgs{ + rs.xAdd(ctx, &redis.XAddArgs{ Stream: rs.stream, // use the target stream name - Values: m.Values, + Values: message.Values, }) case map[string]any: - rs.xAdd(&redis.XAddArgs{ + rs.xAdd(ctx, &redis.XAddArgs{ Stream: rs.stream, - Values: m, + Values: message, }) default: - log.Printf("Unsupported message type %v", m) + log.Printf("Unsupported message type: %T", message) } } - - log.Printf("Closing Redis stream producer") - rs.redisClient.Close() + log.Printf("Closing Redis StreamSink connector") + if err := rs.redisClient.Close(); err != nil { + log.Printf("Error in Close: %s", err) + } } -// xAdd appends the message to the target stream -func (rs *StreamSink) xAdd(args *redis.XAddArgs) { +// xAdd appends the message to the target stream. +func (rs *StreamSink) xAdd(ctx context.Context, args *redis.XAddArgs) { // Streams are an append-only data structure. The fundamental write // command, called XADD, appends a new entry to the specified stream. - err := rs.redisClient.XAdd(rs.ctx, args).Err() - if err != nil { - log.Printf("Error in redisClient.XAdd: %s", err) + if err := rs.redisClient.XAdd(ctx, args).Err(); err != nil { + log.Printf("Error in XAdd: %s", err) } } -// In returns an input channel for receiving data +// In returns the input channel of the StreamSink connector. func (rs *StreamSink) In() chan<- any { return rs.in }