diff --git a/internal/repository/kafka/kafka.go b/internal/repository/kafka/kafka.go index b7da92a..d7c5638 100644 --- a/internal/repository/kafka/kafka.go +++ b/internal/repository/kafka/kafka.go @@ -3,6 +3,7 @@ package kafka import ( "context" "errors" + "fmt" "github.com/IBM/sarama" "github.com/Interhyp/metadata-service/internal/acorn/config" "github.com/Interhyp/metadata-service/internal/acorn/repository" @@ -131,9 +132,7 @@ func (r *Impl) StartReceiveLoop(ctx context.Context) error { return errors.New("failed to obtain local non-localhost ip address to use for consumer group, did not get an ipv4 address") } - workerNodeId := ipComponents[2] - - groupId = "metadata-worker" + workerNodeId + groupId = fmt.Sprintf("metadata-worker-%s-%s", ipComponents[2], ipComponents[3]) } topicConfig.ConsumerGroup = &groupId r.Logging.Logger().Ctx(ctx).Info().Printf("using kafka group id %s for consumer", groupId) @@ -142,6 +141,7 @@ func (r *Impl) StartReceiveLoop(ctx context.Context) error { configPreset.Net.TLS.Enable = true configPreset.Producer.Compression = sarama.CompressionNone configPreset.MetricRegistry = metrics.NewPrefixedChildRegistry(metrics.DefaultRegistry, "sarama.consumer.") + configPreset.Consumer.Offsets.Initial = sarama.OffsetNewest consumer, err := aukafka.CreateConsumer[repository.UpdateEvent](ctx, *topicConfig, callback, configPreset) if err != nil {