Skip to content

Commit

Permalink
implement Redis stream connector (#80)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Aug 19, 2023
1 parent f53d032 commit b587a8c
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 3 deletions.
3 changes: 2 additions & 1 deletion examples/redis/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
redis
stream/stream
pubsub/pubsub
2 changes: 1 addition & 1 deletion examples/redis/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ version: '3'
services:
redis:
image: redis
container_name: pubsub
container_name: redis-streams
ports:
- 6379:6379
68 changes: 68 additions & 0 deletions examples/redis/stream/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package main

import (
"context"
"fmt"
"log"
"strings"
"time"

rs "github.com/reugn/go-streams/redis"

"github.com/redis/go-redis/v9"
"github.com/reugn/go-streams/flow"
)

// XADD stream1 * key1 a key2 b key3 c
// XLEN stream2
// XREAD COUNT 1 BLOCK 100 STREAMS stream2 0
func main() {
ctx, cancelFunc := context.WithCancel(context.Background())

timer := time.NewTimer(time.Minute * 30)
go func() {
<-timer.C
cancelFunc()
}()

config := &redis.Options{
Addr: "localhost:6379", // use default Addr
Password: "", // no password set
DB: 0, // use default DB
}

redisClient := redis.NewClient(config)

readGroupArgs := &redis.XReadGroupArgs{
Group: "group1",
Consumer: "consumer1",
Streams: []string{"stream1", ">"},
}
// groupCreateArgs := &rs.XGroupCreateArgs{
// Stream: "stream1",
// Group: "group1",
// StartID: "$",
// MkStream: true,
// }
source, err := rs.NewStreamSource(ctx, redisClient, readGroupArgs, nil)
if err != nil {
log.Fatal(err)
}

toUpperMapFlow := flow.NewMap(toUpper, 1)
sink := rs.NewStreamSink(ctx, redisClient, "stream2")

source.
Via(toUpperMapFlow).
To(sink)
}

var toUpper = func(msg *redis.XMessage) *redis.XMessage {
fmt.Printf("Got: %v\n", msg.Values)
values := make(map[string]interface{}, len(msg.Values))
for key, element := range msg.Values {
values[key] = strings.ToUpper(fmt.Sprintf("%v", element))
}
msg.Values = values
return msg
}
2 changes: 1 addition & 1 deletion redis/doc.go
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
// Package redis implements the Redis Pub/Sub connector.
// Package redis implements the Redis streaming connectors.
package redis
177 changes: 177 additions & 0 deletions redis/redis_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package redis

import (
"context"
"log"

"github.com/redis/go-redis/v9"
"github.com/reugn/go-streams"
"github.com/reugn/go-streams/flow"
)

// StreamSource represents a Redis stream source connector.
//
// A Redis stream is a data structure that acts like an append-only log but
// also implements several operations to overcome some of the limits of a typical
// append-only log. These include random access in O(1) time and complex
// consumption strategies, such as consumer groups.
type StreamSource struct {
ctx context.Context
redisClient *redis.Client
readGroupArgs *redis.XReadGroupArgs
groupCreateArgs *XGroupCreateArgs
out chan interface{}
}

// XGroupCreateArgs represents the arguments for creating a consumer group.
//
// Use the special StartID "$" to fetch only the new elements arriving in the stream.
// If instead you want the consumer to fetch the whole stream history,
// use zero ("0") as the starting ID for the consumer group.
type XGroupCreateArgs struct {
Stream string
Group string
StartID string
MkStream bool // set to true to create an empty stream automatically
}

// NewStreamSource returns a new StreamSource instance.
// Pass in nil for the groupCreateArgs parameter if the consumer group already exists.
func NewStreamSource(ctx context.Context, redisClient *redis.Client,
readGroupArgs *redis.XReadGroupArgs, groupCreateArgs *XGroupCreateArgs) (*StreamSource, error) {
if groupCreateArgs != nil {
// Create a new consumer group uniquely identified by <group> for the stream stored at <stream>.
// By default, the XGROUP CREATE command expects that the target stream exists,
// and returns an error when it doesn't.
var err error
if groupCreateArgs.MkStream {
err = redisClient.XGroupCreateMkStream(
ctx,
groupCreateArgs.Stream,
groupCreateArgs.Group,
groupCreateArgs.StartID).Err()
} else {
err = redisClient.XGroupCreate(
ctx,
groupCreateArgs.Stream,
groupCreateArgs.Group,
groupCreateArgs.StartID).Err()
}
if err != nil {
return nil, err
}
}

source := &StreamSource{
ctx: ctx,
redisClient: redisClient,
readGroupArgs: readGroupArgs,
groupCreateArgs: groupCreateArgs,
out: make(chan interface{}),
}

go source.init()
return source, nil
}

// init starts the main loop
func (rs *StreamSource) init() {
loop:
for {
select {
case <-rs.ctx.Done():
break loop

default:
// The XREADGROUP command is a special version of the XREAD command with
// support for consumer groups.
entries, err := rs.redisClient.XReadGroup(rs.ctx, rs.readGroupArgs).Result()
if err != nil {
log.Printf("Error in redisClient.XReadGroup: %s", err)
}

for _, e := range entries {
for _, msg := range e.Messages {
rs.out <- &msg
}
}
}
}

log.Printf("Closing Redis stream consumer")
close(rs.out)
rs.redisClient.Close()
}

// Via streams data through the given flow
func (rs *StreamSource) Via(_flow streams.Flow) streams.Flow {
flow.DoStream(rs, _flow)
return _flow
}

// Out returns an output channel for sending data
func (rs *StreamSource) Out() <-chan interface{} {
return rs.out
}

// StreamSink represents a Redis stream sink connector.
type StreamSink struct {
ctx context.Context
redisClient *redis.Client
stream string
in chan interface{}
}

// NewStreamSink returns a new StreamSink instance.
//
// The incoming messages will be streamed to the given target stream using the
// provided redis.Client.
func NewStreamSink(ctx context.Context, redisClient *redis.Client, stream string) *StreamSink {
sink := &StreamSink{
ctx: ctx,
redisClient: redisClient,
stream: stream,
in: make(chan interface{}),
}

go sink.init()
return sink
}

// init starts the main loop
func (rs *StreamSink) init() {
for msg := range rs.in {
switch m := msg.(type) {
case *redis.XMessage:
rs.xAdd(&redis.XAddArgs{
Stream: rs.stream, // use the target stream name
Values: m.Values,
})
case map[string]interface{}:
rs.xAdd(&redis.XAddArgs{
Stream: rs.stream,
Values: m,
})
default:
log.Printf("Unsupported message type %v", m)
}
}

log.Printf("Closing Redis stream producer")
rs.redisClient.Close()
}

// xAdd appends the message to the target stream
func (rs *StreamSink) xAdd(args *redis.XAddArgs) {
// Streams are an append-only data structure. The fundamental write
// command, called XADD, appends a new entry to the specified stream.
err := rs.redisClient.XAdd(rs.ctx, args).Err()
if err != nil {
log.Printf("Error in redisClient.XAdd: %s", err)
}
}

// In returns an input channel for receiving data
func (rs *StreamSink) In() chan<- interface{} {
return rs.in
}

0 comments on commit b587a8c

Please sign in to comment.