diff --git a/backend/runner/pubsub/consumer.go b/backend/runner/pubsub/consumer.go
index 09243af343..ea9d8638a6 100644
--- a/backend/runner/pubsub/consumer.go
+++ b/backend/runner/pubsub/consumer.go
@@ -27,6 +27,8 @@ type consumer struct {
 	verb           *schema.Verb
 	subscriber     *schema.MetadataSubscriber
 	retryParams    schema.RetryParams
+	group          sarama.ConsumerGroup
+	cancel         context.CancelFunc
 
 	verbClient     VerbClient
 	timelineClient *timeline.Client
@@ -40,11 +42,28 @@ func newConsumer(moduleName string, verb *schema.Verb, subscriber *schema.Metada
 		return nil, fmt.Errorf("subscription %s has no Kafka brokers", verb.Name)
 	}
 
+	config := sarama.NewConfig()
+	config.Consumer.Return.Errors = true
+	config.Consumer.Offsets.AutoCommit.Enable = true
+	switch subscriber.FromOffset {
+	case schema.FromOffsetBeginning, schema.FromOffsetUnspecified:
+		config.Consumer.Offsets.Initial = sarama.OffsetOldest
+	case schema.FromOffsetLatest:
+		config.Consumer.Offsets.Initial = sarama.OffsetNewest
+	}
+
+	groupID := kafkaConsumerGroupID(moduleName, verb)
+	group, err := sarama.NewConsumerGroup(verb.Runtime.Subscription.KafkaBrokers, groupID, config)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create consumer group for subscription %s: %w", verb.Name, err)
+	}
+
 	c := &consumer{
 		moduleName:     moduleName,
 		deployment:     deployment,
 		verb:           verb,
 		subscriber:     subscriber,
+		group:          group,
 
 		verbClient:     verbClient,
 		timelineClient: timelineClient,
@@ -73,51 +92,33 @@ func (c *consumer) kafkaTopicID() string {
 
 func (c *consumer) Begin(ctx context.Context) error {
 	// set up config
-	logger := log.FromContext(ctx).Scope("subscription:" + c.verb.Name)
+	logger := log.FromContext(ctx).AppendScope("sub:" + c.verb.Name)
 	ctx = log.ContextWithLogger(ctx, logger)
-	config := sarama.NewConfig()
-	config.Consumer.Return.Errors = true
-	config.Consumer.Offsets.AutoCommit.Enable = true
+	logger.Debugf("Subscribing to %s", c.kafkaTopicID())
 
-	var fromOffsetStr string
-	switch c.subscriber.FromOffset {
-	case schema.FromOffsetBeginning, schema.FromOffsetUnspecified:
-		config.Consumer.Offsets.Initial = sarama.OffsetOldest
-		fromOffsetStr = "beginning"
-	case schema.FromOffsetLatest:
-		config.Consumer.Offsets.Initial = sarama.OffsetNewest
-		fromOffsetStr = "latest"
-	}
-
-	groupID := kafkaConsumerGroupID(c.moduleName, c.verb)
-	logger.Debugf("Subscribing to %s from %s", c.kafkaTopicID(), fromOffsetStr)
-
-	group, err := sarama.NewConsumerGroup(c.verb.Runtime.Subscription.KafkaBrokers, groupID, config)
-	if err != nil {
-		return fmt.Errorf("failed to create consumer group for subscription %s: %w", c.verb.Name, err)
-	}
+	ctx, cancel := context.WithCancel(ctx)
+	c.cancel = cancel
 
-	go c.watchErrors(ctx, group)
-	go c.subscribe(ctx, group)
+	go c.watchErrors(ctx)
+	go c.subscribe(ctx)
 	return nil
 }
 
-func (c *consumer) watchErrors(ctx context.Context, group sarama.ConsumerGroup) {
+func (c *consumer) watchErrors(ctx context.Context) {
 	logger := log.FromContext(ctx)
 	for {
 		select {
 		case <-ctx.Done():
 			return
-		case err := <-group.Errors():
+		case err := <-c.group.Errors():
 			logger.Errorf(err, "Consumer group error")
 		}
 	}
 }
 
-func (c *consumer) subscribe(ctx context.Context, group sarama.ConsumerGroup) {
+func (c *consumer) subscribe(ctx context.Context) {
 	logger := log.FromContext(ctx)
 
-	defer group.Close()
 	// Iterate over consumer sessions.
 	//
 	// `Consume` should be called inside an infinite loop, when a server-side rebalance happens,
@@ -129,7 +130,7 @@ func (c *consumer) subscribe(ctx context.Context, group sarama.ConsumerGroup) {
 		default:
 		}
 
-		err := group.Consume(ctx, []string{c.kafkaTopicID()}, c)
+		err := c.group.Consume(ctx, []string{c.kafkaTopicID()}, c)
 		if err != nil {
 			logger.Errorf(err, "Session failed for %s", c.verb.Name)
 		} else {
@@ -159,31 +160,49 @@ func (c *consumer) Cleanup(session sarama.ConsumerGroupSession) error {
 func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 	ctx := session.Context()
 	logger := log.FromContext(ctx)
-	for msg := range claim.Messages() {
-		logger.Debugf("Consuming message with partition %v and offset %v)", msg.Partition, msg.Offset)
-		remainingRetries := c.retryParams.Count
-		backoff := c.retryParams.MinBackoff
-		for {
-			err := c.call(ctx, msg.Value, int(msg.Partition), int(msg.Offset))
-			if err == nil {
-				logger.Errorf(err, "Error consuming message with partition %v and offset %v", msg.Partition, msg.Offset)
-				break
-			}
-			if remainingRetries == 0 {
-				logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset)
-				break
+
+	for {
+		select {
+		case <-ctx.Done():
+			// Rebalance or shutdown needed
+			return nil
+
+		case msg := <-claim.Messages():
+			if msg == nil {
+				// Channel closed, rebalance or shutdown needed
+				return nil
 			}
-			logger.Errorf(err, "Failed to consume message with partition %v and offset %v and will retry in %vs", msg.Partition, msg.Offset, int(backoff.Seconds()))
-			time.Sleep(backoff)
-			remainingRetries--
-			backoff *= 2
-			if backoff > c.retryParams.MaxBackoff {
-				backoff = c.retryParams.MaxBackoff
+			logger.Debugf("Consuming message with partition %v and offset %v", msg.Partition, msg.Offset)
+			remainingRetries := c.retryParams.Count
+			backoff := c.retryParams.MinBackoff
+			for {
+				err := c.call(ctx, msg.Value, int(msg.Partition), int(msg.Offset))
+				if err == nil {
+					break
+				}
+				select {
+				case <-ctx.Done():
+					// Do not commit the message if we did not succeed and the context is done.
+					// No need to retry message either.
+ logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset) + return nil + default: + } + if remainingRetries == 0 { + logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset) + break + } + logger.Errorf(err, "Failed to consume message with partition %v and offset %v and will retry in %vs", msg.Partition, msg.Offset, int(backoff.Seconds())) + time.Sleep(backoff) + remainingRetries-- + backoff *= 2 + if backoff > c.retryParams.MaxBackoff { + backoff = c.retryParams.MaxBackoff + } } + session.MarkMessage(msg, "") } - session.MarkMessage(msg, "") } - return nil } func (c *consumer) call(ctx context.Context, body []byte, partition, offset int) error { diff --git a/backend/runner/pubsub/publisher.go b/backend/runner/pubsub/publisher.go index 800bbde32e..f0aacd4f7b 100644 --- a/backend/runner/pubsub/publisher.go +++ b/backend/runner/pubsub/publisher.go @@ -52,7 +52,7 @@ func newPublisher(module string, t *schema.Topic, deployment model.DeploymentKey } func (p *publisher) publish(ctx context.Context, data []byte, key string, caller schema.Ref) error { - logger := log.FromContext(ctx).Scope("topic:" + p.topic.Name) + logger := log.FromContext(ctx).AppendScope("topic:" + p.topic.Name) requestKey, err := rpc.RequestKeyFromContext(ctx) if err != nil { return fmt.Errorf("failed to get request key: %w", err) diff --git a/backend/runner/pubsub/testdata/go/publisher/publisher.go b/backend/runner/pubsub/testdata/go/publisher/publisher.go index af51579338..8ec3213b5f 100644 --- a/backend/runner/pubsub/testdata/go/publisher/publisher.go +++ b/backend/runner/pubsub/testdata/go/publisher/publisher.go @@ -82,7 +82,7 @@ func PublishOneToTopic2(ctx context.Context, req PublishOneToTopic2Request, topi } //ftl:verb -//ftl:subscribe testTopic from=latest +//ftl:subscribe localTopic from=latest func Local(ctx context.Context, event PubSubEvent) error { ftl.LoggerFromContext(ctx).Infof("Consume local: %v", event.Time) return nil diff --git a/internal/log/logger.go b/internal/log/logger.go index 241ce2df07..18429338bb 100644 --- a/internal/log/logger.go +++ b/internal/log/logger.go @@ -47,6 +47,17 @@ func New(level Level, sink Sink) *Logger { func (l Logger) Scope(scope string) *Logger { return l.Attrs(map[string]string{scopeKey: scope}) } + +func (l Logger) AppendScope(scope string) *Logger { + s, ok := l.attributes[scopeKey] + if ok { + s = s + ":" + scope + } else { + s = scope + } + return l.Attrs(map[string]string{scopeKey: s}) +} + func (l Logger) Module(module string) *Logger { return l.Attrs(map[string]string{moduleKey: module}) }