Skip to content

Commit

Permalink
refactor(redis): minor connector improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed May 14, 2024
1 parent b235e52 commit 6a27b26
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 81 deletions.
77 changes: 39 additions & 38 deletions redis/redis_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,111 +16,112 @@ import (
// Rather, published messages are characterized into channels, without knowledge
// of what (if any) subscribers there may be.
type PubSubSource struct {
ctx context.Context
redisClient *redis.Client
channel string
out chan any
}

var _ streams.Source = (*PubSubSource)(nil)

// NewPubSubSource returns a new PubSubSource instance.
//
// The given redisClient is subscribed to the provided channel.
// The replies to subscription and unsubscribing operations are sent in the form of messages
// so that the client reads a coherent stream of messages where the first element
// indicates the type of message.
func NewPubSubSource(ctx context.Context, redisClient *redis.Client, channel string) (*PubSubSource, error) {
// The replies to subscription and unsubscribing operations are sent in the form
// of messages so that the client reads a coherent stream of messages where the
// first element indicates the type of message.
func NewPubSubSource(ctx context.Context, redisClient *redis.Client,
channel string) (*PubSubSource, error) {
pubsub := redisClient.Subscribe(ctx, channel)

// Wait for a confirmation that subscription is created before publishing anything
// wait for a confirmation that subscription is created before
// publishing anything
_, err := pubsub.Receive(ctx)
if err != nil {
return nil, err
}

source := &PubSubSource{
ctx: ctx,
redisClient: redisClient,
channel: channel,
out: make(chan any),
}
go source.init(ctx, pubsub.Channel())

go source.init(pubsub.Channel())
return source, nil
}

// init starts the main loop
func (ps *PubSubSource) init(ch <-chan *redis.Message) {
func (ps *PubSubSource) init(ctx context.Context, ch <-chan *redis.Message) {
loop:
for {
select {
case <-ps.ctx.Done():
case <-ctx.Done():
break loop

// route incoming messages downstream
case msg := <-ch:
ps.out <- msg
}
}

log.Printf("Closing Redis Pub/Sub consumer")
log.Printf("Closing Redis PubSubSource connector")
close(ps.out)
ps.redisClient.Close()
if err := ps.redisClient.Close(); err != nil {
log.Printf("Error in Close: %s", err)
}
}

// Via streams data through the given flow
func (ps *PubSubSource) Via(_flow streams.Flow) streams.Flow {
flow.DoStream(ps, _flow)
return _flow
// Via streams data to a specified operator and returns it.
func (ps *PubSubSource) Via(operator streams.Flow) streams.Flow {
flow.DoStream(ps, operator)
return operator
}

// Out returns an output channel for sending data
// Out returns the output channel of the PubSubSource connector.
func (ps *PubSubSource) Out() <-chan any {
return ps.out
}

// PubSubSink represents a Redis Pub/Sub sink connector.
type PubSubSink struct {
ctx context.Context
redisClient *redis.Client
channel string
in chan any
}

var _ streams.Sink = (*PubSubSink)(nil)

// NewPubSubSink returns a new PubSubSink instance.
//
// The incoming messages will be published to the given target channel using the
// provided redis.Client.
func NewPubSubSink(ctx context.Context, redisClient *redis.Client, channel string) *PubSubSink {
// The incoming messages will be published to the given target channel using
// the provided redis.Client.
func NewPubSubSink(ctx context.Context, redisClient *redis.Client,
channel string) *PubSubSink {
sink := &PubSubSink{
ctx: ctx,
redisClient: redisClient,
channel: channel,
in: make(chan any),
}
go sink.init(ctx)

go sink.init()
return sink
}

// init starts the main loop
func (ps *PubSubSink) init() {
func (ps *PubSubSink) init(ctx context.Context) {
for msg := range ps.in {
switch m := msg.(type) {
switch message := msg.(type) {
case string:
err := ps.redisClient.Publish(ps.ctx, ps.channel, m).Err()
if err != nil {
log.Printf("Error in redisClient.Publish: %s", err)
if err := ps.redisClient.Publish(ctx, ps.channel, message).Err(); err != nil {
log.Printf("Error in Publish: %s", err)
}

default:
log.Printf("Unsupported message type %v", m)
log.Printf("Unsupported message type: %T", message)
}
}

log.Printf("Closing Redis Pub/Sub producer")
ps.redisClient.Close()
log.Printf("Closing Redis PubSubSink connector")
if err := ps.redisClient.Close(); err != nil {
log.Printf("Error in Close: %s", err)
}
}

// In returns an input channel for receiving data
// In returns the input channel of the PubSubSink connector.
func (ps *PubSubSink) In() chan<- any {
return ps.in
}
91 changes: 48 additions & 43 deletions redis/redis_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package redis
import (
"context"
"log"
"strings"

"github.com/redis/go-redis/v9"
"github.com/reugn/go-streams"
Expand All @@ -16,13 +17,14 @@ import (
// append-only log. These include random access in O(1) time and complex
// consumption strategies, such as consumer groups.
type StreamSource struct {
ctx context.Context
redisClient *redis.Client
readGroupArgs *redis.XReadGroupArgs
groupCreateArgs *XGroupCreateArgs
out chan any
}

var _ streams.Source = (*StreamSource)(nil)

// XGroupCreateArgs represents the arguments for creating a consumer group.
//
// Use the special StartID "$" to fetch only the new elements arriving in the stream.
Expand All @@ -38,9 +40,11 @@ type XGroupCreateArgs struct {
// NewStreamSource returns a new StreamSource instance.
// Pass in nil for the groupCreateArgs parameter if the consumer group already exists.
func NewStreamSource(ctx context.Context, redisClient *redis.Client,
readGroupArgs *redis.XReadGroupArgs, groupCreateArgs *XGroupCreateArgs) (*StreamSource, error) {
readGroupArgs *redis.XReadGroupArgs, groupCreateArgs *XGroupCreateArgs,
) (*StreamSource, error) {
if groupCreateArgs != nil {
// Create a new consumer group uniquely identified by <group> for the stream stored at <stream>.
// Create a new consumer group uniquely identified by <group> for the stream
// stored at <stream>.
// By default, the XGROUP CREATE command expects that the target stream exists,
// and returns an error when it doesn't.
var err error
Expand All @@ -63,115 +67,116 @@ func NewStreamSource(ctx context.Context, redisClient *redis.Client,
}

source := &StreamSource{
ctx: ctx,
redisClient: redisClient,
readGroupArgs: readGroupArgs,
groupCreateArgs: groupCreateArgs,
out: make(chan any),
}
go source.init(ctx)

go source.init()
return source, nil
}

// init starts the main loop
func (rs *StreamSource) init() {
func (rs *StreamSource) init(ctx context.Context) {
loop:
for {
select {
case <-rs.ctx.Done():
case <-ctx.Done():
break loop

default:
// The XREADGROUP command is a special version of the XREAD command with
// support for consumer groups.
entries, err := rs.redisClient.XReadGroup(rs.ctx, rs.readGroupArgs).Result()
entries, err := rs.redisClient.XReadGroup(ctx, rs.readGroupArgs).Result()
if err != nil {
log.Printf("Error in redisClient.XReadGroup: %s", err)
log.Printf("Error in XReadGroup: %s", err)
if strings.HasPrefix(err.Error(), "NOGROUP") {
break loop
}
}

for _, e := range entries {
for _, msg := range e.Messages {
// route incoming messages downstream
for _, stream := range entries {
for _, msg := range stream.Messages {
rs.out <- &msg
}
}
}
}

log.Printf("Closing Redis stream consumer")
log.Printf("Closing Redis StreamSource connector")
close(rs.out)
rs.redisClient.Close()
if err := rs.redisClient.Close(); err != nil {
log.Printf("Error in Close: %s", err)
}
}

// Via streams data through the given flow
func (rs *StreamSource) Via(_flow streams.Flow) streams.Flow {
flow.DoStream(rs, _flow)
return _flow
// Via streams data to a specified operator and returns it.
func (rs *StreamSource) Via(operator streams.Flow) streams.Flow {
flow.DoStream(rs, operator)
return operator
}

// Out returns an output channel for sending data
// Out returns the output channel of the StreamSource connector.
func (rs *StreamSource) Out() <-chan any {
return rs.out
}

// StreamSink represents a Redis stream sink connector.
type StreamSink struct {
ctx context.Context
redisClient *redis.Client
stream string
in chan any
}

var _ streams.Sink = (*StreamSink)(nil)

// NewStreamSink returns a new StreamSink instance.
//
// The incoming messages will be streamed to the given target stream using the
// provided redis.Client.
func NewStreamSink(ctx context.Context, redisClient *redis.Client, stream string) *StreamSink {
func NewStreamSink(ctx context.Context, redisClient *redis.Client,
stream string) *StreamSink {
sink := &StreamSink{
ctx: ctx,
redisClient: redisClient,
stream: stream,
in: make(chan any),
}
go sink.init(ctx)

go sink.init()
return sink
}

// init starts the main loop
func (rs *StreamSink) init() {
func (rs *StreamSink) init(ctx context.Context) {
for msg := range rs.in {
switch m := msg.(type) {
switch message := msg.(type) {
case *redis.XMessage:
rs.xAdd(&redis.XAddArgs{
rs.xAdd(ctx, &redis.XAddArgs{
Stream: rs.stream, // use the target stream name
Values: m.Values,
Values: message.Values,
})
case map[string]any:
rs.xAdd(&redis.XAddArgs{
rs.xAdd(ctx, &redis.XAddArgs{
Stream: rs.stream,
Values: m,
Values: message,
})
default:
log.Printf("Unsupported message type %v", m)
log.Printf("Unsupported message type: %T", message)
}
}

log.Printf("Closing Redis stream producer")
rs.redisClient.Close()
log.Printf("Closing Redis StreamSink connector")
if err := rs.redisClient.Close(); err != nil {
log.Printf("Error in Close: %s", err)
}
}

// xAdd appends the message to the target stream
func (rs *StreamSink) xAdd(args *redis.XAddArgs) {
// xAdd appends the message to the target stream.
func (rs *StreamSink) xAdd(ctx context.Context, args *redis.XAddArgs) {
// Streams are an append-only data structure. The fundamental write
// command, called XADD, appends a new entry to the specified stream.
err := rs.redisClient.XAdd(rs.ctx, args).Err()
if err != nil {
log.Printf("Error in redisClient.XAdd: %s", err)
if err := rs.redisClient.XAdd(ctx, args).Err(); err != nil {
log.Printf("Error in XAdd: %s", err)
}
}

// In returns an input channel for receiving data
// In returns the input channel of the StreamSink connector.
func (rs *StreamSink) In() chan<- any {
return rs.in
}

0 comments on commit 6a27b26

Please sign in to comment.