diff --git a/clients/pkg/promtail/targets/windows/target_test.go b/clients/pkg/promtail/targets/windows/target_test.go
index a9a692b21ecfc..f1f786e3a6f6e 100644
--- a/clients/pkg/promtail/targets/windows/target_test.go
+++ b/clients/pkg/promtail/targets/windows/target_test.go
@@ -28,7 +28,7 @@ func init() {
     // Enable debug logging
     cfg := &server.Config{}
     _ = cfg.LogLevel.Set("debug")
-    util_log.InitLogger(cfg, nil, false)
+    util_log.InitLogger(cfg, nil, true, false)
 }
 
 // Test that you can use to generate event logs locally.
diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go
index d5f311366a3e6..8286b66cb12fd 100644
--- a/pkg/ingester/checkpoint_test.go
+++ b/pkg/ingester/checkpoint_test.go
@@ -571,3 +571,105 @@ func buildChunks(t testing.TB, size int) []Chunk {
     }
     return chks
 }
+
+func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) {
+    for _, waitForCheckpoint := range []bool{false, true} {
+        t.Run(fmt.Sprintf("checkpoint-%v", waitForCheckpoint), func(t *testing.T) {
+            walDir := t.TempDir()
+
+            ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir)
+
+            // First launch the ingester with unordered writes enabled
+            dft := defaultLimitsTestConfig()
+            dft.UnorderedWrites = true
+            limits, err := validation.NewOverrides(dft, nil)
+            require.NoError(t, err)
+
+            newStore := func() *mockStore {
+                return &mockStore{
+                    chunks: map[string][]chunk.Chunk{},
+                }
+            }
+
+            i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger())
+            require.NoError(t, err)
+            require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
+            defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
+
+            req := logproto.PushRequest{
+                Streams: []logproto.Stream{
+                    {
+                        Labels: `{foo="bar",bar="baz1"}`,
+                    },
+                    {
+                        Labels: `{foo="bar",bar="baz2"}`,
+                    },
+                },
+            }
+
+            start := time.Now()
+            steps := 10
+            end := start.Add(time.Second * time.Duration(steps))
+
+            // Write data out of order
+            for i := steps - 1; i >= 0; i-- {
+                req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
+                    Timestamp: start.Add(time.Duration(i) * time.Second),
+                    Line:      fmt.Sprintf("line %d", i),
+                })
+                req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{
+                    Timestamp: start.Add(time.Duration(i) * time.Second),
+                    Line:      fmt.Sprintf("line %d", i),
+                })
+            }
+
+            ctx := user.InjectOrgID(context.Background(), "test")
+            _, err = i.Push(ctx, &req)
+            require.NoError(t, err)
+
+            if waitForCheckpoint {
+                // Ensure we have checkpointed now
+                expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*10) // give a bit of buffer
+
+                // Add some more data after the checkpoint
+                tmp := end
+                end = end.Add(time.Second * time.Duration(steps))
+                req.Streams[0].Entries = nil
+                req.Streams[1].Entries = nil
+                // Write data out of order again
+                for i := steps - 1; i >= 0; i-- {
+                    req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
+                        Timestamp: tmp.Add(time.Duration(i) * time.Second),
+                        Line:      fmt.Sprintf("line %d", steps+i),
+                    })
+                    req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{
+                        Timestamp: tmp.Add(time.Duration(i) * time.Second),
+                        Line:      fmt.Sprintf("line %d", steps+i),
+                    })
+                }
+
+                _, err = i.Push(ctx, &req)
+                require.NoError(t, err)
+            }
+
+            ensureIngesterData(ctx, t, start, end, i)
+
+            require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))
+
+            // Now disable unordered writes
+            limitCfg := defaultLimitsTestConfig()
+            limitCfg.UnorderedWrites = false
+            limits, err = validation.NewOverrides(limitCfg, nil)
+            require.NoError(t, err)
+
+            // restart the ingester
+            i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger())
+            require.NoError(t, err)
+            defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
+            require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
+
+            // ensure we've recovered data from wal segments
+            ensureIngesterData(ctx, t, start, end, i)
+        })
+    }
+}
diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go
index 7bcbcb3a67024..14306b01dc4af 100644
--- a/pkg/ingester/instance.go
+++ b/pkg/ingester/instance.go
@@ -297,7 +297,7 @@ func (i *instance) createStream(pushReqStream logproto.Stream, record *wal.Recor
         return nil, fmt.Errorf("failed to create stream: %w", err)
     }
 
-    s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.streamRateCalculator, i.metrics, i.writeFailures)
+    s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures)
 
     // record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them).
     if record != nil {
@@ -335,7 +335,7 @@ func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) (*st
         return nil, fmt.Errorf("failed to create stream for fingerprint: %w", err)
     }
 
-    s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.streamRateCalculator, i.metrics, i.writeFailures)
+    s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures)
 
     i.streamsCreatedTotal.Inc()
     memoryStreams.WithLabelValues(i.instanceID).Inc()
diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go
index bbb2e899bb307..ac29f3516df45 100644
--- a/pkg/ingester/instance_test.go
+++ b/pkg/ingester/instance_test.go
@@ -299,7 +299,7 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
         require.NoError(t, err)
         chunkfmt, headfmt, err := instance.chunkFormatAt(minTs(&testStream))
         require.NoError(t, err)
-        chunk := newStream(chunkfmt, headfmt, cfg, limiter, "fake", 0, nil, NewStreamRateCalculator(), NilMetrics, nil).NewChunk()
+        chunk := newStream(chunkfmt, headfmt, cfg, limiter, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics, nil).NewChunk()
         for _, entry := range testStream.Entries {
             err = chunk.Append(&entry)
             require.NoError(t, err)
@@ -556,7 +556,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
 
     b.Run("addTailersToNewStream", func(b *testing.B) {
         for n := 0; n < b.N; n++ {
-            inst.addTailersToNewStream(newStream(chunkfmt, headfmt, nil, limiter, "fake", 0, lbs, NewStreamRateCalculator(), NilMetrics, nil))
+            inst.addTailersToNewStream(newStream(chunkfmt, headfmt, nil, limiter, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics, nil))
         }
     })
 }
diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go
index d6fd3e7952203..e48c2a018d277 100644
--- a/pkg/ingester/limiter.go
+++ b/pkg/ingester/limiter.go
@@ -23,6 +23,7 @@ type RingCount interface {
 }
 
 type Limits interface {
+    UnorderedWrites(userID string) bool
     MaxLocalStreamsPerUser(userID string) int
     MaxGlobalStreamsPerUser(userID string) int
     PerStreamRateLimit(userID string) validation.RateLimit
@@ -65,6 +66,16 @@ func NewLimiter(limits Limits, metrics *ingesterMetrics, ring RingCount, replica
     }
 }
 
+func (l *Limiter) UnorderedWrites(userID string) bool {
+    // WAL replay should not discard previously ack'd writes,
+    // so allow out of order writes while the limiter is disabled.
+    // This allows replaying unordered WALs into ordered configurations.
+    if l.disabled {
+        return true
+    }
+    return l.limits.UnorderedWrites(userID)
+}
+
 // AssertMaxStreamsPerUser ensures limit has not been reached compared to the current
 // number of streams in input and returns an error if so.
 func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error {
diff --git a/pkg/ingester/recovery.go b/pkg/ingester/recovery.go
index 1befaaf3433ec..32246360028f0 100644
--- a/pkg/ingester/recovery.go
+++ b/pkg/ingester/recovery.go
@@ -202,6 +202,26 @@ func (r *ingesterRecoverer) Close() {
             return nil
         }
 
+        // If we've replayed a WAL with unordered writes, but the new
+        // configuration disables them, convert all streams/head blocks
+        // to ensure unordered writes are disabled after the replay,
+        // but without dropping any previously accepted data.
+        isAllowed := r.ing.limiter.UnorderedWrites(s.tenant)
+        old := s.unorderedWrites
+        s.unorderedWrites = isAllowed
+
+        if !isAllowed && old {
+            err := s.chunks[len(s.chunks)-1].chunk.ConvertHead(headBlockType(s.chunkFormat, isAllowed))
+            if err != nil {
+                level.Warn(r.logger).Log(
+                    "msg", "error converting headblock",
+                    "err", err.Error(),
+                    "stream", s.labels.String(),
+                    "component", "ingesterRecoverer",
+                )
+            }
+        }
+
         return nil
     })
 }
diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go
index fc5d5332d4cc0..4c6aa4f9a122e 100644
--- a/pkg/ingester/stream.go
+++ b/pkg/ingester/stream.go
@@ -102,6 +102,7 @@ func newStream(
     tenant string,
     fp model.Fingerprint,
     labels labels.Labels,
+    unorderedWrites bool,
     streamRateCalculator *StreamRateCalculator,
     metrics *ingesterMetrics,
     writeFailures *writefailures.Manager,
@@ -119,6 +120,8 @@ func newStream(
         metrics:              metrics,
         tenant:               tenant,
         streamRateCalculator: streamRateCalculator,
+
+        unorderedWrites:      unorderedWrites,
         writeFailures:        writeFailures,
         chunkFormat:          chunkFormat,
         chunkHeadBlockFormat: headBlockFmt,
@@ -392,7 +395,7 @@ func (s *stream) validateEntries(entries []logproto.Entry, isReplay, rateLimitWh
 
         // The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age.
         cutoff := highestTs.Add(-s.cfg.MaxChunkAge / 2)
-        if !isReplay && !highestTs.IsZero() && cutoff.After(entries[i].Timestamp) {
+        if !isReplay && s.unorderedWrites && !highestTs.IsZero() && cutoff.After(entries[i].Timestamp) {
             failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind(cutoff)})
             s.writeFailures.Log(s.tenant, fmt.Errorf("%w for stream %s", failedEntriesWithError[len(failedEntriesWithError)-1].e, s.labels))
             outOfOrderSamples++
@@ -432,8 +435,12 @@
 
 func (s *stream) reportMetrics(outOfOrderSamples, outOfOrderBytes, rateLimitedSamples, rateLimitedBytes int) {
     if outOfOrderSamples > 0 {
-        validation.DiscardedSamples.WithLabelValues(validation.TooFarBehind, s.tenant).Add(float64(outOfOrderSamples))
-        validation.DiscardedBytes.WithLabelValues(validation.TooFarBehind, s.tenant).Add(float64(outOfOrderBytes))
+        name := validation.OutOfOrder
+        if s.unorderedWrites {
+            name = validation.TooFarBehind
+        }
+        validation.DiscardedSamples.WithLabelValues(name, s.tenant).Add(float64(outOfOrderSamples))
+        validation.DiscardedBytes.WithLabelValues(name, s.tenant).Add(float64(outOfOrderBytes))
     }
     if rateLimitedSamples > 0 {
         validation.DiscardedSamples.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedSamples))
diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go
index 4e7110d2ac312..641fd1c926523 100644
--- a/pkg/ingester/stream_test.go
+++ b/pkg/ingester/stream_test.go
@@ -64,6 +64,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
         labels.Labels{
             {Name: "foo", Value: "bar"},
         },
+        true,
         NewStreamRateCalculator(),
         NilMetrics,
         nil,
@@ -115,6 +116,7 @@ func TestPushDeduplication(t *testing.T) {
         labels.Labels{
             {Name: "foo", Value: "bar"},
         },
+        true,
         NewStreamRateCalculator(),
         NilMetrics,
         nil,
@@ -149,6 +151,7 @@ func TestPushRejectOldCounter(t *testing.T) {
         labels.Labels{
             {Name: "foo", Value: "bar"},
         },
+        true,
         NewStreamRateCalculator(),
         NilMetrics,
         nil,
@@ -254,6 +257,7 @@ func TestEntryErrorCorrectlyReported(t *testing.T) {
         labels.Labels{
             {Name: "foo", Value: "bar"},
         },
+        true,
         NewStreamRateCalculator(),
         NilMetrics,
         nil,
@@ -288,6 +292,7 @@ func TestUnorderedPush(t *testing.T) {
         labels.Labels{
             {Name: "foo", Value: "bar"},
         },
+        true,
         NewStreamRateCalculator(),
         NilMetrics,
         nil,
@@ -389,6 +394,7 @@ func TestPushRateLimit(t *testing.T) {
         labels.Labels{
             {Name: "foo", Value: "bar"},
         },
+        true,
         NewStreamRateCalculator(),
         NilMetrics,
         nil,
@@ -426,6 +432,7 @@ func TestPushRateLimitAllOrNothing(t *testing.T) {
         labels.Labels{
             {Name: "foo", Value: "bar"},
         },
+        true,
         NewStreamRateCalculator(),
         NilMetrics,
         nil,
@@ -462,6 +469,7 @@ func TestReplayAppendIgnoresValidityWindow(t *testing.T) {
         labels.Labels{
             {Name: "foo", Value: "bar"},
         },
+        true,
         NewStreamRateCalculator(),
         NilMetrics,
         nil,
@@ -515,7 +523,7 @@ func Benchmark_PushStream(b *testing.B) {
     limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
     chunkfmt, headfmt := defaultChunkFormat(b)
 
-    s := newStream(chunkfmt, headfmt, &Config{MaxChunkAge: 24 * time.Hour}, limiter, "fake", model.Fingerprint(0), ls, NewStreamRateCalculator(), NilMetrics, nil)
+    s := newStream(chunkfmt, headfmt, &Config{MaxChunkAge: 24 * time.Hour}, limiter, "fake", model.Fingerprint(0), ls, true, NewStreamRateCalculator(), NilMetrics, nil)
     t, err := newTailer("foo", `{namespace="loki-dev"}`, &fakeTailServer{}, 10)
     require.NoError(b, err)
 
diff --git a/pkg/ingester/streams_map_test.go b/pkg/ingester/streams_map_test.go
index 1b82a2e42f6ee..2468ffd7c79d8 100644
--- a/pkg/ingester/streams_map_test.go
+++ b/pkg/ingester/streams_map_test.go
@@ -27,6 +27,7 @@ func TestStreamsMap(t *testing.T) {
             labels.Labels{
                 {Name: "foo", Value: "bar"},
             },
+            true,
             NewStreamRateCalculator(),
             NilMetrics,
             nil,
@@ -41,6 +42,7 @@ func TestStreamsMap(t *testing.T) {
             labels.Labels{
                 {Name: "bar", Value: "foo"},
             },
+            true,
             NewStreamRateCalculator(),
             NilMetrics,
             nil,
diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go
index bd4879028647a..c7cc4395d8f8a 100644
--- a/pkg/validation/limits.go
+++ b/pkg/validation/limits.go
@@ -223,6 +223,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
     f.IntVar(&l.MaxLocalStreamsPerUser, "ingester.max-streams-per-user", 0, "Maximum number of active streams per user, per ingester. 0 to disable.")
     f.IntVar(&l.MaxGlobalStreamsPerUser, "ingester.max-global-streams-per-user", 5000, "Maximum number of active streams per user, across the cluster. 0 to disable. When the global limit is enabled, each ingester is configured with a dynamic local limit based on the replication factor and the current number of healthy ingesters, and is kept updated whenever the number of ingesters change.")
 
+    // TODO(ashwanth) Deprecated. This will be removed with the next major release and out-of-order writes would be accepted by default.
+    f.BoolVar(&l.UnorderedWrites, "ingester.unordered-writes", true, "Deprecated. When true, out-of-order writes are accepted.")
+
     _ = l.PerStreamRateLimit.Set(strconv.Itoa(defaultPerStreamRateLimit))
     f.Var(&l.PerStreamRateLimit, "ingester.per-stream-rate-limit", "Maximum byte rate per second per stream, also expressible in human readable forms (1MB, 256KB, etc).")
     _ = l.PerStreamRateLimitBurst.Set(strconv.Itoa(defaultPerStreamBurstLimit))
@@ -707,6 +710,10 @@ func (o *Overrides) StreamRetention(userID string) []StreamRetention {
     return o.getOverridesForUser(userID).StreamRetention
 }
 
+func (o *Overrides) UnorderedWrites(userID string) bool {
+    return o.getOverridesForUser(userID).UnorderedWrites
+}
+
 func (o *Overrides) DeletionMode(userID string) string {
     return o.getOverridesForUser(userID).DeletionMode
 }
diff --git a/pkg/validation/validate.go b/pkg/validation/validate.go
index 639ae611bf7ca..09c444aa64987 100644
--- a/pkg/validation/validate.go
+++ b/pkg/validation/validate.go
@@ -32,6 +32,11 @@ const (
     // StreamRateLimit is a reason for discarding lines when the streams own rate limit is hit
     // rather than the overall ingestion rate limit.
     StreamRateLimit = "per_stream_rate_limit"
+    // OutOfOrder is a reason for discarding lines when Loki doesn't accept out
+    // of order log lines (parameter `-ingester.unordered-writes` is set to
+    // `false`) and the lines in question are older than the newest line in the
+    // stream.
+    OutOfOrder = "out_of_order"
     // TooFarBehind is a reason for discarding lines when Loki accepts
     // unordered ingest (parameter `-ingester.unordered-writes` is set to
    // `true`, which is the default) and the lines in question are older than
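
Note for reviewers: the contract added to pkg/ingester/limiter.go above can be exercised in isolation. Below is a minimal, self-contained Go sketch (not part of the patch); the fakeLimits type and the main function are illustrative stand-ins for Loki's real per-tenant overrides, while the disabled-limiter branch mirrors the diff: during WAL replay the limiter is disabled, so unordered writes are always allowed and previously ack'd data is never rejected.

package main

import "fmt"

// fakeLimits is an illustrative stand-in for the per-tenant overrides
// behind the Limits interface extended in pkg/ingester/limiter.go.
type fakeLimits struct {
    unordered map[string]bool
}

func (f *fakeLimits) UnorderedWrites(userID string) bool {
    return f.unordered[userID]
}

// limiter mirrors the method added in the diff: while disabled (as during
// WAL replay), unordered writes are always allowed; once enabled, the
// per-tenant override decides.
type limiter struct {
    disabled bool
    limits   *fakeLimits
}

func (l *limiter) UnorderedWrites(userID string) bool {
    if l.disabled {
        return true
    }
    return l.limits.UnorderedWrites(userID)
}

func main() {
    limits := &fakeLimits{unordered: map[string]bool{"tenant-a": false}}
    replaying := &limiter{disabled: true, limits: limits}
    serving := &limiter{disabled: false, limits: limits}

    fmt.Println(replaying.UnorderedWrites("tenant-a")) // true: replay accepts out-of-order data
    fmt.Println(serving.UnorderedWrites("tenant-a"))   // false: ordered config applies after replay
}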
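
The validity-window check changed in pkg/ingester/stream.go now applies only when the stream runs with unordered writes: an entry is discarded with ErrTooFarBehind when it is older than the highest accepted timestamp minus half of MaxChunkAge. A worked example of just that cutoff arithmetic, assuming an illustrative MaxChunkAge of 2h and made-up timestamps:

package main

import (
    "fmt"
    "time"
)

func main() {
    maxChunkAge := 2 * time.Hour
    // Highest timestamp already accepted by the stream.
    highestTs := time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC)

    // Same arithmetic as validateEntries in the diff: the validity window
    // is the highest timestamp present minus 1/2 * max-chunk-age.
    cutoff := highestTs.Add(-maxChunkAge / 2) // 11:00

    for _, ts := range []time.Time{
        time.Date(2023, 1, 1, 11, 30, 0, 0, time.UTC), // inside the window: accepted
        time.Date(2023, 1, 1, 10, 59, 0, 0, time.UTC), // behind the cutoff: rejected
    } {
        fmt.Printf("entry at %s too far behind: %v\n", ts.Format("15:04"), cutoff.After(ts))
    }
}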
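
Finally, the reportMetrics change attributes discards to the correct reason label: out_of_order for tenants running with ordered writes, too_far_behind when unordered ingest is enabled. A condensed sketch of that branch (the too_far_behind string is the value of the existing TooFarBehind constant in pkg/validation/validate.go):

package main

import "fmt"

const (
    outOfOrder   = "out_of_order"   // ordered mode: entry is older than the newest line in the stream
    tooFarBehind = "too_far_behind" // unordered mode: entry is behind highestTs - MaxChunkAge/2
)

// discardReason mirrors the selection added to (*stream).reportMetrics.
func discardReason(unorderedWrites bool) string {
    if unorderedWrites {
        return tooFarBehind
    }
    return outOfOrder
}

func main() {
    fmt.Println(discardReason(false)) // out_of_order
    fmt.Println(discardReason(true))  // too_far_behind
}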