Skip to content

Commit

Permalink
sleep?
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Dec 17, 2024
1 parent 32b91b0 commit b221def
Showing 1 changed file with 39 additions and 0 deletions.
39 changes: 39 additions & 0 deletions backend/runner/pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func (c *consumer) Begin(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to create consumer group for subscription %s: %w", c.verb.Name, err)
}
c.checkOffsets(ctx)

time.Sleep(1 * time.Second)

go c.watchErrors(ctx, group)
go c.subscribe(ctx, group)
return nil
Expand Down Expand Up @@ -226,3 +230,38 @@ func (c *consumer) call(ctx context.Context, body []byte, partition, offset int)
observability.Calls.Request(ctx, req.Verb, start, optional.None[string]())
return nil
}

func (c *consumer) checkOffsets(ctx context.Context) {
groupId := kafkaConsumerGroupID(c.moduleName, c.verb)

Check warning on line 235 in backend/runner/pubsub/consumer.go

View workflow job for this annotation

GitHub Actions / Lint

var-naming: var groupId should be groupID (revive)
logger := log.FromContext(ctx).Scope(groupId + ".offsets")
client, err := sarama.NewClient(c.verb.Runtime.Subscription.KafkaBrokers, sarama.NewConfig())
if err != nil {
logger.Errorf(err, "failed to create client")
}
admin, err := sarama.NewClusterAdminFromClient(client)
if err != nil {
logger.Errorf(err, "failed to create cluster admin")
return
}
groups, err := admin.DescribeConsumerGroups([]string{groupId})
if err != nil {
logger.Errorf(err, "failed to describe consumer groups")
return
}
if len(groups) == 0 {
logger.Infof("no consumer group found")
return
}
resp, err := admin.ListConsumerGroupOffsets(groupId, nil)
if err != nil {
logger.Errorf(err, "failed to list consumer group offsets")
return
}
outStrs := []string{}
for topic, partitions := range resp.Blocks {
for partition, block := range partitions {
outStrs = append(outStrs, fmt.Sprintf("%s:%d:%d", topic, partition, block.Offset))
}
}
logger.Infof("offsets:\n%s", strings.Join(outStrs, "\n"))
}

0 comments on commit b221def

Please sign in to comment.