Skip to content

Commit

Permalink
fix(#234): use unique consumer groups
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanHCB committed Nov 27, 2023
1 parent 12ecf0a commit 9c7b780
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions internal/repository/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"context"
"errors"
"fmt"
"github.com/IBM/sarama"
"github.com/Interhyp/metadata-service/internal/acorn/config"
"github.com/Interhyp/metadata-service/internal/acorn/repository"
Expand Down Expand Up @@ -131,9 +132,7 @@ func (r *Impl) StartReceiveLoop(ctx context.Context) error {
return errors.New("failed to obtain local non-localhost ip address to use for consumer group, did not get an ipv4 address")

Check warning on line 132 in internal/repository/kafka/kafka.go

View check run for this annotation

Codecov / codecov/patch

internal/repository/kafka/kafka.go#L132

Added line #L132 was not covered by tests
}

workerNodeId := ipComponents[2]

groupId = "metadata-worker" + workerNodeId
groupId = fmt.Sprintf("metadata-worker-%s-%s", ipComponents[2], ipComponents[3])

Check warning on line 135 in internal/repository/kafka/kafka.go

View check run for this annotation

Codecov / codecov/patch

internal/repository/kafka/kafka.go#L135

Added line #L135 was not covered by tests
}
topicConfig.ConsumerGroup = &groupId

Check warning on line 137 in internal/repository/kafka/kafka.go

View check run for this annotation

Codecov / codecov/patch

internal/repository/kafka/kafka.go#L137

Added line #L137 was not covered by tests
r.Logging.Logger().Ctx(ctx).Info().Printf("using kafka group id %s for consumer", groupId)
Expand All @@ -142,6 +141,7 @@ func (r *Impl) StartReceiveLoop(ctx context.Context) error {
configPreset.Net.TLS.Enable = true
configPreset.Producer.Compression = sarama.CompressionNone
configPreset.MetricRegistry = metrics.NewPrefixedChildRegistry(metrics.DefaultRegistry, "sarama.consumer.")
configPreset.Consumer.Offsets.Initial = sarama.OffsetNewest

Check warning on line 144 in internal/repository/kafka/kafka.go

View check run for this annotation

Codecov / codecov/patch

internal/repository/kafka/kafka.go#L140-L144

Added lines #L140 - L144 were not covered by tests

consumer, err := aukafka.CreateConsumer[repository.UpdateEvent](ctx, *topicConfig, callback, configPreset)

Check warning on line 146 in internal/repository/kafka/kafka.go

View check run for this annotation

Codecov / codecov/patch

internal/repository/kafka/kafka.go#L146

Added line #L146 was not covered by tests
if err != nil {
Expand Down

0 comments on commit 9c7b780

Please sign in to comment.