From eca6134bd3a87757439d094b1d9e6845da5f1e7f Mon Sep 17 00:00:00 2001 From: DylanGuedes Date: Thu, 19 Sep 2024 11:18:05 -0300 Subject: [PATCH 1/2] Report PSRL error message. --- pkg/ingester/stream.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index fe4a644c71109..8f20f45dca933 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -452,6 +452,10 @@ func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, failedEntriesWithError = make([]entryWithError, 0, len(toStore)) for i := 0; i < len(toStore); i++ { failedEntriesWithError = append(failedEntriesWithError, entryWithError{&toStore[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(len(toStore[i].Line))}}) + if i == 0 { + // only report PSRL write failure once. + s.writeFailures.Log(s.tenant, failedEntriesWithError[len(failedEntriesWithError)-1].e) + } rateLimitedBytes += len(toStore[i].Line) } } From 7e5dbfcfb68c98da4b0807f14c9fc09e189e6792 Mon Sep 17 00:00:00 2001 From: DylanGuedes Date: Thu, 19 Sep 2024 11:43:00 -0300 Subject: [PATCH 2/2] Refactor to get rid of conditional. --- pkg/ingester/stream.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 8f20f45dca933..d1577b1d2fffe 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -452,12 +452,11 @@ func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry, failedEntriesWithError = make([]entryWithError, 0, len(toStore)) for i := 0; i < len(toStore); i++ { failedEntriesWithError = append(failedEntriesWithError, entryWithError{&toStore[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(len(toStore[i].Line))}}) - if i == 0 { - // only report PSRL write failure once. - s.writeFailures.Log(s.tenant, failedEntriesWithError[len(failedEntriesWithError)-1].e) - } rateLimitedBytes += len(toStore[i].Line) } + + // Log the only last error to the write failures manager. + s.writeFailures.Log(s.tenant, failedEntriesWithError[len(failedEntriesWithError)-1].e) } s.streamRateCalculator.Record(s.tenant, s.labelHash, s.labelHashNoShard, totalBytes)