Skip to content

Commit

Permalink
fix: handle pubsub rebalance and closure (#3811)
Browse files Browse the repository at this point in the history
fixes #3808
  • Loading branch information
matt2e authored Dec 18, 2024
1 parent ac2d4eb commit ffbf1d6
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 51 deletions.
117 changes: 68 additions & 49 deletions backend/runner/pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type consumer struct {
verb *schema.Verb
subscriber *schema.MetadataSubscriber
retryParams schema.RetryParams
group sarama.ConsumerGroup
cancel context.CancelFunc

verbClient VerbClient
timelineClient *timeline.Client
Expand All @@ -40,11 +42,28 @@ func newConsumer(moduleName string, verb *schema.Verb, subscriber *schema.Metada
return nil, fmt.Errorf("subscription %s has no Kafka brokers", verb.Name)
}

config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Offsets.AutoCommit.Enable = true
switch subscriber.FromOffset {
case schema.FromOffsetBeginning, schema.FromOffsetUnspecified:
config.Consumer.Offsets.Initial = sarama.OffsetOldest
case schema.FromOffsetLatest:
config.Consumer.Offsets.Initial = sarama.OffsetNewest
}

groupID := kafkaConsumerGroupID(moduleName, verb)
group, err := sarama.NewConsumerGroup(verb.Runtime.Subscription.KafkaBrokers, groupID, config)
if err != nil {
return nil, fmt.Errorf("failed to create consumer group for subscription %s: %w", verb.Name, err)
}

c := &consumer{
moduleName: moduleName,
deployment: deployment,
verb: verb,
subscriber: subscriber,
group: group,

verbClient: verbClient,
timelineClient: timelineClient,
Expand Down Expand Up @@ -73,51 +92,33 @@ func (c *consumer) kafkaTopicID() string {

// Begin starts consuming from the subscription's consumer group in the
// background. It launches two goroutines — one draining the group's error
// channel and one running consume sessions — and stores a cancel func so
// the consumer can later be shut down.
//
// NOTE(review): this span contained interleaved pre-/post-commit diff lines
// (old .Scope call, config and group creation that moved to newConsumer);
// only the post-commit version is kept here.
func (c *consumer) Begin(ctx context.Context) error {
	logger := log.FromContext(ctx).AppendScope("sub:" + c.verb.Name)
	ctx = log.ContextWithLogger(ctx, logger)

	logger.Debugf("Subscribing to %s", c.kafkaTopicID())

	// Derive a cancellable context so both goroutines can be torn down
	// together via c.cancel.
	ctx, cancel := context.WithCancel(ctx)
	c.cancel = cancel

	go c.watchErrors(ctx)
	go c.subscribe(ctx)
	return nil
}

func (c *consumer) watchErrors(ctx context.Context, group sarama.ConsumerGroup) {
func (c *consumer) watchErrors(ctx context.Context) {
logger := log.FromContext(ctx)
for {
select {
case <-ctx.Done():
return
case err := <-group.Errors():
case err := <-c.group.Errors():
logger.Errorf(err, "Consumer group error")
}
}
}

func (c *consumer) subscribe(ctx context.Context, group sarama.ConsumerGroup) {
func (c *consumer) subscribe(ctx context.Context) {
logger := log.FromContext(ctx)
defer group.Close()
// Iterate over consumer sessions.
//
// `Consume` should be called inside an infinite loop, when a server-side rebalance happens,
Expand All @@ -129,7 +130,7 @@ func (c *consumer) subscribe(ctx context.Context, group sarama.ConsumerGroup) {
default:
}

err := group.Consume(ctx, []string{c.kafkaTopicID()}, c)
err := c.group.Consume(ctx, []string{c.kafkaTopicID()}, c)
if err != nil {
logger.Errorf(err, "Session failed for %s", c.verb.Name)
} else {
Expand Down Expand Up @@ -159,31 +160,49 @@ func (c *consumer) Cleanup(session sarama.ConsumerGroupSession) error {
func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
ctx := session.Context()
logger := log.FromContext(ctx)
for msg := range claim.Messages() {
logger.Debugf("Consuming message with partition %v and offset %v)", msg.Partition, msg.Offset)
remainingRetries := c.retryParams.Count
backoff := c.retryParams.MinBackoff
for {
err := c.call(ctx, msg.Value, int(msg.Partition), int(msg.Offset))
if err == nil {
logger.Errorf(err, "Error consuming message with partition %v and offset %v", msg.Partition, msg.Offset)
break
}
if remainingRetries == 0 {
logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset)
break

for {
select {
case <-ctx.Done():
// Rebalance or shutdown needed
return nil

case msg := <-claim.Messages():
if msg == nil {
// Channel closed, rebalance or shutdown needed
return nil
}
logger.Errorf(err, "Failed to consume message with partition %v and offset %v and will retry in %vs", msg.Partition, msg.Offset, int(backoff.Seconds()))
time.Sleep(backoff)
remainingRetries--
backoff *= 2
if backoff > c.retryParams.MaxBackoff {
backoff = c.retryParams.MaxBackoff
logger.Debugf("Consuming message with partition %v and offset %v", msg.Partition, msg.Offset)
remainingRetries := c.retryParams.Count
backoff := c.retryParams.MinBackoff
for {
err := c.call(ctx, msg.Value, int(msg.Partition), int(msg.Offset))
if err == nil {
break
}
select {
case <-ctx.Done():
// Do not commit the message if we did not succeed and the context is done.
// No need to retry message either.
logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset)
return nil
default:
}
if remainingRetries == 0 {
logger.Errorf(err, "Failed to consume message with partition %v and offset %v", msg.Partition, msg.Offset)
break
}
logger.Errorf(err, "Failed to consume message with partition %v and offset %v and will retry in %vs", msg.Partition, msg.Offset, int(backoff.Seconds()))
time.Sleep(backoff)
remainingRetries--
backoff *= 2
if backoff > c.retryParams.MaxBackoff {
backoff = c.retryParams.MaxBackoff
}
}
session.MarkMessage(msg, "")
}
session.MarkMessage(msg, "")
}
return nil
}

func (c *consumer) call(ctx context.Context, body []byte, partition, offset int) error {
Expand Down
2 changes: 1 addition & 1 deletion backend/runner/pubsub/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func newPublisher(module string, t *schema.Topic, deployment model.DeploymentKey
}

func (p *publisher) publish(ctx context.Context, data []byte, key string, caller schema.Ref) error {
logger := log.FromContext(ctx).Scope("topic:" + p.topic.Name)
logger := log.FromContext(ctx).AppendScope("topic:" + p.topic.Name)
requestKey, err := rpc.RequestKeyFromContext(ctx)
if err != nil {
return fmt.Errorf("failed to get request key: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion backend/runner/pubsub/testdata/go/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func PublishOneToTopic2(ctx context.Context, req PublishOneToTopic2Request, topi
}

//ftl:verb
//ftl:subscribe testTopic from=latest
//ftl:subscribe localTopic from=latest
func Local(ctx context.Context, event PubSubEvent) error {
ftl.LoggerFromContext(ctx).Infof("Consume local: %v", event.Time)
return nil
Expand Down
11 changes: 11 additions & 0 deletions internal/log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ func New(level Level, sink Sink) *Logger {
// Scope returns a derived logger whose scope attribute is set to scope,
// replacing any scope the logger already carries.
func (l Logger) Scope(scope string) *Logger {
	attrs := map[string]string{scopeKey: scope}
	return l.Attrs(attrs)
}

// AppendScope returns a derived logger whose scope attribute is the
// existing scope extended with ":" followed by scope, or just scope when
// the logger has no scope attribute yet.
func (l Logger) AppendScope(scope string) *Logger {
	if existing, ok := l.attributes[scopeKey]; ok {
		scope = existing + ":" + scope
	}
	return l.Attrs(map[string]string{scopeKey: scope})
}

// Module returns a derived logger with its module attribute set to module.
func (l Logger) Module(module string) *Logger {
	attrs := map[string]string{moduleKey: module}
	return l.Attrs(attrs)
}
Expand Down

0 comments on commit ffbf1d6

Please sign in to comment.