From 13b6e8f3574eeccddb07b48e245cb3fe0b13ceda Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 24 Sep 2024 13:11:44 +0200 Subject: [PATCH] fix(kafka): Fixes partition selection in distributors --- pkg/distributor/distributor.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 08fba483ec9bc..3ad586f3e596f 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -901,12 +901,11 @@ func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream, if len(stream.Stream.Entries) == 0 { return nil } - /* partitionID, err := d.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey) - if err != nil { - d.kafkaAppends.WithLabelValues("kafka", "fail").Inc() - return fmt.Errorf("failed to find active partition for stream: %w", err) - }*/ - partitionID := int32(0) + partitionID, err := d.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey) + if err != nil { + d.kafkaAppends.WithLabelValues("kafka", "fail").Inc() + return fmt.Errorf("failed to find active partition for stream: %w", err) + } startTime := time.Now()