diff --git a/kafka/kafka_sarama.go b/kafka/kafka_sarama.go
index a35c915..a111afa 100644
--- a/kafka/kafka_sarama.go
+++ b/kafka/kafka_sarama.go
@@ -3,10 +3,6 @@ package kafka
 import (
 	"context"
 	"log"
-	"os"
-	"os/signal"
-	"sync"
-	"syscall"
 
 	"github.com/IBM/sarama"
 	"github.com/reugn/go-streams"
@@ -15,16 +11,15 @@ import (
 
 // KafkaSource represents an Apache Kafka source connector.
 type KafkaSource struct {
-	consumer  sarama.ConsumerGroup
-	handler   sarama.ConsumerGroupHandler
-	topics    []string
-	out       chan any
-	ctx       context.Context
-	cancelCtx context.CancelFunc
-	wg        *sync.WaitGroup
+	consumer sarama.ConsumerGroup
+	handler  sarama.ConsumerGroupHandler
+	topics   []string
+	out      chan any
 }
 
-// NewKafkaSource returns a new KafkaSource instance.
+var _ streams.Source = (*KafkaSource)(nil)
+
+// NewKafkaSource returns a new KafkaSource connector.
 func NewKafkaSource(ctx context.Context, addrs []string, groupID string,
 	config *sarama.Config, topics ...string) (*KafkaSource, error) {
 	consumerGroup, err := sarama.NewConsumerGroup(addrs, groupID, config)
@@ -33,103 +28,84 @@ func NewKafkaSource(ctx context.Context, addrs []string, groupID string,
 	}
 
 	out := make(chan any)
-	cctx, cancel := context.WithCancel(ctx)
-
-	sink := &KafkaSource{
-		consumer:  consumerGroup,
-		handler:   &GroupHandler{make(chan struct{}), out},
-		topics:    topics,
-		out:       out,
-		ctx:       cctx,
-		cancelCtx: cancel,
-		wg:        &sync.WaitGroup{},
+	source := &KafkaSource{
+		consumer: consumerGroup,
+		handler:  &GroupHandler{make(chan struct{}), out},
+		topics:   topics,
+		out:      out,
 	}
+	go source.init(ctx)
 
-	go sink.init()
-	return sink, nil
+	return source, nil
 }
 
-func (ks *KafkaSource) claimLoop() {
-	ks.wg.Add(1)
-	defer func() {
-		ks.wg.Done()
-		log.Printf("Exiting Kafka claimLoop")
-	}()
+func (ks *KafkaSource) init(ctx context.Context) {
+loop:
 	for {
 		handler := ks.handler.(*GroupHandler)
-		// `Consume` should be called inside an infinite loop, when a
-		// server-side rebalance happens, the consumer session will need to be
-		// recreated to get the new claims
-		if err := ks.consumer.Consume(ks.ctx, ks.topics, handler); err != nil {
-			log.Printf("Kafka consumer.Consume failed with: %v", err)
+		// Consume is called inside an infinite loop, so that when a
+		// server-side rebalance happens, the consumer session will be
+		// recreated to get the new claims.
+		if err := ks.consumer.Consume(ctx, ks.topics, handler); err != nil {
+			log.Printf("Error in Consume: %s", err)
 		}
+		handler.ready = make(chan struct{})
 
 		select {
-		case <-ks.ctx.Done():
-			return
+		case <-ctx.Done():
+			break loop
 		default:
 		}
-
-		handler.ready = make(chan struct{})
-	}
-}
-
-// init starts the main loop
-func (ks *KafkaSource) init() {
-	sigchan := make(chan os.Signal, 1)
-	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
-	go ks.claimLoop()
-
-	select {
-	case <-sigchan:
-		ks.cancelCtx()
-	case <-ks.ctx.Done():
 	}
-
-	log.Printf("Closing Kafka consumer")
-	ks.wg.Wait()
+	log.Printf("Closing Kafka source connector")
 	close(ks.out)
-	ks.consumer.Close()
+	if err := ks.consumer.Close(); err != nil {
+		log.Printf("Error in Close: %s", err)
+	}
 }
 
-// Via streams data through the given flow
-func (ks *KafkaSource) Via(_flow streams.Flow) streams.Flow {
-	flow.DoStream(ks, _flow)
-	return _flow
+// Via streams data to a specified operator and returns it.
+func (ks *KafkaSource) Via(operator streams.Flow) streams.Flow {
+	flow.DoStream(ks, operator)
+	return operator
 }
 
-// Out returns an output channel for sending data
+// Out returns the output channel of the KafkaSource connector.
 func (ks *KafkaSource) Out() <-chan any {
 	return ks.out
 }
 
-// GroupHandler represents a Sarama consumer group handler
+// GroupHandler implements the [sarama.ConsumerGroupHandler] interface.
+// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
+// It also provides hooks for the consumer group session life-cycle and
+// allows for triggering logic before or after the consume loop(s).
 type GroupHandler struct {
 	ready chan struct{}
 	out   chan any
 }
 
-// Setup is run at the beginning of a new session, before ConsumeClaim
+// Setup is run at the beginning of a new session, before ConsumeClaim.
 func (handler *GroupHandler) Setup(sarama.ConsumerGroupSession) error {
-	// Mark the consumer as ready
+	// mark the consumer as ready
 	close(handler.ready)
 	return nil
 }
 
-// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
+// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
 func (handler *GroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
 	return nil
 }
 
 // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
-func (handler *GroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+func (handler *GroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
+	claim sarama.ConsumerGroupClaim) error {
 	for {
 		select {
 		case message := <-claim.Messages():
 			if message != nil {
 				log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s",
 					string(message.Value), message.Timestamp, message.Topic)
-				session.MarkMessage(message, "")
+				session.MarkMessage(message, "") // mark the message as consumed
 				handler.out <- message
 			}
 
@@ -146,7 +122,9 @@ type KafkaSink struct {
 	in       chan any
 }
 
-// NewKafkaSink returns a new KafkaSink instance.
+var _ streams.Sink = (*KafkaSink)(nil)
+
+// NewKafkaSink returns a new KafkaSink connector.
 func NewKafkaSink(addrs []string, config *sarama.Config, topic string) (*KafkaSink, error) {
 	producer, err := sarama.NewSyncProducer(addrs, config)
 	if err != nil {
@@ -158,48 +136,45 @@ func NewKafkaSink(addrs []string, config *sarama.Config, topic string) (*KafkaSi
 		topic: topic,
 		in:    make(chan any),
 	}
-
 	go sink.init()
+
 	return sink, nil
 }
 
-// init starts the main loop
 func (ks *KafkaSink) init() {
 	for msg := range ks.in {
 		var err error
-		switch m := msg.(type) {
+		switch message := msg.(type) {
 		case *sarama.ProducerMessage:
-			_, _, err = ks.producer.SendMessage(m)
-
+			_, _, err = ks.producer.SendMessage(message)
 		case *sarama.ConsumerMessage:
-			sMsg := &sarama.ProducerMessage{
+			producerMessage := &sarama.ProducerMessage{
 				Topic: ks.topic,
-				Key:   sarama.StringEncoder(m.Key),
-				Value: sarama.StringEncoder(m.Value),
+				Key:   sarama.StringEncoder(message.Key),
+				Value: sarama.StringEncoder(message.Value),
 			}
-			_, _, err = ks.producer.SendMessage(sMsg)
-
+			_, _, err = ks.producer.SendMessage(producerMessage)
 		case string:
-			sMsg := &sarama.ProducerMessage{
+			producerMessage := &sarama.ProducerMessage{
 				Topic: ks.topic,
-				Value: sarama.StringEncoder(m),
+				Value: sarama.StringEncoder(message),
 			}
-			_, _, err = ks.producer.SendMessage(sMsg)
-
+			_, _, err = ks.producer.SendMessage(producerMessage)
 		default:
-			log.Printf("Unsupported message type %v", m)
+			log.Printf("Unsupported message type: %T", message)
 		}
 
 		if err != nil {
 			log.Printf("Error processing Kafka message: %s", err)
 		}
 	}
-
-	log.Printf("Closing Kafka producer")
-	ks.producer.Close()
+	log.Printf("Closing Kafka sink connector")
+	if err := ks.producer.Close(); err != nil {
+		log.Printf("Error in Close: %s", err)
+	}
 }
 
-// In returns an input channel for receiving data
+// In returns the input channel of the KafkaSink connector.
 func (ks *KafkaSink) In() chan<- any {
 	return ks.in
}
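For context, a minimal usage sketch of the refactored Kafka connectors (not part of the diff): graceful shutdown is now driven by cancelling the context passed to NewKafkaSource, so an application that previously relied on the connector's built-in SIGINT/SIGTERM handling installs its own. Import paths assume the go-streams module layout implied by the diff; the broker address, group ID, and topic names are placeholders.

```go
// A minimal end-to-end sketch of the context-driven lifecycle.
package main

import (
	"context"
	"log"
	"os/signal"
	"strings"
	"syscall"

	"github.com/IBM/sarama"
	"github.com/reugn/go-streams/flow"
	"github.com/reugn/go-streams/kafka"
)

func main() {
	// signal handling is now the application's responsibility,
	// not the connector's
	ctx, stop := signal.NotifyContext(context.Background(),
		syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	config := sarama.NewConfig()
	config.Version = sarama.V2_0_0_0        // consumer groups need >= V0_10_2_0
	config.Producer.Return.Successes = true // required by the SyncProducer

	source, err := kafka.NewKafkaSource(ctx, []string{"localhost:9092"},
		"group-id", config, "input-topic")
	if err != nil {
		log.Fatal(err)
	}

	sink, err := kafka.NewKafkaSink([]string{"localhost:9092"}, config,
		"output-topic")
	if err != nil {
		log.Fatal(err)
	}

	// uppercase each claimed value; the sink republishes plain strings
	toUpper := flow.NewMap(func(msg *sarama.ConsumerMessage) string {
		return strings.ToUpper(string(msg.Value))
	}, 1)

	// on SIGINT/SIGTERM the context is cancelled, the source closes its
	// output channel, and the pipeline drains through to the sink
	source.Via(toUpper).To(sink)
}
```

Moving signal handling out of the connectors keeps the library free of process-global state and lets the application decide when the pipeline stops.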
diff --git a/nats/nats_jetstream.go b/nats/nats_jetstream.go
index 7402582..1e13c6a 100644
--- a/nats/nats_jetstream.go
+++ b/nats/nats_jetstream.go
@@ -53,7 +53,7 @@ func (config *JetStreamSourceConfig) validate() error {
 	return nil
 }
 
-// NewJetStreamSourceConfig returns a new JetStreamSourceConfig with default values.
+// NewJetStreamSourceConfig returns a new [JetStreamSourceConfig] with default values.
 func NewJetStreamSourceConfig(conn *nats.Conn, jetStreamContext nats.JetStreamContext,
 	subject string) *JetStreamSourceConfig {
 	return &JetStreamSourceConfig{
@@ -72,6 +72,8 @@ type JetStreamSource struct {
 	out chan any
 }
 
+var _ streams.Source = (*JetStreamSource)(nil)
+
 // NewJetStreamSource returns a new JetStreamSource connector.
 // A pull-based subscription is used to consume data from the subject.
 func NewJetStreamSource(ctx context.Context, config *JetStreamSourceConfig) (*JetStreamSource, error) {
@@ -90,8 +92,8 @@ func NewJetStreamSource(ctx context.Context, config *JetStreamSourceConfig) (*Je
 		subscription: subscription,
 		out:          make(chan any),
 	}
-
 	go jetStreamSource.init(ctx)
+
 	return jetStreamSource, nil
 }
 
@@ -184,6 +186,8 @@ type JetStreamSink struct {
 	in chan any
 }
 
+var _ streams.Sink = (*JetStreamSink)(nil)
+
 // NewJetStreamSink returns a new JetStreamSink connector.
 // The stream for the configured subject is expected to exist.
 func NewJetStreamSink(config *JetStreamSinkConfig) (*JetStreamSink, error) {
@@ -195,8 +199,8 @@ func NewJetStreamSink(config *JetStreamSinkConfig) (*JetStreamSink, error) {
 		config: config,
 		in:     make(chan any),
 	}
-
 	go jetStreamSink.init()
+
 	return jetStreamSink, nil
 }
 
@@ -210,16 +214,15 @@ func (js *JetStreamSink) init() {
 			js.config.Subject,
 			message.Data,
 			js.config.PubOpts...)
-
 		case []byte:
 			_, err = js.config.JetStreamCtx.Publish(
 				js.config.Subject,
 				message,
 				js.config.PubOpts...)
-
 		default:
 			log.Printf("Unsupported message type: %T", message)
 		}
+
 		if err != nil {
 			log.Printf("Error processing JetStream message: %s", err)
 		}
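A similar sketch for the pull-based JetStream source (not part of the diff). It assumes the stream backing the subject already exists, as the NewJetStreamSink comment requires for the sink, and that the source emits *nats.Msg values; the server URL and subject are placeholders.

```go
// Consume from a JetStream subject until the context expires.
package main

import (
	"context"
	"log"
	"time"

	"github.com/nats-io/nats.go"
	connector "github.com/reugn/go-streams/nats"
)

func main() {
	// stop pulling messages after one minute
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	conn, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	js, err := conn.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	config := connector.NewJetStreamSourceConfig(conn, js, "events.created")
	source, err := connector.NewJetStreamSource(ctx, config)
	if err != nil {
		log.Fatal(err)
	}

	// the loop ends when the context expires and the connector
	// closes its output channel
	for msg := range source.Out() {
		log.Printf("received: %s", msg.(*nats.Msg).Data)
	}
}
```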
diff --git a/nats/nats_streaming.go b/nats/nats_streaming.go
index 53b63d5..096d492 100644
--- a/nats/nats_streaming.go
+++ b/nats/nats_streaming.go
@@ -3,10 +3,6 @@ package nats
 import (
 	"context"
 	"log"
-	"os"
-	"os/signal"
-	"sync"
-	"syscall"
 
 	stan "github.com/nats-io/stan.go"
 	"github.com/reugn/go-streams"
@@ -14,97 +10,62 @@ import (
 )
 
 // StreamingSource represents a NATS Streaming source connector.
-// Deprecated: Use JetStreamSource instead.
+// Deprecated: Use [JetStreamSource] instead.
 type StreamingSource struct {
 	conn             stan.Conn
 	subscriptions    []stan.Subscription
 	subscriptionType stan.SubscriptionOption
-
-	topics    []string
-	out       chan any
-	ctx       context.Context
-	cancelCtx context.CancelFunc
-	wg        *sync.WaitGroup
+	topics           []string
+	out              chan any
 }
 
+var _ streams.Source = (*StreamingSource)(nil)
+
 // NewStreamingSource returns a new StreamingSource connector.
-func NewStreamingSource(ctx context.Context, conn stan.Conn, subscriptionType stan.SubscriptionOption,
+func NewStreamingSource(ctx context.Context, conn stan.Conn,
+	subscriptionType stan.SubscriptionOption,
 	topics ...string) *StreamingSource {
-	cctx, cancel := context.WithCancel(ctx)
-
 	streamingSource := &StreamingSource{
 		conn:             conn,
 		subscriptions:    []stan.Subscription{},
 		subscriptionType: subscriptionType,
 		topics:           topics,
 		out:              make(chan any),
-		ctx:              cctx,
-		cancelCtx:        cancel,
-		wg:               &sync.WaitGroup{},
 	}
+	go streamingSource.init(ctx)
 
-	go streamingSource.init()
 	return streamingSource
 }
 
-// init starts the stream processing loop.
-func (ns *StreamingSource) init() {
-	sigchan := make(chan os.Signal, 1)
-	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
-
+func (ns *StreamingSource) init(ctx context.Context) {
 	// bind all topic subscribers
 	for _, topic := range ns.topics {
-		ns.wg.Add(1)
-		go func(t string) {
-			defer ns.wg.Done()
-			sub, err := ns.conn.Subscribe(t, func(msg *stan.Msg) {
-				ns.out <- msg
-			}, ns.subscriptionType)
-			if err != nil {
-				log.Fatal("Failed to subscribe to NATS cluster")
-			}
-
-			log.Printf("StreamingSource subscribed to topic %s", t)
-			ns.subscriptions = append(ns.subscriptions, sub)
-		}(topic)
+		sub, err := ns.conn.Subscribe(topic, func(msg *stan.Msg) {
+			ns.out <- msg
+		}, ns.subscriptionType)
+		if err != nil {
+			log.Printf("Failed to subscribe to topic %s: %s", topic, err)
+			continue
+		}
+		log.Printf("Subscribed to topic %s", topic)
+		ns.subscriptions = append(ns.subscriptions, sub)
 	}
 
-	// wait for an interrupt to unsubscribe topics
-	go ns.awaitCleanup()
-
-	select {
-	case <-sigchan:
-		log.Print("StreamingSource received termination signal, cleaning up...")
-		ns.cancelCtx()
-	case <-ns.ctx.Done():
-	}
+	<-ctx.Done()
 
-	ns.wg.Wait()
+	log.Printf("Closing NATS Streaming source connector")
 	close(ns.out)
+	ns.unsubscribe() // unbind all topic subscriptions
 
 	if err := ns.conn.Close(); err != nil {
-		log.Printf("Failed to close NATS Streaming connection: %s", err)
-	} else {
-		log.Print("NATS Streaming connection closed")
+		log.Printf("Error in Close: %s", err)
 	}
 }
 
-func (ns *StreamingSource) awaitCleanup() {
-	ns.wg.Add(1)
-	defer ns.wg.Done()
-
-	select {
-	case <-ns.ctx.Done():
-		for _, sub := range ns.subscriptions {
-			ns.wg.Add(1)
-			go func(sub stan.Subscription) {
-				defer ns.wg.Done()
-
-				if err := sub.Unsubscribe(); err != nil {
-					log.Fatal("Failed to remove NATS subscription")
-				}
-			}(sub)
+func (ns *StreamingSource) unsubscribe() {
+	for _, subscription := range ns.subscriptions {
+		if err := subscription.Unsubscribe(); err != nil {
+			log.Printf("Failed to remove NATS subscription: %s", err)
 		}
-	default:
 	}
 }
 
@@ -120,13 +81,15 @@ func (ns *StreamingSource) Out() <-chan any {
 }
 
 // StreamingSink represents a NATS Streaming sink connector.
-// Deprecated: Use JetStreamSink instead.
+// Deprecated: Use [JetStreamSink] instead.
 type StreamingSink struct {
 	conn  stan.Conn
 	topic string
 	in    chan any
 }
 
+var _ streams.Sink = (*StreamingSink)(nil)
+
 // NewStreamingSink returns a new StreamingSink connector.
 func NewStreamingSink(conn stan.Conn, topic string) *StreamingSink {
 	streamingSink := &StreamingSink{
@@ -134,34 +97,30 @@ func NewStreamingSink(conn stan.Conn, topic string) *StreamingSink {
 		topic: topic,
 		in:    make(chan any),
 	}
-
 	go streamingSink.init()
+
 	return streamingSink
 }
 
-// init starts the stream processing loop.
 func (ns *StreamingSink) init() {
 	for msg := range ns.in {
 		var err error
 		switch message := msg.(type) {
 		case *stan.Msg:
 			err = ns.conn.Publish(ns.topic, message.Data)
-
 		case []byte:
 			err = ns.conn.Publish(ns.topic, message)
-
 		default:
 			log.Printf("Unsupported message type: %T", message)
 		}
+
 		if err != nil {
 			log.Printf("Error processing NATS Streaming message: %s", err)
 		}
 	}
-
+	log.Printf("Closing NATS Streaming sink connector")
 	if err := ns.conn.Close(); err != nil {
-		log.Printf("Failed to close NATS Streaming connection: %s", err)
-	} else {
-		log.Print("NATS Streaming connection closed")
+		log.Printf("Error in Close: %s", err)
 	}
 }
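Although the NATS Streaming connectors are deprecated, the same shutdown model applies to them: cancelling the constructor's context replaces the old in-connector signal handling and triggers unsubscribe plus connection close. A hypothetical sketch (cluster ID, client ID, and topic names are placeholders):

```go
// Shutdown-model sketch for the deprecated StreamingSource.
package main

import (
	"context"
	"log"
	"time"

	stan "github.com/nats-io/stan.go"
	connector "github.com/reugn/go-streams/nats"
)

func main() {
	conn, err := stan.Connect("test-cluster", "client-1")
	if err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	source := connector.NewStreamingSource(ctx, conn,
		stan.DeliverAllAvailable(), "topic-a", "topic-b")

	// simulate an external shutdown trigger; once the context is
	// cancelled, init unsubscribes the topics and closes the connection
	go func() {
		time.Sleep(30 * time.Second)
		cancel()
	}()

	for msg := range source.Out() {
		log.Printf("received: %s", msg.(*stan.Msg).Data)
	}
}
```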
diff --git a/pulsar/pulsar.go b/pulsar/pulsar.go
index 0388bda..6e01532 100644
--- a/pulsar/pulsar.go
+++ b/pulsar/pulsar.go
@@ -3,9 +3,6 @@ package pulsar
 import (
 	"context"
 	"log"
-	"os"
-	"os/signal"
-	"syscall"
 
 	"github.com/apache/pulsar-client-go/pulsar"
 	"github.com/reugn/go-streams"
@@ -17,10 +14,11 @@ type PulsarSource struct {
 	client   pulsar.Client
 	consumer pulsar.Consumer
 	out      chan any
-	ctx      context.Context
 }
 
-// NewPulsarSource returns a new PulsarSource instance.
+var _ streams.Source = (*PulsarSource)(nil)
+
+// NewPulsarSource returns a new PulsarSource connector.
 func NewPulsarSource(ctx context.Context, clientOptions *pulsar.ClientOptions,
 	consumerOptions *pulsar.ConsumerOptions) (*PulsarSource, error) {
 	client, err := pulsar.NewClient(*clientOptions)
@@ -37,50 +35,41 @@ func NewPulsarSource(ctx context.Context, clientOptions *pulsar.ClientOptions,
 		client:   client,
 		consumer: consumer,
 		out:      make(chan any),
-		ctx:      ctx,
 	}
+	go source.init(ctx)
 
-	go source.init()
 	return source, nil
 }
 
-// init starts the main loop
-func (ps *PulsarSource) init() {
-	sigchan := make(chan os.Signal, 1)
-	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
-
+func (ps *PulsarSource) init(ctx context.Context) {
 loop:
 	for {
 		select {
-		case <-sigchan:
+		case <-ctx.Done():
 			break loop
-
-		case <-ps.ctx.Done():
-			break loop
-
 		default:
-			msg, err := ps.consumer.Receive(ps.ctx)
-			if err == nil {
-				ps.out <- msg
-			} else {
-				log.Println(err)
+			// this call blocks until a message is available
+			msg, err := ps.consumer.Receive(ctx)
+			if err != nil {
+				log.Printf("Error in Receive: %s", err)
+				continue
 			}
+			ps.out <- msg
 		}
 	}
-
-	log.Printf("Closing Pulsar consumer")
+	log.Printf("Closing Pulsar source connector")
 	close(ps.out)
 	ps.consumer.Close()
 	ps.client.Close()
 }
 
-// Via streams data through the given flow
-func (ps *PulsarSource) Via(_flow streams.Flow) streams.Flow {
-	flow.DoStream(ps, _flow)
-	return _flow
+// Via streams data to a specified operator and returns it.
+func (ps *PulsarSource) Via(operator streams.Flow) streams.Flow {
+	flow.DoStream(ps, operator)
+	return operator
 }
 
-// Out returns an output channel for sending data
+// Out returns the output channel of the PulsarSource connector.
 func (ps *PulsarSource) Out() <-chan any {
 	return ps.out
 }
@@ -90,10 +79,11 @@ type PulsarSink struct {
 	client   pulsar.Client
 	producer pulsar.Producer
 	in       chan any
-	ctx      context.Context
 }
 
-// NewPulsarSink returns a new PulsarSink instance.
+var _ streams.Sink = (*PulsarSink)(nil)
+
+// NewPulsarSink returns a new PulsarSink connector.
 func NewPulsarSink(ctx context.Context, clientOptions *pulsar.ClientOptions,
 	producerOptions *pulsar.ProducerOptions) (*PulsarSink, error) {
 	client, err := pulsar.NewClient(*clientOptions)
@@ -110,43 +100,38 @@ func NewPulsarSink(ctx context.Context, clientOptions *pulsar.ClientOptions,
 		client:   client,
 		producer: producer,
 		in:       make(chan any),
-		ctx:      ctx,
 	}
+	go sink.init(ctx)
 
-	go sink.init()
 	return sink, nil
 }
 
-// init starts the main loop
-func (ps *PulsarSink) init() {
+func (ps *PulsarSink) init(ctx context.Context) {
 	for msg := range ps.in {
 		var err error
-		switch m := msg.(type) {
+		switch message := msg.(type) {
 		case pulsar.Message:
-			_, err = ps.producer.Send(ps.ctx, &pulsar.ProducerMessage{
-				Payload: m.Payload(),
+			_, err = ps.producer.Send(ctx, &pulsar.ProducerMessage{
+				Payload: message.Payload(),
 			})
-
 		case string:
-			_, err = ps.producer.Send(ps.ctx, &pulsar.ProducerMessage{
-				Payload: []byte(m),
+			_, err = ps.producer.Send(ctx, &pulsar.ProducerMessage{
+				Payload: []byte(message),
 			})
-
 		default:
-			log.Printf("Unsupported message type %v", m)
+			log.Printf("Unsupported message type: %T", message)
 		}
 
 		if err != nil {
 			log.Printf("Error processing Pulsar message: %s", err)
 		}
 	}
-
-	log.Printf("Closing Pulsar producer")
+	log.Printf("Closing Pulsar sink connector")
 	ps.producer.Close()
 	ps.client.Close()
 }
 
-// In returns an input channel for receiving data
+// In returns the input channel of the PulsarSink connector.
 func (ps *PulsarSink) In() chan<- any {
 	return ps.in
 }
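Finally, a sketch wiring the refactored Pulsar connectors, whose receive and send calls now use the constructor-supplied context instead of a stored one; the service URL, topic names, and subscription name are placeholders.

```go
// Forward messages from one Pulsar topic to another.
package main

import (
	"context"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
	"github.com/reugn/go-streams/flow"
	connector "github.com/reugn/go-streams/pulsar"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	clientOptions := &pulsar.ClientOptions{URL: "pulsar://localhost:6650"}

	source, err := connector.NewPulsarSource(ctx, clientOptions,
		&pulsar.ConsumerOptions{
			Topic:            "input-topic",
			SubscriptionName: "subscription",
		})
	if err != nil {
		log.Fatal(err)
	}

	sink, err := connector.NewPulsarSink(ctx, clientOptions,
		&pulsar.ProducerOptions{Topic: "output-topic"})
	if err != nil {
		log.Fatal(err)
	}

	// forward messages unchanged; cancelling ctx stops the source loop,
	// which drains the pipeline and closes the producers and clients
	source.Via(flow.NewPassThrough()).To(sink)
}
```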