From 83a8893a3fbad3a87d7aea3a61e7dae2f6a34168 Mon Sep 17 00:00:00 2001
From: benclive
Date: Mon, 14 Oct 2024 11:40:40 +0100
Subject: [PATCH] feat(kafka): Remove rate limits for kafka ingestion (#14460)

---
 pkg/ingester/checkpoint_test.go        |  5 ++-
 pkg/ingester/ingester.go               | 11 +++--
 pkg/ingester/instance.go               |  4 +-
 pkg/ingester/instance_test.go          | 22 +++++-----
 pkg/ingester/limiter.go                | 40 +++++++++++++++----
 pkg/ingester/limiter_test.go           |  4 +-
 pkg/ingester/owned_streams_test.go     |  2 +-
 .../recalculate_owned_streams_test.go  |  2 +-
 pkg/ingester/stream_test.go            | 40 +++++++++----------
 pkg/ingester/streams_map_test.go       |  6 +--
 pkg/kafka/partition/reader.go          |  6 ++-
 11 files changed, 87 insertions(+), 55 deletions(-)

diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go
index 402768988bb52..7f30898bd7b7c 100644
--- a/pkg/ingester/checkpoint_test.go
+++ b/pkg/ingester/checkpoint_test.go
@@ -459,7 +459,7 @@ func Test_SeriesIterator(t *testing.T) {
  limits, err := validation.NewOverrides(l, nil)
  require.NoError(t, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})

  for i := 0; i < 3; i++ {
  inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
@@ -506,7 +506,8 @@ func Benchmark_SeriesIterator(b *testing.B) {
  limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
  require.NoError(b, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
+
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})

  for i := range instances {
  inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 40028cfcd528a..adf0cd7b332f5 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -404,18 +404,21 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
  i.SetExtractorWrapper(i.cfg.SampleExtractorWrapper)
  }

- var limiterStrategy limiterRingStrategy
+ var streamCountLimiter limiterRingStrategy
  var ownedStreamsStrategy ownershipStrategy
+ var streamRateLimiter RateLimiterStrategy
  if i.cfg.KafkaIngestion.Enabled {
- limiterStrategy = newPartitionRingLimiterStrategy(partitionRingWatcher, limits.IngestionPartitionsTenantShardSize)
+ streamCountLimiter = newPartitionRingLimiterStrategy(partitionRingWatcher, limits.IngestionPartitionsTenantShardSize)
  ownedStreamsStrategy = newOwnedStreamsPartitionStrategy(i.ingestPartitionID, partitionRingWatcher, limits.IngestionPartitionsTenantShardSize, util_log.Logger)
+ streamRateLimiter = &NoLimitsStrategy{} // Kafka ingestion does not have per-stream rate limits, because we control the consumption speed.
 } else {
- limiterStrategy = newIngesterRingLimiterStrategy(i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)
+ streamCountLimiter = newIngesterRingLimiterStrategy(i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)
  ownedStreamsStrategy = newOwnedStreamsIngesterStrategy(i.lifecycler.ID, i.readRing, util_log.Logger)
+ streamRateLimiter = &TenantBasedStrategy{limits: limits}
  }

  // Now that the lifecycler has been created, we can create the limiter
  // which depends on it.
- i.limiter = NewLimiter(limits, metrics, limiterStrategy)
+ i.limiter = NewLimiter(limits, metrics, streamCountLimiter, streamRateLimiter)
  i.recalculateOwnedStreams = newRecalculateOwnedStreamsSvc(i.getInstances, ownedStreamsStrategy, cfg.OwnedStreamsCheckInterval, util_log.Logger)

  return i, nil
diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go
index 72ed01793ce7f..685fa90a8a762 100644
--- a/pkg/ingester/instance.go
+++ b/pkg/ingester/instance.go
@@ -307,7 +307,7 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre
  return nil, fmt.Errorf("failed to create stream: %w", err)
  }

- s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs)
+ s := newStream(chunkfmt, headfmt, i.cfg, i.limiter.rateLimitStrategy, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs)

  // record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them).
  if record != nil {
@@ -372,7 +372,7 @@ func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) (*st
  return nil, fmt.Errorf("failed to create stream for fingerprint: %w", err)
  }

- s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs)
+ s := newStream(chunkfmt, headfmt, i.cfg, i.limiter.rateLimitStrategy, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs)

  i.onStreamCreated(s)
diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go
index 819577540f664..369d0ab2d7469 100644
--- a/pkg/ingester/instance_test.go
+++ b/pkg/ingester/instance_test.go
@@ -78,7 +78,7 @@ var NilMetrics = newIngesterMetrics(nil, constants.Loki)
 func TestLabelsCollisions(t *testing.T) {
  limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
  require.NoError(t, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})

  i, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
  require.Nil(t, err)
@@ -106,7 +106,7 @@ func TestLabelsCollisions(t *testing.T) {
 func TestConcurrentPushes(t *testing.T) {
  limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
  require.NoError(t, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})

  inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
  require.Nil(t, err)
@@ -158,7 +158,7 @@ func TestConcurrentPushes(t *testing.T) {
 func TestGetStreamRates(t *testing.T) {
  limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
  require.NoError(t, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})

  inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
  require.NoError(t, err)
@@ -245,7 +245,7 @@ func labelHashNoShard(l labels.Labels) uint64 {
 func TestSyncPeriod(t *testing.T) {
  limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
  require.NoError(t, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})

  const (
  syncPeriod = 1 * time.Minute
@@ -290,7 +290,7 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
  t.Helper()
  limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
  require.NoError(t, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
  indexShards := 2 // just some random values
@@ -315,7 +315,7 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
  require.NoError(t, err)
  chunkfmt, headfmt, err := instance.chunkFormatAt(minTs(&testStream))
  require.NoError(t, err)
- chunk := newStream(chunkfmt, headfmt, cfg, limiter, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics, nil, nil).NewChunk()
+ chunk := newStream(chunkfmt, headfmt, cfg, limiter.rateLimitStrategy, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics, nil, nil).NewChunk()
  for _, entry := range testStream.Entries {
  dup, err := chunk.Append(&entry)
  require.False(t, dup)
@@ -507,7 +507,7 @@ func makeRandomLabels() labels.Labels {
 func Benchmark_PushInstance(b *testing.B) {
  limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
  require.NoError(b, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})

  i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
  ctx := context.Background()
@@ -549,7 +549,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
  l.MaxLocalStreamsPerUser = 100000
  limits, err := validation.NewOverrides(l, nil)
  require.NoError(b, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})

  ctx := context.Background()
@@ -575,7 +575,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
  b.Run("addTailersToNewStream", func(b *testing.B) {
  for n := 0; n < b.N; n++ {
- inst.addTailersToNewStream(newStream(chunkfmt, headfmt, nil, limiter, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics, nil, nil))
+ inst.addTailersToNewStream(newStream(chunkfmt, headfmt, nil, limiter.rateLimitStrategy, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics, nil, nil))
  }
  })
 }
@@ -1089,7 +1089,7 @@ func TestStreamShardingUsage(t *testing.T) {
  limits, err := validation.NewOverrides(defaultLimitsTestConfig(), limitsDefinition)
  require.NoError(t, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})

  defaultShardStreamsCfg := limiter.limits.ShardStreams("fake")
  tenantShardStreamsCfg := limiter.limits.ShardStreams(customTenant1)
@@ -1454,7 +1454,7 @@ func defaultInstance(t *testing.T) *instance {
  &ingesterConfig,
  defaultPeriodConfigs,
  "fake",
- NewLimiter(overrides, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1)),
+ NewLimiter(overrides, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: overrides}),
  loki_runtime.DefaultTenantConfigs(),
  noopWAL{},
  NilMetrics,
diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go
index fd3a5c7c1b9bb..c4e64149f1658 100644
--- a/pkg/ingester/limiter.go
+++ b/pkg/ingester/limiter.go
@@ -38,9 +38,10 @@ type Limits interface {
 // Limiter implements primitives to get the maximum number of streams
 // an ingester can handle for a specific tenant
 type Limiter struct {
- limits       Limits
- ringStrategy limiterRingStrategy
- metrics      *ingesterMetrics
+ limits            Limits
+ ringStrategy      limiterRingStrategy
+ metrics           *ingesterMetrics
+ rateLimitStrategy RateLimiterStrategy

  mtx sync.RWMutex
  disabled bool
@@ -51,6 +52,7 @@ func (l *Limiter) DisableForWALReplay() {
  defer l.mtx.Unlock()
  l.disabled = true
  l.metrics.limiterEnabled.Set(0)
+ l.rateLimitStrategy.SetDisabled(true)
 }

 func (l *Limiter) Enable() {
@@ -58,6 +60,7 @@ func (l *Limiter) Enable() {
  defer l.mtx.Unlock()
  l.disabled = false
  l.metrics.limiterEnabled.Set(1)
+ l.rateLimitStrategy.SetDisabled(false)
 }

 type limiterRingStrategy interface {
@@ -65,11 +68,12 @@ type limiterRingStrategy interface {
 }

 // NewLimiter makes a new limiter
-func NewLimiter(limits Limits, metrics *ingesterMetrics, ingesterRingLimiterStrategy limiterRingStrategy) *Limiter {
+func NewLimiter(limits Limits, metrics *ingesterMetrics, ingesterRingLimiterStrategy limiterRingStrategy, rateLimitStrategy RateLimiterStrategy) *Limiter {
  return &Limiter{
- limits:       limits,
- ringStrategy: ingesterRingLimiterStrategy,
- metrics:      metrics,
+ limits:            limits,
+ ringStrategy:      ingesterRingLimiterStrategy,
+ metrics:           metrics,
+ rateLimitStrategy: rateLimitStrategy,
  }
 }
@@ -231,9 +235,15 @@ func (l *streamCountLimiter) getSuppliers(tenant string) (streamCountSupplier, f
 type RateLimiterStrategy interface {
  RateLimit(tenant string) validation.RateLimit
+ SetDisabled(bool)
 }

-func (l *Limiter) RateLimit(tenant string) validation.RateLimit {
+type TenantBasedStrategy struct {
+ disabled bool
+ limits Limits
+}
+
+func (l *TenantBasedStrategy) RateLimit(tenant string) validation.RateLimit {
  if l.disabled {
  return validation.Unlimited
  }
@@ -241,6 +251,20 @@ func (l *Limiter) RateLimit(tenant string) validation.RateLimit {
  return l.limits.PerStreamRateLimit(tenant)
 }

+func (l *TenantBasedStrategy) SetDisabled(disabled bool) {
+ l.disabled = disabled
+}
+
+type NoLimitsStrategy struct{}
+
+func (l *NoLimitsStrategy) RateLimit(_ string) validation.RateLimit {
+ return validation.Unlimited
+}
+
+func (l *NoLimitsStrategy) SetDisabled(_ bool) {
+ // no-op
+}
+
 type StreamRateLimiter struct {
  recheckPeriod time.Duration
  recheckAt time.Time
diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go
index b8c7fe72e3597..b611db4d109e1 100644
--- a/pkg/ingester/limiter_test.go
+++ b/pkg/ingester/limiter_test.go
@@ -133,7 +133,7 @@ func TestStreamCountLimiter_AssertNewStreamAllowed(t *testing.T) {
  ownedStreamCount: testData.ownedStreamCount,
  }
  strategy := &fixedStrategy{localLimit: testData.calculatedLocalLimit}
- limiter := NewLimiter(limits, NilMetrics, strategy)
+ limiter := NewLimiter(limits, NilMetrics, strategy, &TenantBasedStrategy{limits: limits})
  defaultCountSupplier := func() int {
  return testData.streams
  }
@@ -182,7 +182,7 @@ func TestLimiter_minNonZero(t *testing.T) {
  for testName, testData := range tests {
  t.Run(testName, func(t *testing.T) {
- limiter := NewLimiter(nil, NilMetrics, nil)
+ limiter := NewLimiter(nil, NilMetrics, nil, nil)
  assert.Equal(t, testData.expected, limiter.minNonZero(testData.first, testData.second))
  })
  }
diff --git a/pkg/ingester/owned_streams_test.go b/pkg/ingester/owned_streams_test.go
index 373d37a5f62e6..1f197d580e04c 100644
--- a/pkg/ingester/owned_streams_test.go
+++ b/pkg/ingester/owned_streams_test.go
@@ -17,7 +17,7 @@ func Test_OwnedStreamService(t *testing.T) {
  require.NoError(t, err)
  // Mock the ring
  ring := &ringCountMock{count: 30}
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(ring, 3))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(ring, 3), &TenantBasedStrategy{limits: limits})

  service := newOwnedStreamService("test", limiter)
  require.Equal(t, 0, service.getOwnedStreamCount())
diff --git a/pkg/ingester/recalculate_owned_streams_test.go b/pkg/ingester/recalculate_owned_streams_test.go
index d2d3583095b02..4903375568057 100644
--- a/pkg/ingester/recalculate_owned_streams_test.go
+++ b/pkg/ingester/recalculate_owned_streams_test.go
@@ -70,7 +70,7 @@ func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T)
  UseOwnedStreamCount: testData.featureEnabled,
  }, nil)
  require.NoError(t, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(mockRing, 1))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(mockRing, 1), &TenantBasedStrategy{limits: limits})

  tenant, err := newInstance(
  defaultConfig(),
diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go
index fcad6558b21de..8bf7bfaf4ce98 100644
--- a/pkg/ingester/stream_test.go
+++ b/pkg/ingester/stream_test.go
@@ -56,7 +56,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
  limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
  require.NoError(t, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})

  for _, tc := range tt {
  t.Run(tc.name, func(t *testing.T) {
@@ -68,7 +68,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
  chunkfmt,
  headfmt,
  cfg,
- limiter,
+ limiter.rateLimitStrategy,
  "fake",
  model.Fingerprint(0),
  labels.Labels{
@@ -114,7 +114,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
 func TestPushDeduplication(t *testing.T) {
  limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
  require.NoError(t, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})

  chunkfmt, headfmt := defaultChunkFormat(t)
@@ -122,7 +122,7 @@ func TestPushDeduplication(t *testing.T) {
  chunkfmt,
  headfmt,
  defaultConfig(),
- limiter,
+ limiter.rateLimitStrategy,
  "fake",
  model.Fingerprint(0),
  labels.Labels{
@@ -150,7 +150,7 @@ func TestPushDeduplication(t *testing.T) {
 func TestPushDeduplicationExtraMetrics(t *testing.T) {
  limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
  require.NoError(t, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})

  chunkfmt, headfmt := defaultChunkFormat(t)
@@ -182,7 +182,7 @@ func TestPushDeduplicationExtraMetrics(t *testing.T) {
  chunkfmt,
  headfmt,
  defaultConfig(),
- limiter,
+ limiter.rateLimitStrategy,
  "fake",
  model.Fingerprint(0),
  labels.Labels{
@@ -220,7 +220,7 @@ func TestPushDeduplicationExtraMetrics(t *testing.T) {
 func TestPushRejectOldCounter(t *testing.T) {
  limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
  require.NoError(t, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})

  chunkfmt, headfmt := defaultChunkFormat(t)
@@ -228,7 +228,7 @@ func TestPushRejectOldCounter(t *testing.T) {
  chunkfmt,
  headfmt,
  defaultConfig(),
- limiter,
+ limiter.rateLimitStrategy,
  "fake",
  model.Fingerprint(0),
  labels.Labels{
@@ -328,7 +328,7 @@ func TestEntryErrorCorrectlyReported(t *testing.T) {
  }
  limits, err := validation.NewOverrides(l, nil)
  require.NoError(t, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})

  chunkfmt, headfmt := defaultChunkFormat(t)
@@ -336,7 +336,7 @@ func TestEntryErrorCorrectlyReported(t *testing.T) {
  chunkfmt,
  headfmt,
  &cfg,
- limiter,
+ limiter.rateLimitStrategy,
  "fake",
  model.Fingerprint(0),
  labels.Labels{
@@ -367,7 +367,7 @@ func TestUnorderedPush(t *testing.T) {
  cfg.MaxChunkAge = 10 * time.Second
  limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
  require.NoError(t, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})

  chunkfmt, headfmt := defaultChunkFormat(t)
@@ -375,7 +375,7 @@ func TestUnorderedPush(t *testing.T) {
  chunkfmt,
  headfmt,
  &cfg,
- limiter,
+ limiter.rateLimitStrategy,
  "fake",
  model.Fingerprint(0),
  labels.Labels{
@@ -470,7 +470,7 @@ func TestPushRateLimit(t *testing.T) {
  }
  limits, err := validation.NewOverrides(l, nil)
  require.NoError(t, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})

  chunkfmt, headfmt := defaultChunkFormat(t)
@@ -478,7 +478,7 @@ func TestPushRateLimit(t *testing.T) {
  chunkfmt,
  headfmt,
  defaultConfig(),
- limiter,
+ limiter.rateLimitStrategy,
  "fake",
  model.Fingerprint(0),
  labels.Labels{
@@ -510,7 +510,7 @@ func TestPushRateLimitAllOrNothing(t *testing.T) {
  }
  limits, err := validation.NewOverrides(l, nil)
  require.NoError(t, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})

  cfg := defaultConfig()
  chunkfmt, headfmt := defaultChunkFormat(t)
@@ -519,7 +519,7 @@ func TestPushRateLimitAllOrNothing(t *testing.T) {
  chunkfmt,
  headfmt,
  cfg,
- limiter,
+ limiter.rateLimitStrategy,
  "fake",
  model.Fingerprint(0),
  labels.Labels{
@@ -549,7 +549,7 @@ func TestPushRateLimitAllOrNothing(t *testing.T) {
 func TestReplayAppendIgnoresValidityWindow(t *testing.T) {
  limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
  require.NoError(t, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})

  cfg := defaultConfig()
  cfg.MaxChunkAge = time.Minute
@@ -559,7 +559,7 @@ func TestReplayAppendIgnoresValidityWindow(t *testing.T) {
  chunkfmt,
  headfmt,
  cfg,
- limiter,
+ limiter.rateLimitStrategy,
  "fake",
  model.Fingerprint(0),
  labels.Labels{
@@ -617,10 +617,10 @@ func Benchmark_PushStream(b *testing.B) {
  limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
  require.NoError(b, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
  chunkfmt, headfmt := defaultChunkFormat(b)

- s := newStream(chunkfmt, headfmt, &Config{MaxChunkAge: 24 * time.Hour}, limiter, "fake", model.Fingerprint(0), ls, true, NewStreamRateCalculator(), NilMetrics, nil, nil)
+ s := newStream(chunkfmt, headfmt, &Config{MaxChunkAge: 24 * time.Hour}, limiter.rateLimitStrategy, "fake", model.Fingerprint(0), ls, true, NewStreamRateCalculator(), NilMetrics, nil, nil)
  expr, err := syntax.ParseLogSelector(`{namespace="loki-dev"}`, true)
  require.NoError(b, err)
  t, err := newTailer("foo", expr, &fakeTailServer{}, 10)
diff --git a/pkg/ingester/streams_map_test.go b/pkg/ingester/streams_map_test.go
index 87e8332eddd45..273c489d34d4a 100644
--- a/pkg/ingester/streams_map_test.go
+++ b/pkg/ingester/streams_map_test.go
@@ -13,7 +13,7 @@ import (
 func TestStreamsMap(t *testing.T) {
  limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
  require.NoError(t, err)
- limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
+ limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
  chunkfmt, headfmt := defaultChunkFormat(t)

  ss := []*stream{
@@ -21,7 +21,7 @@ func TestStreamsMap(t *testing.T) {
  chunkfmt,
  headfmt,
  defaultConfig(),
- limiter,
+ limiter.rateLimitStrategy,
  "fake",
  model.Fingerprint(1),
  labels.Labels{
@@ -37,7 +37,7 @@ func TestStreamsMap(t *testing.T) {
  chunkfmt,
  headfmt,
  defaultConfig(),
- limiter,
+ limiter.rateLimitStrategy,
  "fake",
  model.Fingerprint(2),
  labels.Labels{
diff --git a/pkg/kafka/partition/reader.go b/pkg/kafka/partition/reader.go
index 74f18b02057f3..e07a65f8a0f15 100644
--- a/pkg/kafka/partition/reader.go
+++ b/pkg/kafka/partition/reader.go
@@ -105,6 +105,8 @@ func (p *Reader) start(ctx context.Context) error {
  p.kafkaCfg.Topic: {p.partitionID: kgo.NewOffset().At(lastCommittedOffset)},
  })

+ level.Info(p.logger).Log("msg", "initialising partition reader", "last_committed_offset", lastCommittedOffset, "partition", p.partitionID, "consumer_group", p.consumerGroup)
+
  p.committer = newCommitter(p.kafkaCfg, kadm.NewClient(p.client), p.partitionID, p.consumerGroup, p.logger, p.reg)

  if targetLag, maxLag := p.kafkaCfg.TargetConsumerLagAtStartup, p.kafkaCfg.MaxConsumerLagAtStartup; targetLag > 0 && maxLag > 0 {
@@ -355,6 +357,8 @@ func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag t
  }
  lastProducedOffset = lastProducedOffset - 1 // Kafka returns the next empty offset so we must subtract 1 to get the oldest written offset.

+ level.Debug(logger).Log("msg", "fetched latest offset information", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset)
+
  // Ensure there are some records to consume. For example, if the partition has been inactive for a long
  // time and all its records have been deleted, the partition start offset may be > 0 but there are no
  // records to actually consume.
@@ -365,7 +369,7 @@ func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag t
  // This message is NOT expected to be logged with a very high rate. In this log we display the last measured
  // lag. If we don't have it (lag is zero value), then it will not be logged.
- level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is consuming records to honor target and max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset)
+ level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is consuming records to honor target and max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset, "last_processed_offset", p.lastProcessedOffset, "offset_lag", lastProducedOffset-p.lastProcessedOffset)
  for boff.Ongoing() {
  // Continue reading until we reached the desired offset.
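
Note for reviewers: the core of the change is that NewLimiter now takes a RateLimiterStrategy. The classic push path keeps the tenant-configured per-stream limit (TenantBasedStrategy), while Kafka ingestion plugs in NoLimitsStrategy because the ingester already controls how fast it consumes from the partition. Below is a minimal, standalone sketch of that split. The type names mirror the patch, but RateLimit, Unlimited, Limits, staticLimits and pickStrategy are simplified stand-ins invented for this illustration; the real code uses validation.RateLimit and wires the strategy through NewLimiter inside ingester.New.

package main

import (
	"fmt"
	"math"
)

// RateLimit and Unlimited are simplified stand-ins for Loki's
// validation.RateLimit and validation.Unlimited.
type RateLimit float64

var Unlimited = RateLimit(math.Inf(1))

// Limits is cut down to the single lookup the rate limit strategy needs.
type Limits interface {
	PerStreamRateLimit(tenant string) RateLimit
}

// RateLimiterStrategy matches the interface the patch introduces: a limit
// lookup plus a toggle used while the WAL is being replayed.
type RateLimiterStrategy interface {
	RateLimit(tenant string) RateLimit
	SetDisabled(bool)
}

// TenantBasedStrategy applies the tenant's configured per-stream limit,
// unless the limiter has been disabled (e.g. during WAL replay).
type TenantBasedStrategy struct {
	disabled bool
	limits   Limits
}

func (s *TenantBasedStrategy) RateLimit(tenant string) RateLimit {
	if s.disabled {
		return Unlimited
	}
	return s.limits.PerStreamRateLimit(tenant)
}

func (s *TenantBasedStrategy) SetDisabled(d bool) { s.disabled = d }

// NoLimitsStrategy never rate limits: the Kafka path controls consumption
// speed itself, so a per-stream limit would only add a second throttle.
type NoLimitsStrategy struct{}

func (NoLimitsStrategy) RateLimit(string) RateLimit { return Unlimited }
func (NoLimitsStrategy) SetDisabled(bool)           {}

// staticLimits is a toy Limits implementation for the example.
type staticLimits struct{ perStream RateLimit }

func (l staticLimits) PerStreamRateLimit(string) RateLimit { return l.perStream }

// pickStrategy mirrors the selection in ingester.New: Kafka ingestion gets
// the no-op strategy, the push path keeps the tenant-based one.
func pickStrategy(kafkaEnabled bool, limits Limits) RateLimiterStrategy {
	if kafkaEnabled {
		return &NoLimitsStrategy{}
	}
	return &TenantBasedStrategy{limits: limits}
}

func main() {
	limits := staticLimits{perStream: 3 << 20} // 3 MiB/s, an arbitrary example value
	for _, kafka := range []bool{false, true} {
		s := pickStrategy(kafka, limits)
		fmt.Printf("kafka=%v per-stream limit: %v\n", kafka, s.RateLimit("tenant-a"))
	}
}

Keeping both behaviours behind one interface means the stream code does not need to know which ingestion path is active; it just asks its strategy for the current limit.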
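
The other half of the refactor is that streams no longer receive the whole *Limiter: newStream now takes limiter.rateLimitStrategy, and the stream's rate limiter re-reads the limit from that strategy on a recheck period. Below is a rough sketch of that recheck pattern only; recheckingLimiter and fixedStrategy are hypothetical names made up for the example, the numbers are arbitrary, and the ingester's actual StreamRateLimiter also enforces the limit rather than merely caching it.

package main

import (
	"fmt"
	"time"
)

// rateStrategy is a stand-in for the RateLimiterStrategy a stream now
// receives instead of the whole *Limiter.
type rateStrategy interface {
	RateLimit(tenant string) float64 // bytes per second; "unlimited" would be +Inf
}

type fixedStrategy struct{ bytesPerSec float64 }

func (f fixedStrategy) RateLimit(string) float64 { return f.bytesPerSec }

// recheckingLimiter shows the recheck pattern: the limit is looked up from
// the strategy at most once per recheckPeriod, so a changed override (or the
// WAL-replay "unlimited" switch) is picked up without a lookup on every push.
type recheckingLimiter struct {
	strategy      rateStrategy
	tenant        string
	recheckPeriod time.Duration
	recheckAt     time.Time
	cachedLimit   float64
}

func (r *recheckingLimiter) limit(now time.Time) float64 {
	if now.After(r.recheckAt) {
		r.cachedLimit = r.strategy.RateLimit(r.tenant)
		r.recheckAt = now.Add(r.recheckPeriod)
	}
	return r.cachedLimit
}

func main() {
	r := &recheckingLimiter{
		strategy:      fixedStrategy{bytesPerSec: 1 << 20}, // 1 MiB/s, arbitrary
		tenant:        "tenant-a",
		recheckPeriod: 10 * time.Second,
	}
	fmt.Println("effective per-stream limit (bytes/s):", r.limit(time.Now()))
}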
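
The reader.go hunks only add logging, but the new fields are easy to misread: Kafka reports the partition end offset, which is the next offset that will be written, so the last produced record sits at endOffset-1, and offset_lag in the new log line is measured against the last offset this reader processed. A tiny worked example with made-up offsets:

package main

import "fmt"

func main() {
	// Hypothetical numbers, purely for illustration.
	endOffset := int64(1205)            // partition end offset as reported by Kafka
	lastProcessedOffset := int64(1180)  // last record this reader has consumed
	lastProducedOffset := endOffset - 1 // the end offset is the *next* offset, so subtract 1
	offsetLag := lastProducedOffset - lastProcessedOffset
	fmt.Println("last_produced_offset:", lastProducedOffset, "offset_lag:", offsetLag) // 1204 and 24
}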