
Commit

fix promtail tests
ashwanthgoli committed Nov 7, 2023
1 parent ad32ed4 commit 940bcf7
Showing 11 changed files with 9 additions and 171 deletions.
2 changes: 1 addition & 1 deletion clients/pkg/promtail/targets/windows/target_test.go
@@ -28,7 +28,7 @@ func init() {
// Enable debug logging
cfg := &server.Config{}
_ = cfg.LogLevel.Set("debug")
- util_log.InitLogger(cfg, nil, true, false)
+ util_log.InitLogger(cfg, nil, false)
}

// Test that you can use to generate event logs locally.
102 changes: 0 additions & 102 deletions pkg/ingester/checkpoint_test.go
@@ -571,105 +571,3 @@ func buildChunks(t testing.TB, size int) []Chunk {
}
return chks
}

func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) {
for _, waitForCheckpoint := range []bool{false, true} {
t.Run(fmt.Sprintf("checkpoint-%v", waitForCheckpoint), func(t *testing.T) {
walDir := t.TempDir()

ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir)

// First launch the ingester with unordered writes enabled
dft := defaultLimitsTestConfig()
dft.UnorderedWrites = true
limits, err := validation.NewOverrides(dft, nil)
require.NoError(t, err)

newStore := func() *mockStore {
return &mockStore{
chunks: map[string][]chunk.Chunk{},
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger())
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

req := logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: `{foo="bar",bar="baz1"}`,
},
{
Labels: `{foo="bar",bar="baz2"}`,
},
},
}

start := time.Now()
steps := 10
end := start.Add(time.Second * time.Duration(steps))

// Write data out of order
for i := steps - 1; i >= 0; i-- {
req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
Timestamp: start.Add(time.Duration(i) * time.Second),
Line: fmt.Sprintf("line %d", i),
})
req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{
Timestamp: start.Add(time.Duration(i) * time.Second),
Line: fmt.Sprintf("line %d", i),
})
}

ctx := user.InjectOrgID(context.Background(), "test")
_, err = i.Push(ctx, &req)
require.NoError(t, err)

if waitForCheckpoint {
// Ensure we have checkpointed now
expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*10) // give a bit of buffer

// Add some more data after the checkpoint
tmp := end
end = end.Add(time.Second * time.Duration(steps))
req.Streams[0].Entries = nil
req.Streams[1].Entries = nil
// Write data out of order again
for i := steps - 1; i >= 0; i-- {
req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
Timestamp: tmp.Add(time.Duration(i) * time.Second),
Line: fmt.Sprintf("line %d", steps+i),
})
req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{
Timestamp: tmp.Add(time.Duration(i) * time.Second),
Line: fmt.Sprintf("line %d", steps+i),
})
}

_, err = i.Push(ctx, &req)
require.NoError(t, err)
}

ensureIngesterData(ctx, t, start, end, i)

require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

// Now disable unordered writes
limitCfg := defaultLimitsTestConfig()
limitCfg.UnorderedWrites = false
limits, err = validation.NewOverrides(limitCfg, nil)
require.NoError(t, err)

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger())
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))

// ensure we've recovered data from wal segments
ensureIngesterData(ctx, t, start, end, i)
})
}
}
4 changes: 2 additions & 2 deletions pkg/ingester/instance.go
@@ -297,7 +297,7 @@ func (i *instance) createStream(pushReqStream logproto.Stream, record *wal.Recor
return nil, fmt.Errorf("failed to create stream: %w", err)
}

- s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures)
+ s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.streamRateCalculator, i.metrics, i.writeFailures)

// record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them).
if record != nil {
@@ -335,7 +335,7 @@ func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) (*st
return nil, fmt.Errorf("failed to create stream for fingerprint: %w", err)
}

- s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures)
+ s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.streamRateCalculator, i.metrics, i.writeFailures)

i.streamsCreatedTotal.Inc()
memoryStreams.WithLabelValues(i.instanceID).Inc()
4 changes: 2 additions & 2 deletions pkg/ingester/instance_test.go
@@ -299,7 +299,7 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
require.NoError(t, err)
chunkfmt, headfmt, err := instance.chunkFormatAt(minTs(&testStream))
require.NoError(t, err)
- chunk := newStream(chunkfmt, headfmt, cfg, limiter, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics, nil).NewChunk()
+ chunk := newStream(chunkfmt, headfmt, cfg, limiter, "fake", 0, nil, NewStreamRateCalculator(), NilMetrics, nil).NewChunk()
for _, entry := range testStream.Entries {
err = chunk.Append(&entry)
require.NoError(t, err)
@@ -556,7 +556,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {

b.Run("addTailersToNewStream", func(b *testing.B) {
for n := 0; n < b.N; n++ {
- inst.addTailersToNewStream(newStream(chunkfmt, headfmt, nil, limiter, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics, nil))
+ inst.addTailersToNewStream(newStream(chunkfmt, headfmt, nil, limiter, "fake", 0, lbs, NewStreamRateCalculator(), NilMetrics, nil))
}
})
}
11 changes: 0 additions & 11 deletions pkg/ingester/limiter.go
@@ -23,7 +23,6 @@ type RingCount interface {
}

type Limits interface {
- UnorderedWrites(userID string) bool
MaxLocalStreamsPerUser(userID string) int
MaxGlobalStreamsPerUser(userID string) int
PerStreamRateLimit(userID string) validation.RateLimit
@@ -66,16 +65,6 @@ func NewLimiter(limits Limits, metrics *ingesterMetrics, ring RingCount, replica
}
}

- func (l *Limiter) UnorderedWrites(userID string) bool {
- // WAL replay should not discard previously ack'd writes,
- // so allow out of order writes while the limiter is disabled.
- // This allows replaying unordered WALs into ordered configurations.
- if l.disabled {
- return true
- }
- return l.limits.UnorderedWrites(userID)
- }

// AssertMaxStreamsPerUser ensures limit has not been reached compared to the current
// number of streams in input and returns an error if so.
func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error {
20 changes: 0 additions & 20 deletions pkg/ingester/recovery.go
@@ -202,26 +202,6 @@ func (r *ingesterRecoverer) Close() {
return nil
}

- // If we've replayed a WAL with unordered writes, but the new
- // configuration disables them, convert all streams/head blocks
- // to ensure unordered writes are disabled after the replay,
- // but without dropping any previously accepted data.
- isAllowed := r.ing.limiter.UnorderedWrites(s.tenant)
- old := s.unorderedWrites
- s.unorderedWrites = isAllowed
-
- if !isAllowed && old {
- err := s.chunks[len(s.chunks)-1].chunk.ConvertHead(headBlockType(s.chunkFormat, isAllowed))
- if err != nil {
- level.Warn(r.logger).Log(
- "msg", "error converting headblock",
- "err", err.Error(),
- "stream", s.labels.String(),
- "component", "ingesterRecoverer",
- )
- }
- }

return nil
})
}
13 changes: 3 additions & 10 deletions pkg/ingester/stream.go
@@ -102,7 +102,6 @@ func newStream(
tenant string,
fp model.Fingerprint,
labels labels.Labels,
- unorderedWrites bool,
streamRateCalculator *StreamRateCalculator,
metrics *ingesterMetrics,
writeFailures *writefailures.Manager,
@@ -120,8 +119,6 @@
metrics: metrics,
tenant: tenant,
streamRateCalculator: streamRateCalculator,

- unorderedWrites: unorderedWrites,
writeFailures: writeFailures,
chunkFormat: chunkFormat,
chunkHeadBlockFormat: headBlockFmt,
@@ -395,7 +392,7 @@ func (s *stream) validateEntries(entries []logproto.Entry, isReplay, rateLimitWh

// The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age.
cutoff := highestTs.Add(-s.cfg.MaxChunkAge / 2)
- if !isReplay && s.unorderedWrites && !highestTs.IsZero() && cutoff.After(entries[i].Timestamp) {
+ if !isReplay && !highestTs.IsZero() && cutoff.After(entries[i].Timestamp) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind(cutoff)})
s.writeFailures.Log(s.tenant, fmt.Errorf("%w for stream %s", failedEntriesWithError[len(failedEntriesWithError)-1].e, s.labels))
outOfOrderSamples++
@@ -435,12 +432,8 @@

func (s *stream) reportMetrics(outOfOrderSamples, outOfOrderBytes, rateLimitedSamples, rateLimitedBytes int) {
if outOfOrderSamples > 0 {
- name := validation.OutOfOrder
- if s.unorderedWrites {
- name = validation.TooFarBehind
- }
- validation.DiscardedSamples.WithLabelValues(name, s.tenant).Add(float64(outOfOrderSamples))
- validation.DiscardedBytes.WithLabelValues(name, s.tenant).Add(float64(outOfOrderBytes))
+ validation.DiscardedSamples.WithLabelValues(validation.TooFarBehind, s.tenant).Add(float64(outOfOrderSamples))
+ validation.DiscardedBytes.WithLabelValues(validation.TooFarBehind, s.tenant).Add(float64(outOfOrderBytes))
}
if rateLimitedSamples > 0 {
validation.DiscardedSamples.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedSamples))
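With the per-stream unorderedWrites flag removed, the validity-window check in validateEntries above applies to every stream: entries older than the highest accepted timestamp minus half the max chunk age are rejected as too far behind, and WAL replay bypasses the check via the isReplay guard (see TestReplayAppendIgnoresValidityWindow below). A minimal, self-contained sketch of that cutoff rule; the entry type, MaxChunkAge value, and timestamps here are illustrative stand-ins, not Loki's actual types or defaults:

```go
package main

import (
	"fmt"
	"time"
)

// entry is an illustrative stand-in for a log line being validated.
type entry struct {
	timestamp time.Time
	line      string
}

func main() {
	// Mirror the cutoff rule from stream.validateEntries:
	// cutoff = highest timestamp seen so far - MaxChunkAge/2.
	maxChunkAge := 2 * time.Hour // assumed value for the sketch
	highestTs := time.Now()      // highest timestamp already accepted for the stream
	cutoff := highestTs.Add(-maxChunkAge / 2)

	entries := []entry{
		{timestamp: highestTs.Add(-30 * time.Minute), line: "inside the validity window"},
		{timestamp: highestTs.Add(-90 * time.Minute), line: "older than the cutoff"},
	}

	for _, e := range entries {
		if !highestTs.IsZero() && cutoff.After(e.timestamp) {
			// In the ingester this path increments the too_far_behind discard counters.
			fmt.Printf("reject (too far behind, cutoff %s): %s\n", cutoff.Format(time.RFC3339), e.line)
			continue
		}
		fmt.Printf("accept: %s\n", e.line)
	}
}
```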
10 changes: 1 addition & 9 deletions pkg/ingester/stream_test.go
@@ -64,7 +64,6 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
labels.Labels{
{Name: "foo", Value: "bar"},
},
- true,
NewStreamRateCalculator(),
NilMetrics,
nil,
@@ -116,7 +115,6 @@ func TestPushDeduplication(t *testing.T) {
labels.Labels{
{Name: "foo", Value: "bar"},
},
- true,
NewStreamRateCalculator(),
NilMetrics,
nil,
@@ -151,7 +149,6 @@ func TestPushRejectOldCounter(t *testing.T) {
labels.Labels{
{Name: "foo", Value: "bar"},
},
- true,
NewStreamRateCalculator(),
NilMetrics,
nil,
@@ -257,7 +254,6 @@ func TestEntryErrorCorrectlyReported(t *testing.T) {
labels.Labels{
{Name: "foo", Value: "bar"},
},
- true,
NewStreamRateCalculator(),
NilMetrics,
nil,
@@ -292,7 +288,6 @@ func TestUnorderedPush(t *testing.T) {
labels.Labels{
{Name: "foo", Value: "bar"},
},
- true,
NewStreamRateCalculator(),
NilMetrics,
nil,
@@ -394,7 +389,6 @@ func TestPushRateLimit(t *testing.T) {
labels.Labels{
{Name: "foo", Value: "bar"},
},
- true,
NewStreamRateCalculator(),
NilMetrics,
nil,
@@ -432,7 +426,6 @@ func TestPushRateLimitAllOrNothing(t *testing.T) {
labels.Labels{
{Name: "foo", Value: "bar"},
},
- true,
NewStreamRateCalculator(),
NilMetrics,
nil,
@@ -469,7 +462,6 @@ func TestReplayAppendIgnoresValidityWindow(t *testing.T) {
labels.Labels{
{Name: "foo", Value: "bar"},
},
- true,
NewStreamRateCalculator(),
NilMetrics,
nil,
@@ -523,7 +515,7 @@ func Benchmark_PushStream(b *testing.B) {
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
chunkfmt, headfmt := defaultChunkFormat(b)

- s := newStream(chunkfmt, headfmt, &Config{MaxChunkAge: 24 * time.Hour}, limiter, "fake", model.Fingerprint(0), ls, true, NewStreamRateCalculator(), NilMetrics, nil)
+ s := newStream(chunkfmt, headfmt, &Config{MaxChunkAge: 24 * time.Hour}, limiter, "fake", model.Fingerprint(0), ls, NewStreamRateCalculator(), NilMetrics, nil)
t, err := newTailer("foo", `{namespace="loki-dev"}`, &fakeTailServer{}, 10)
require.NoError(b, err)

2 changes: 0 additions & 2 deletions pkg/ingester/streams_map_test.go
@@ -27,7 +27,6 @@ func TestStreamsMap(t *testing.T) {
labels.Labels{
{Name: "foo", Value: "bar"},
},
- true,
NewStreamRateCalculator(),
NilMetrics,
nil,
@@ -42,7 +41,6 @@
labels.Labels{
{Name: "bar", Value: "foo"},
},
- true,
NewStreamRateCalculator(),
NilMetrics,
nil,
7 changes: 0 additions & 7 deletions pkg/validation/limits.go
@@ -223,9 +223,6 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxLocalStreamsPerUser, "ingester.max-streams-per-user", 0, "Maximum number of active streams per user, per ingester. 0 to disable.")
f.IntVar(&l.MaxGlobalStreamsPerUser, "ingester.max-global-streams-per-user", 5000, "Maximum number of active streams per user, across the cluster. 0 to disable. When the global limit is enabled, each ingester is configured with a dynamic local limit based on the replication factor and the current number of healthy ingesters, and is kept updated whenever the number of ingesters change.")

- // TODO(ashwanth) Deprecated. This will be removed with the next major release and out-of-order writes would be accepted by default.
- f.BoolVar(&l.UnorderedWrites, "ingester.unordered-writes", true, "Deprecated. When true, out-of-order writes are accepted.")

_ = l.PerStreamRateLimit.Set(strconv.Itoa(defaultPerStreamRateLimit))
f.Var(&l.PerStreamRateLimit, "ingester.per-stream-rate-limit", "Maximum byte rate per second per stream, also expressible in human readable forms (1MB, 256KB, etc).")
_ = l.PerStreamRateLimitBurst.Set(strconv.Itoa(defaultPerStreamBurstLimit))
@@ -710,10 +707,6 @@ func (o *Overrides) StreamRetention(userID string) []StreamRetention {
return o.getOverridesForUser(userID).StreamRetention
}

- func (o *Overrides) UnorderedWrites(userID string) bool {
- return o.getOverridesForUser(userID).UnorderedWrites
- }

func (o *Overrides) DeletionMode(userID string) string {
return o.getOverridesForUser(userID).DeletionMode
}
5 changes: 0 additions & 5 deletions pkg/validation/validate.go
@@ -32,11 +32,6 @@ const (
// StreamRateLimit is a reason for discarding lines when the streams own rate limit is hit
// rather than the overall ingestion rate limit.
StreamRateLimit = "per_stream_rate_limit"
- // OutOfOrder is a reason for discarding lines when Loki doesn't accept out
- // of order log lines (parameter `-ingester.unordered-writes` is set to
- // `false`) and the lines in question are older than the newest line in the
- // stream.
- OutOfOrder = "out_of_order"
// TooFarBehind is a reason for discarding lines when Loki accepts
// unordered ingest (parameter `-ingester.unordered-writes` is set to
// `true`, which is the default) and the lines in question are older than
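With the OutOfOrder reason removed, rejections behind the validity window are always attributed to TooFarBehind, as the reportMetrics hunk in pkg/ingester/stream.go shows. A rough, self-contained sketch of how such a reason label feeds a discard counter; the metric name and counter here are simplified stand-ins, not Loki's actual validation package:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Illustrative copies of the remaining discard reasons after this change.
const (
	StreamRateLimit = "per_stream_rate_limit"
	TooFarBehind    = "too_far_behind"
)

// discardedSamples is a simplified stand-in for validation.DiscardedSamples.
var discardedSamples = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "example_discarded_samples_total",
		Help: "Samples discarded, labeled by reason and tenant.",
	},
	[]string{"reason", "tenant"},
)

func main() {
	prometheus.MustRegister(discardedSamples)

	// An entry older than the validity window is now counted as too_far_behind;
	// there is no out_of_order reason anymore.
	discardedSamples.WithLabelValues(TooFarBehind, "tenant-a").Add(3)

	fmt.Println("recorded 3 discarded samples with reason", TooFarBehind)
}
```

A consequence of this change: anything filtering discard metrics on reason="out_of_order" would now match nothing, since the equivalent rejections appear under too_far_behind.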
