diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 15426e54d088f..1918043ed49a8 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -839,6 +839,20 @@ kafka_config: # CLI flag: -kafka.producer-max-buffered-bytes [producer_max_buffered_bytes: <int> | default = 1073741824] + # The best-effort maximum lag a consumer tries to achieve at startup. Set both + # -kafka.target-consumer-lag-at-startup and -kafka.max-consumer-lag-at-startup + # to 0 to disable waiting for maximum consumer lag being honored at startup. + # CLI flag: -kafka.target-consumer-lag-at-startup + [target_consumer_lag_at_startup: <duration> | default = 2s] + + # The guaranteed maximum lag before a consumer is considered to have caught up + # reading from a partition at startup, becomes ACTIVE in the hash ring and + # passes the readiness check. Set both -kafka.target-consumer-lag-at-startup + # and -kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum + # consumer lag being honored at startup. + # CLI flag: -kafka.max-consumer-lag-at-startup + [max_consumer_lag_at_startup: <duration> | default = 15s] + # Configuration for 'runtime config' module, responsible for reloading runtime # configuration file. [runtime_config: <runtime_config>] diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 7776b9097f085..ed99459dae33f 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -300,7 +300,7 @@ type Ingester struct { } // New makes a new Ingester. 
-func New(cfg Config, clientConfig client.Config, store Store, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg, metricsNamespace string, logger log.Logger, customStreamsTracker push.UsageTracker, readRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher) (*Ingester, error) { +func New(cfg Config, clientConfig client.Config, store Store, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg, metricsNamespace string, logger log.Logger, customStreamsTracker push.UsageTracker, readRing ring.ReadRing, partitionRingWatcher ring.PartitionRingReader) (*Ingester, error) { if cfg.ingesterClientFactory == nil { cfg.ingesterClientFactory = client.New } diff --git a/pkg/ingester/kafka_consumer.go b/pkg/ingester/kafka_consumer.go index c2fe90ee052f6..0e6185fbe7968 100644 --- a/pkg/ingester/kafka_consumer.go +++ b/pkg/ingester/kafka_consumer.go @@ -3,7 +3,7 @@ package ingester import ( "context" "errors" - math "math" + "math" "sync" "time" diff --git a/pkg/kafka/config.go b/pkg/kafka/config.go index 7f981b7b5e739..13cfb618cfdb9 100644 --- a/pkg/kafka/config.go +++ b/pkg/kafka/config.go @@ -36,18 +36,14 @@ const ( // in the worst case scenario, which is expected to be way above the actual one. 
maxProducerRecordDataBytesLimit = producerBatchMaxBytes - 16384 minProducerRecordDataBytesLimit = 1024 * 1024 - - kafkaConfigFlagPrefix = "ingest-storage.kafka" - targetConsumerLagAtStartupFlag = kafkaConfigFlagPrefix + ".target-consumer-lag-at-startup" - maxConsumerLagAtStartupFlag = kafkaConfigFlagPrefix + ".max-consumer-lag-at-startup" ) var ( ErrMissingKafkaAddress = errors.New("the Kafka address has not been configured") ErrMissingKafkaTopic = errors.New("the Kafka topic has not been configured") + ErrInconsistentConsumerLagAtStartup = errors.New("the target and max consumer lag at startup must be either both set to 0 or to a value greater than 0") + ErrInvalidMaxConsumerLagAtStartup = errors.New("the configured max consumer lag at startup must be greater than or equal to the configured target consumer lag") ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit) - - consumeFromPositionOptions = []string{consumeFromLastOffset, consumeFromStart, consumeFromEnd, consumeFromTimestamp} ) // Config holds the generic config for the Kafka backend. @@ -68,6 +64,9 @@ type Config struct { ProducerMaxRecordSizeBytes int `yaml:"producer_max_record_size_bytes"` ProducerMaxBufferedBytes int64 `yaml:"producer_max_buffered_bytes"` + + TargetConsumerLagAtStartup time.Duration `yaml:"target_consumer_lag_at_startup"` + MaxConsumerLagAtStartup time.Duration `yaml:"max_consumer_lag_at_startup"` } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { @@ -91,6 +90,10 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.IntVar(&cfg.ProducerMaxRecordSizeBytes, prefix+".producer-max-record-size-bytes", maxProducerRecordDataBytesLimit, "The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. 
We strongly recommend to not change this setting unless for testing purposes.") f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.") + + consumerLagUsage := fmt.Sprintf("Set both -%s and -%s to 0 to disable waiting for maximum consumer lag being honored at startup.", prefix+".target-consumer-lag-at-startup", prefix+".max-consumer-lag-at-startup") + f.DurationVar(&cfg.TargetConsumerLagAtStartup, prefix+".target-consumer-lag-at-startup", 2*time.Second, "The best-effort maximum lag a consumer tries to achieve at startup. "+consumerLagUsage) + f.DurationVar(&cfg.MaxConsumerLagAtStartup, prefix+".max-consumer-lag-at-startup", 15*time.Second, "The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. 
"+consumerLagUsage) } func (cfg *Config) Validate() error { @@ -103,6 +106,12 @@ func (cfg *Config) Validate() error { if cfg.ProducerMaxRecordSizeBytes < minProducerRecordDataBytesLimit || cfg.ProducerMaxRecordSizeBytes > maxProducerRecordDataBytesLimit { return ErrInvalidProducerMaxRecordSizeBytes } + if (cfg.TargetConsumerLagAtStartup == 0 && cfg.MaxConsumerLagAtStartup != 0) || (cfg.TargetConsumerLagAtStartup != 0 && cfg.MaxConsumerLagAtStartup == 0) { + return ErrInconsistentConsumerLagAtStartup + } + if cfg.MaxConsumerLagAtStartup < cfg.TargetConsumerLagAtStartup { + return ErrInvalidMaxConsumerLagAtStartup + } return nil } diff --git a/pkg/kafka/partition/reader.go b/pkg/kafka/partition/reader.go index 9972d13307e8b..b2a98afa99ed4 100644 --- a/pkg/kafka/partition/reader.go +++ b/pkg/kafka/partition/reader.go @@ -6,20 +6,31 @@ import ( "math" "time" + "github.com/coder/quartz" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/multierror" "github.com/grafana/dskit/services" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/kmsg" "github.com/twmb/franz-go/plugin/kprom" "github.com/grafana/loki/v3/pkg/kafka" ) +var errWaitTargetLagDeadlineExceeded = errors.New("waiting for target lag deadline exceeded") + +const ( + kafkaStartOffset = -2 + kafkaEndOffset = -1 +) + // Reader is responsible for reading data from a specific Kafka partition // and passing it to the consumer for processing. It is a core component of the // Loki ingester's Kafka-based ingestion pipeline. 
@@ -32,11 +43,13 @@ type Reader struct { consumerFactory ConsumerFactory committer *partitionCommitter lastProcessedOffset int64 + recordsChan chan []Record client *kgo.Client logger log.Logger metrics readerMetrics reg prometheus.Registerer + clock quartz.Clock } type Record struct { @@ -79,18 +92,45 @@ func NewReader( // start initializes the Kafka client and committer for the PartitionReader. // This method is called when the PartitionReader service starts. -func (p *Reader) start(_ context.Context) error { +func (p *Reader) start(ctx context.Context) error { var err error - p.client, err = kafka.NewReaderClient(p.kafkaCfg, p.metrics.kprom, p.logger, - kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{ - p.kafkaCfg.Topic: {p.partitionID: kgo.NewOffset().AtStart()}, - }), - ) + p.client, err = kafka.NewReaderClient(p.kafkaCfg, p.metrics.kprom, p.logger) if err != nil { return errors.Wrap(err, "creating kafka reader client") } + + // We manage our commits manually, so we must fetch the last offset for our consumer group to find out where to read from. + lastCommittedOffset := p.fetchLastCommittedOffset(ctx) + p.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{ + p.kafkaCfg.Topic: {p.partitionID: kgo.NewOffset().At(lastCommittedOffset)}, + }) + p.committer = newCommitter(p.kafkaCfg, kadm.NewClient(p.client), p.partitionID, p.consumerGroup, p.logger, p.reg) - // todo: attempt to ensure max lag timestamp on startup. + + if targetLag, maxLag := p.kafkaCfg.TargetConsumerLagAtStartup, p.kafkaCfg.MaxConsumerLagAtStartup; targetLag > 0 && maxLag > 0 { + consumer, err := p.consumerFactory(p.committer) + if err != nil { + return fmt.Errorf("creating consumer: %w", err) + } + + cancelCtx, cancel := context.WithCancel(ctx) + // Temporarily start a consumer to do the initial update + recordsChan := make(chan []Record) + wait := consumer.Start(cancelCtx, recordsChan) + // Shutdown the consumer after catching up. 
We start a new instance in the run method to tie the lifecycle to the run context. + defer func() { + close(recordsChan) + cancel() + wait() + }() + + err = p.processNextFetchesUntilTargetOrMaxLagHonored(ctx, p.kafkaCfg.TargetConsumerLagAtStartup, p.kafkaCfg.MaxConsumerLagAtStartup, recordsChan) + if err != nil { + level.Error(p.logger).Log("msg", "failed to catch up to max lag", "partition", p.partitionID, "consumer_group", p.consumerGroup, "err", err) + return err + } + } + return nil } @@ -114,7 +154,251 @@ func (p *Reader) run(ctx context.Context) error { return nil } -func (p *Reader) startFetchLoop(ctx context.Context) <-chan []Record { +func (p *Reader) fetchLastCommittedOffset(ctx context.Context) int64 { + // We manually create a request so that we can request the offset for a single partition + // only, which is more performant than requesting the offsets for all partitions. + req := kmsg.NewPtrOffsetFetchRequest() + req.Topics = []kmsg.OffsetFetchRequestTopic{{Topic: p.kafkaCfg.Topic, Partitions: []int32{p.partitionID}}} + req.Group = p.consumerGroup + + resps := p.client.RequestSharded(ctx, req) + + // Since we issued a request for only 1 partition, we expect exactly 1 response. + if expected, actual := 1, len(resps); actual != expected { + level.Error(p.logger).Log("msg", fmt.Sprintf("unexpected number of responses (expected: %d, got: %d)", expected, actual), "expected", expected, "actual", len(resps)) + return kafkaStartOffset + } + // Ensure no error occurred. + res := resps[0] + if res.Err != nil { + level.Error(p.logger).Log("msg", "error fetching group offset for partition", "err", res.Err) + return kafkaStartOffset + } + + // Parse the response. 
+ fetchRes, ok := res.Resp.(*kmsg.OffsetFetchResponse) + if !ok { + level.Error(p.logger).Log("msg", "unexpected response type") + return kafkaStartOffset + } + if expected, actual := 1, len(fetchRes.Groups); actual != expected { + level.Error(p.logger).Log("msg", fmt.Sprintf("unexpected number of groups in the response (expected: %d, got: %d)", expected, actual)) + return kafkaStartOffset + } + if expected, actual := 1, len(fetchRes.Groups[0].Topics); actual != expected { + level.Error(p.logger).Log("msg", fmt.Sprintf("unexpected number of topics in the response (expected: %d, got: %d)", expected, actual)) + return kafkaStartOffset + } + if expected, actual := p.kafkaCfg.Topic, fetchRes.Groups[0].Topics[0].Topic; expected != actual { + level.Error(p.logger).Log("msg", fmt.Sprintf("unexpected topic in the response (expected: %s, got: %s)", expected, actual)) + return kafkaStartOffset + } + if expected, actual := 1, len(fetchRes.Groups[0].Topics[0].Partitions); actual != expected { + level.Error(p.logger).Log("msg", fmt.Sprintf("unexpected number of partitions in the response (expected: %d, got: %d)", expected, actual)) + return kafkaStartOffset + } + if expected, actual := p.partitionID, fetchRes.Groups[0].Topics[0].Partitions[0].Partition; actual != expected { + level.Error(p.logger).Log("msg", fmt.Sprintf("unexpected partition in the response (expected: %d, got: %d)", expected, actual)) + return kafkaStartOffset + } + if err := kerr.ErrorForCode(fetchRes.Groups[0].Topics[0].Partitions[0].ErrorCode); err != nil { + level.Error(p.logger).Log("msg", "unexpected error in the response", "err", err) + return kafkaStartOffset + } + + return fetchRes.Groups[0].Topics[0].Partitions[0].Offset +} + +func (p *Reader) fetchPartitionOffset(ctx context.Context, position int64) (int64, error) { + // Create a custom request to fetch the latest offset of a specific partition. 
+ // We manually create a request so that we can request the offset for a single partition + // only, which is more performant than requesting the offsets for all partitions. + partitionReq := kmsg.NewListOffsetsRequestTopicPartition() + partitionReq.Partition = p.partitionID + partitionReq.Timestamp = position + + topicReq := kmsg.NewListOffsetsRequestTopic() + topicReq.Topic = p.kafkaCfg.Topic + topicReq.Partitions = []kmsg.ListOffsetsRequestTopicPartition{partitionReq} + + req := kmsg.NewPtrListOffsetsRequest() + req.IsolationLevel = 0 // 0 means READ_UNCOMMITTED. + req.Topics = []kmsg.ListOffsetsRequestTopic{topicReq} + + // Even if we share the same client, other in-flight requests are not canceled once this context is canceled + // (or its deadline is exceeded). We've verified it with a unit test. + resps := p.client.RequestSharded(ctx, req) + + // Since we issued a request for only 1 partition, we expect exactly 1 response. + if expected := 1; len(resps) != 1 { + return 0, fmt.Errorf("unexpected number of responses (expected: %d, got: %d)", expected, len(resps)) + } + + // Ensure no error occurred. + res := resps[0] + if res.Err != nil { + return 0, res.Err + } + + // Parse the response. 
+ listRes, ok := res.Resp.(*kmsg.ListOffsetsResponse) + if !ok { + return 0, errors.New("unexpected response type") + } + if expected, actual := 1, len(listRes.Topics); actual != expected { + return 0, fmt.Errorf("unexpected number of topics in the response (expected: %d, got: %d)", expected, actual) + } + if expected, actual := p.kafkaCfg.Topic, listRes.Topics[0].Topic; expected != actual { + return 0, fmt.Errorf("unexpected topic in the response (expected: %s, got: %s)", expected, actual) + } + if expected, actual := 1, len(listRes.Topics[0].Partitions); actual != expected { + return 0, fmt.Errorf("unexpected number of partitions in the response (expected: %d, got: %d)", expected, actual) + } + if expected, actual := p.partitionID, listRes.Topics[0].Partitions[0].Partition; actual != expected { + return 0, fmt.Errorf("unexpected partition in the response (expected: %d, got: %d)", expected, actual) + } + if err := kerr.ErrorForCode(listRes.Topics[0].Partitions[0].ErrorCode); err != nil { + return 0, err + } + + return listRes.Topics[0].Partitions[0].Offset, nil +} + +// processNextFetchesUntilTargetOrMaxLagHonored process records from Kafka until at least the maxLag is honored. +// This function does a best-effort to get lag below targetLag, but it's not guaranteed that it will be +// reached once this function successfully returns (only maxLag is guaranteed). +func (p *Reader) processNextFetchesUntilTargetOrMaxLagHonored(ctx context.Context, targetLag, maxLag time.Duration, recordsChan chan<- []Record) error { + logger := log.With(p.logger, "target_lag", targetLag, "max_lag", maxLag) + level.Info(logger).Log("msg", "partition reader is starting to consume partition until target and max consumer lag is honored") + + attempts := []func() (time.Duration, error){ + // First process fetches until at least the max lag is honored. 
+ func() (time.Duration, error) { + return p.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan) + }, + + // If the target lag hasn't been reached with the first attempt (which stops once at least the max lag + // is honored) then we try to reach the (lower) target lag within a fixed time (best-effort). + // The timeout is equal to the max lag. This is done because we expect at least a 2x replay speed + // from Kafka (which means at most it takes 1s to ingest 2s of data): assuming new data is continuously + // written to the partition, we give the reader maxLag time to replay the backlog + ingest the new data + // written in the meanwhile. + func() (time.Duration, error) { + timedCtx, cancel := context.WithTimeoutCause(ctx, maxLag, errWaitTargetLagDeadlineExceeded) + defer cancel() + + return p.processNextFetchesUntilLagHonored(timedCtx, targetLag, logger, recordsChan) + }, + + // If the target lag hasn't been reached with the previous attempt then we'll move on. However, + // we still need to guarantee that in the meanwhile the lag didn't increase and max lag is still honored. 
+ func() (time.Duration, error) { + return p.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan) + }, + } + + var currLag time.Duration + for _, attempt := range attempts { + var err error + + currLag, err = attempt() + if errors.Is(err, errWaitTargetLagDeadlineExceeded) { + continue + } + if err != nil { + return err + } + if currLag <= targetLag { + level.Info(logger).Log( + "msg", "partition reader consumed partition and current lag is lower or equal to configured target consumer lag", + "last_consumed_offset", p.committer.lastCommittedOffset, + "current_lag", currLag, + ) + return nil + } + } + + level.Warn(logger).Log( + "msg", "partition reader consumed partition and current lag is lower than configured max consumer lag but higher than target consumer lag", + "last_consumed_offset", p.committer.lastCommittedOffset, + "current_lag", currLag, + ) + return nil +} + +func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag time.Duration, logger log.Logger, recordsChan chan<- []Record) (time.Duration, error) { + boff := backoff.New(ctx, backoff.Config{ + MinBackoff: 100 * time.Millisecond, + MaxBackoff: time.Second, + MaxRetries: 0, // Retry forever (unless context is canceled / deadline exceeded). + }) + currLag := time.Duration(0) + + for boff.Ongoing() { + // Send a direct request to the Kafka backend to fetch the partition start offset. + partitionStartOffset, err := p.fetchPartitionOffset(ctx, kafkaStartOffset) + if err != nil { + level.Warn(logger).Log("msg", "partition reader failed to fetch partition start offset", "err", err) + boff.Wait() + continue + } + + // Send a direct request to the Kafka backend to fetch the last produced offset. + // We intentionally don't use WaitNextFetchLastProducedOffset() to not introduce further + // latency. 
+ lastProducedOffsetRequestedAt := time.Now() + lastProducedOffset, err := p.fetchPartitionOffset(ctx, kafkaEndOffset) + if err != nil { + level.Warn(logger).Log("msg", "partition reader failed to fetch last produced offset", "err", err) + boff.Wait() + continue + } + lastProducedOffset = lastProducedOffset - 1 // Kafka returns the next empty offset so we must subtract 1 to get the last written offset. + + // Ensure there are some records to consume. For example, if the partition has been inactive for a long + // time and all its records have been deleted, the partition start offset may be > 0 but there are no + // records to actually consume. + if partitionStartOffset > lastProducedOffset { + level.Info(logger).Log("msg", "partition reader found no records to consume because partition is empty", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset) + return 0, nil + } + + // This message is NOT expected to be logged with a very high rate. In this log we display the last measured + // lag. If we don't have it (lag is zero value), then it will not be logged. + level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is consuming records to honor target and max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset) + + for boff.Ongoing() { + // Continue reading until we reached the desired offset. + if lastProducedOffset <= p.lastProcessedOffset { + break + } + + records := p.poll(ctx) + recordsChan <- records + } + if boff.Err() != nil { + return 0, boff.ErrCause() + } + + // If it took less than the max desired lag to replay the partition + // then we can stop here, otherwise we'll have to redo it. 
+ if currLag = time.Since(lastProducedOffsetRequestedAt); currLag <= maxLag { + return currLag, nil + } + } + + return 0, boff.ErrCause() +} + +func loggerWithCurrentLagIfSet(logger log.Logger, currLag time.Duration) log.Logger { + if currLag <= 0 { + return logger + } + + return log.With(logger, "current_lag", currLag) +} + +func (p *Reader) startFetchLoop(ctx context.Context) chan []Record { records := make(chan []Record) go func() { for { diff --git a/pkg/kafka/partition/reader_test.go b/pkg/kafka/partition/reader_test.go index addc5779bb6a8..b68c9a0622704 100644 --- a/pkg/kafka/partition/reader_test.go +++ b/pkg/kafka/partition/reader_test.go @@ -39,7 +39,10 @@ func (m *mockConsumer) Start(ctx context.Context, recordsChan <-chan []Record) f select { case <-ctx.Done(): return - case records := <-recordsChan: + case records, ok := <-recordsChan: + if !ok { + return + } m.recordsChan <- records } } @@ -100,3 +103,61 @@ func TestPartitionReader_BasicFunctionality(t *testing.T) { err = services.StopAndAwaitTerminated(context.Background(), partitionReader) require.NoError(t, err) } + +func TestPartitionReader_ProcessCatchUpAtStartup(t *testing.T) { + _, kafkaCfg := testkafka.CreateCluster(t, 1, "test-topic") + var consumerStarting *mockConsumer + + consumerFactory := func(_ Committer) (Consumer, error) { + // Return two consumers to ensure we are processing requests during service `start()` and not during `run()`. 
+ if consumerStarting == nil { + consumerStarting = newMockConsumer() + return consumerStarting, nil + } + return newMockConsumer(), nil + } + + partitionReader, err := NewReader(kafkaCfg, 0, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry()) + require.NoError(t, err) + producer, err := kafka.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry()) + require.NoError(t, err) + + stream := logproto.Stream{ + Labels: labels.FromStrings("foo", "bar").String(), + Entries: []logproto.Entry{{Timestamp: time.Now(), Line: "test"}}, + } + + records, err := kafka.Encode(0, "test-tenant", stream, 10<<20) + require.NoError(t, err) + require.Len(t, records, 1) + + producer.ProduceSync(context.Background(), records...) + producer.ProduceSync(context.Background(), records...) + + // Enable the catch up logic so starting the reader will read any existing records. + kafkaCfg.TargetConsumerLagAtStartup = time.Second * 1 + kafkaCfg.MaxConsumerLagAtStartup = time.Second * 2 + + err = services.StartAndAwaitRunning(context.Background(), partitionReader) + require.NoError(t, err) + + // This message should not be processed by the startingConsumer + producer.ProduceSync(context.Background(), records...) 
+ + // Wait for records to be processed + require.Eventually(t, func() bool { + return len(consumerStarting.recordsChan) == 1 // All pending messages will be received in one batch + }, 10*time.Second, 10*time.Millisecond) + + receivedRecords := <-consumerStarting.recordsChan + require.Len(t, receivedRecords, 2) + assert.Equal(t, "test-tenant", receivedRecords[0].TenantID) + assert.Equal(t, records[0].Value, receivedRecords[0].Content) + assert.Equal(t, "test-tenant", receivedRecords[1].TenantID) + assert.Equal(t, records[0].Value, receivedRecords[1].Content) + + assert.Equal(t, 0, len(consumerStarting.recordsChan)) + + err = services.StopAndAwaitTerminated(context.Background(), partitionReader) + require.NoError(t, err) +} diff --git a/pkg/kafka/testkafka/cluster.go b/pkg/kafka/testkafka/cluster.go index fc00e7272e7aa..cc5847c2bfd35 100644 --- a/pkg/kafka/testkafka/cluster.go +++ b/pkg/kafka/testkafka/cluster.go @@ -102,7 +102,7 @@ func addSupportForConsumerGroups(t testing.TB, cluster *kfake.Cluster, topicName partitionID = allPartitions } else { partitionID = req.Groups[0].Topics[0].Partitions[0] - assert.Len(t, req.Groups[0], 1, "test only has support for one partition per request") + assert.Len(t, req.Groups[0].Topics, 1, "test only has support for one partition per request") assert.Len(t, req.Groups[0].Topics[0].Partitions, 1, "test only has support for one partition per request") }