feat(connectors)!: remove signal.Notify from connectors (#131)
reugn authored May 13, 2024
1 parent 1a0fc9d commit b235e52
Showing 4 changed files with 134 additions and 212 deletions.
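
This is a breaking change: the connectors no longer install their own SIGINT/SIGTERM handler via signal.Notify, so shutdown is driven entirely by the context passed to the source constructor. Below is a minimal sketch of how a caller might now own signal handling itself; the broker address, group ID, topic names, and the flow.NewPassThrough stage are illustrative assumptions, not part of this commit.

```go
package main

import (
	"context"
	"log"
	"os/signal"
	"syscall"

	"github.com/IBM/sarama"
	"github.com/reugn/go-streams/flow"
	"github.com/reugn/go-streams/kafka"
)

func main() {
	// The connector no longer calls signal.Notify, so the caller decides
	// when to stop: canceling this context shuts the source down.
	ctx, stop := signal.NotifyContext(context.Background(),
		syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	config := sarama.NewConfig()
	config.Version = sarama.V2_1_0_0 // assumed broker version

	source, err := kafka.NewKafkaSource(ctx, []string{"localhost:9092"},
		"example-group", config, "topic-in")
	if err != nil {
		log.Fatal(err)
	}

	sink, err := kafka.NewKafkaSink([]string{"localhost:9092"}, config, "topic-out")
	if err != nil {
		log.Fatal(err)
	}

	// Pass records through unchanged; when the context is canceled the
	// source closes its output channel and the pipeline drains.
	source.Via(flow.NewPassThrough()).To(sink)
}
```

On Ctrl-C the source breaks out of its consume loop, closes its output channel, and closes the consumer group; the sink's init loop then ends and closes the producer, as the updated code below shows.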
149 changes: 62 additions & 87 deletions kafka/kafka_sarama.go
@@ -3,10 +3,6 @@ package kafka
 import (
     "context"
     "log"
-    "os"
-    "os/signal"
-    "sync"
-    "syscall"
 
     "github.com/IBM/sarama"
     "github.com/reugn/go-streams"
@@ -15,16 +11,15 @@ import (

 // KafkaSource represents an Apache Kafka source connector.
 type KafkaSource struct {
-    consumer  sarama.ConsumerGroup
-    handler   sarama.ConsumerGroupHandler
-    topics    []string
-    out       chan any
-    ctx       context.Context
-    cancelCtx context.CancelFunc
-    wg        *sync.WaitGroup
+    consumer sarama.ConsumerGroup
+    handler  sarama.ConsumerGroupHandler
+    topics   []string
+    out      chan any
 }
 
-// NewKafkaSource returns a new KafkaSource instance.
+var _ streams.Source = (*KafkaSource)(nil)
+
+// NewKafkaSource returns a new KafkaSource connector.
 func NewKafkaSource(ctx context.Context, addrs []string, groupID string,
     config *sarama.Config, topics ...string) (*KafkaSource, error) {
     consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, config)
@@ -33,103 +28,84 @@ func NewKafkaSource(ctx context.Context, addrs []string, groupID string,
 }
 
     out := make(chan any)
-    cctx, cancel := context.WithCancel(ctx)
 
-    sink := &KafkaSource{
-        consumer:  consumerGroup,
-        handler:   &GroupHandler{make(chan struct{}), out},
-        topics:    topics,
-        out:       out,
-        ctx:       cctx,
-        cancelCtx: cancel,
-        wg:        &sync.WaitGroup{},
+    source := &KafkaSource{
+        consumer: consumerGroup,
+        handler:  &GroupHandler{make(chan struct{}), out},
+        topics:   topics,
+        out:      out,
     }
+    go source.init(ctx)
 
-    go sink.init()
-    return sink, nil
+    return source, nil
 }
 
-func (ks *KafkaSource) claimLoop() {
-    ks.wg.Add(1)
-    defer func() {
-        ks.wg.Done()
-        log.Printf("Exiting Kafka claimLoop")
-    }()
+func (ks *KafkaSource) init(ctx context.Context) {
+loop:
     for {
         handler := ks.handler.(*GroupHandler)
-        // `Consume` should be called inside an infinite loop, when a
-        // server-side rebalance happens, the consumer session will need to be
-        // recreated to get the new claims
-        if err := ks.consumer.Consume(ks.ctx, ks.topics, handler); err != nil {
-            log.Printf("Kafka consumer.Consume failed with: %v", err)
+        // Consume is called inside an infinite loop, so that when a
+        // server-side rebalance happens, the consumer session will be
+        // recreated to get the new claims.
+        if err := ks.consumer.Consume(ctx, ks.topics, handler); err != nil {
+            log.Printf("Error is Consume: %s", err)
         }
+        handler.ready = make(chan struct{})
 
         select {
-        case <-ks.ctx.Done():
-            return
+        case <-ctx.Done():
+            break loop
         default:
         }
-
-        handler.ready = make(chan struct{})
     }
-}

-// init starts the main loop
-func (ks *KafkaSource) init() {
-    sigchan := make(chan os.Signal, 1)
-    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
-    go ks.claimLoop()
-
-    select {
-    case <-sigchan:
-        ks.cancelCtx()
-    case <-ks.ctx.Done():
-    }
-
-    log.Printf("Closing Kafka consumer")
-    ks.wg.Wait()
+    log.Printf("Closing Kafka source connector")
     close(ks.out)
-    ks.consumer.Close()
+    if err := ks.consumer.Close(); err != nil {
+        log.Printf("Error in Close: %s", err)
+    }
 }

-// Via streams data through the given flow
-func (ks *KafkaSource) Via(_flow streams.Flow) streams.Flow {
-    flow.DoStream(ks, _flow)
-    return _flow
+// Via streams data to a specified operator and returns it.
+func (ks *KafkaSource) Via(operator streams.Flow) streams.Flow {
+    flow.DoStream(ks, operator)
+    return operator
 }
 
-// Out returns an output channel for sending data
+// Out returns the output channel of the KafkaSource connector.
 func (ks *KafkaSource) Out() <-chan any {
     return ks.out
 }
 
-// GroupHandler represents a Sarama consumer group handler
+// GroupHandler implements the [sarama.ConsumerGroupHandler] interface.
+// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
+// It also provides hooks for the consumer group session life-cycle and
+// allows for triggering logic before or after the consume loop(s).
 type GroupHandler struct {
     ready chan struct{}
     out   chan any
 }
 
-// Setup is run at the beginning of a new session, before ConsumeClaim
+// Setup is run at the beginning of a new session, before ConsumeClaim.
 func (handler *GroupHandler) Setup(sarama.ConsumerGroupSession) error {
-    // Mark the consumer as ready
+    // mark the consumer as ready
     close(handler.ready)
     return nil
 }
 
-// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
+// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
 func (handler *GroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
     return nil
 }
 
 // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
-func (handler *GroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+func (handler *GroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
+    claim sarama.ConsumerGroupClaim) error {
     for {
         select {
         case message := <-claim.Messages():
             if message != nil {
-                log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s",
-                    string(message.Value), message.Timestamp, message.Topic)
-                session.MarkMessage(message, "")
+                session.MarkMessage(message, "") // mark the message as consumed
                 handler.out <- message
             }
 
@@ -146,7 +122,9 @@ type KafkaSink struct {
     in       chan any
 }
 
-// NewKafkaSink returns a new KafkaSink instance.
+var _ streams.Sink = (*KafkaSink)(nil)
+
+// NewKafkaSink returns a new KafkaSink connector.
 func NewKafkaSink(addrs []string, config *sarama.Config, topic string) (*KafkaSink, error) {
     producer, err := sarama.NewSyncProducer(addrs, config)
     if err != nil {
@@ -158,48 +136,45 @@ func NewKafkaSink(addrs []string, config *sarama.Config, topic string) (*KafkaSi
         topic:    topic,
         in:       make(chan any),
     }
-
     go sink.init()
-
     return sink, nil
 }
 
 // init starts the main loop
 func (ks *KafkaSink) init() {
     for msg := range ks.in {
         var err error
-        switch m := msg.(type) {
+        switch message := msg.(type) {
         case *sarama.ProducerMessage:
-            _, _, err = ks.producer.SendMessage(m)
-
+            _, _, err = ks.producer.SendMessage(message)
         case *sarama.ConsumerMessage:
-            sMsg := &sarama.ProducerMessage{
+            producerMessage := &sarama.ProducerMessage{
                 Topic: ks.topic,
-                Key:   sarama.StringEncoder(m.Key),
-                Value: sarama.StringEncoder(m.Value),
+                Key:   sarama.StringEncoder(message.Key),
+                Value: sarama.StringEncoder(message.Value),
             }
-            _, _, err = ks.producer.SendMessage(sMsg)
-
+            _, _, err = ks.producer.SendMessage(producerMessage)
         case string:
-            sMsg := &sarama.ProducerMessage{
+            producerMessage := &sarama.ProducerMessage{
                 Topic: ks.topic,
-                Value: sarama.StringEncoder(m),
+                Value: sarama.StringEncoder(message),
             }
-            _, _, err = ks.producer.SendMessage(sMsg)
-
+            _, _, err = ks.producer.SendMessage(producerMessage)
         default:
-            log.Printf("Unsupported message type %v", m)
+            log.Printf("Unsupported message type: %T", message)
         }
 
         if err != nil {
             log.Printf("Error processing Kafka message: %s", err)
         }
     }
 
-    log.Printf("Closing Kafka producer")
-    ks.producer.Close()
+    log.Printf("Closing Kafka sink connector")
+    if err := ks.producer.Close(); err != nil {
+        log.Printf("Error in Close: %s", err)
+    }
 }
 
-// In returns an input channel for receiving data
+// In returns the input channel of the KafkaSink connector.
 func (ks *KafkaSink) In() chan<- any {
     return ks.in
 }
13 changes: 8 additions & 5 deletions nats/nats_jetstream.go
@@ -53,7 +53,7 @@ func (config *JetStreamSourceConfig) validate() error {
     return nil
 }
 
-// NewJetStreamSourceConfig returns a new JetStreamSourceConfig with default values.
+// NewJetStreamSourceConfig returns a new [JetStreamSourceConfig] with default values.
 func NewJetStreamSourceConfig(conn *nats.Conn, jetStreamContext nats.JetStreamContext,
     subject string) *JetStreamSourceConfig {
     return &JetStreamSourceConfig{
@@ -72,6 +72,8 @@ type JetStreamSource struct {
     out chan any
 }
 
+var _ streams.Source = (*JetStreamSource)(nil)
+
 // NewJetStreamSource returns a new JetStreamSource connector.
 // A pull-based subscription is used to consume data from the subject.
 func NewJetStreamSource(ctx context.Context, config *JetStreamSourceConfig) (*JetStreamSource, error) {
@@ -90,8 +92,8 @@ func NewJetStreamSource(ctx context.Context, config *JetStreamSourceConfig) (*Je
         subscription: subscription,
         out:          make(chan any),
     }
-
     go jetStreamSource.init(ctx)
+
     return jetStreamSource, nil
 }
 
@@ -184,6 +186,8 @@ type JetStreamSink struct {
     in chan any
 }
 
+var _ streams.Sink = (*JetStreamSink)(nil)
+
 // NewJetStreamSink returns a new JetStreamSink connector.
 // The stream for the configured subject is expected to exist.
 func NewJetStreamSink(config *JetStreamSinkConfig) (*JetStreamSink, error) {
@@ -195,8 +199,8 @@ func NewJetStreamSink(config *JetStreamSinkConfig) (*JetStreamSink, error) {
         config: config,
         in:     make(chan any),
     }
-
     go jetStreamSink.init()
+
     return jetStreamSink, nil
 }
 
@@ -210,16 +214,15 @@ func (js *JetStreamSink) init() {
                 js.config.Subject,
                 message.Data,
                 js.config.PubOpts...)
-
         case []byte:
             _, err = js.config.JetStreamCtx.Publish(
                 js.config.Subject,
                 message,
                 js.config.PubOpts...)
-
         default:
             log.Printf("Unsupported message type: %T", message)
         }
+
         if err != nil {
             log.Printf("Error processing JetStream message: %s", err)
         }
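
The JetStream connectors follow the same pattern: NewJetStreamSource takes a caller-supplied context and no longer reacts to OS signals on its own. A rough usage sketch, assuming a server at nats.DefaultURL, an existing stream for the illustrative "events" subject, and the flow.NewPassThrough and extension.NewStdoutSink helpers from go-streams:

```go
package main

import (
	"context"
	"log"
	"os/signal"
	"syscall"

	"github.com/nats-io/nats.go"
	"github.com/reugn/go-streams/extension"
	"github.com/reugn/go-streams/flow"
	connector "github.com/reugn/go-streams/nats"
)

func main() {
	// As with the Kafka connector, cancellation is owned by the caller.
	ctx, stop := signal.NotifyContext(context.Background(),
		syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	conn, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	js, err := conn.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Pull-based source for the "events" subject (illustrative name).
	config := connector.NewJetStreamSourceConfig(conn, js, "events")
	source, err := connector.NewJetStreamSource(ctx, config)
	if err != nil {
		log.Fatal(err)
	}

	// Print each received message; canceling ctx ends the stream.
	source.Via(flow.NewPassThrough()).To(extension.NewStdoutSink())
}
```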