Skip to content

Commit

Permalink
logs
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Dec 17, 2024
1 parent 3460106 commit ea1c55e
Showing 1 changed file with 1 addition and 2 deletions.
3 changes: 1 addition & 2 deletions backend/runner/pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (c *consumer) Setup(session sarama.ConsumerGroupSession) error {
logger := log.FromContext(session.Context())

partitions := session.Claims()[kafkaConsumerGroupID(c.moduleName, c.verb)]
logger.Debugf("Starting consume session for subscription %s with partitions: [%v]", c.verb.Name, strings.Join(slices.Map(partitions, func(partition int32) string { return strconv.Itoa(int(partition)) }), ","))
logger.Infof("Starting consume session for subscription %s with partitions: [%v]", c.verb.Name, strings.Join(slices.Map(partitions, func(partition int32) string { return strconv.Itoa(int(partition)) }), ","))

return nil
}
Expand All @@ -164,7 +164,6 @@ func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
remainingRetries := c.retryParams.Count
backoff := c.retryParams.MinBackoff
for {
logger.Infof("%s: consuming message %v:%v", c.verb.Name, msg.Partition, msg.Offset)
err := c.call(ctx, msg.Value, int(msg.Partition), int(msg.Offset))
if err == nil {
logger.Errorf(err, "%s: error consuming message %v:%v", c.verb.Name, msg.Partition, msg.Offset)
Expand Down

0 comments on commit ea1c55e

Please sign in to comment.