From db71e7471872f16382a919e9802eee1556b9a20d Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Mon, 5 Aug 2024 16:27:40 +0100 Subject: [PATCH] clarity --- pkg/ingester-rf1/ingester.go | 1 - pkg/ingester-rf1/instance.go | 15 +++------------ 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/pkg/ingester-rf1/ingester.go b/pkg/ingester-rf1/ingester.go index ce0d229b1fbcf..8ee0d0e8928b3 100644 --- a/pkg/ingester-rf1/ingester.go +++ b/pkg/ingester-rf1/ingester.go @@ -497,7 +497,6 @@ func (i *Ingester) periodicStreamMaintenance() { func (i *Ingester) cleanIdleStreams() { for _, instance := range i.getInstances() { _ = instance.streams.ForEach(func(s *stream) (bool, error) { - instance.index.Delete(s.labels, s.fp) if time.Since(s.highestTs) > i.cfg.StreamRetainPeriod { instance.streams.Delete(s) } diff --git a/pkg/ingester-rf1/instance.go b/pkg/ingester-rf1/instance.go index 72f2f613a9090..e05e99ba8b2f6 100644 --- a/pkg/ingester-rf1/instance.go +++ b/pkg/ingester-rf1/instance.go @@ -18,7 +18,6 @@ import ( "github.com/grafana/loki/v3/pkg/analytics" "github.com/grafana/loki/v3/pkg/chunkenc" "github.com/grafana/loki/v3/pkg/distributor/writefailures" - "github.com/grafana/loki/v3/pkg/ingester/index" "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" @@ -61,7 +60,6 @@ type instance struct { buf []byte // buffer used to compute fps. streams *streamsMap - index *index.Multi mapper *FpMapper // using of mapper no longer needs mutex because reading from streams is lock-free instanceID string @@ -140,10 +138,6 @@ func newInstance( customStreamsTracker push.UsageTracker, logger log.Logger, ) (*instance, error) { - invertedIndex, err := index.NewMultiInvertedIndex(periodConfigs, uint32(cfg.IndexShards)) - if err != nil { - return nil, err - } streams := newStreamsMap() ownedStreamsSvc := newOwnedStreamService(instanceID, limiter) c := config.SchemaConfig{Configs: periodConfigs} @@ -151,7 +145,6 @@ func newInstance( cfg: cfg, streams: streams, buf: make([]byte, 0, 1024), - index: invertedIndex, instanceID: instanceID, // streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID), @@ -182,7 +175,7 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre // reducing the stream limits, for instance. var err error - labels, err := syntax.ParseLabels(pushReqStream.Labels) + sortedLabels, err := syntax.ParseLabels(pushReqStream.Labels) if err != nil { if i.configs.LogStreamCreation(i.instanceID) { level.Debug(util_log.Logger).Log( @@ -196,12 +189,10 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre } if err != nil { - return i.onStreamCreationError(ctx, pushReqStream, err, labels) + return i.onStreamCreationError(ctx, pushReqStream, err, sortedLabels) } - fp := i.getHashForLabels(labels) - - sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp) + fp := i.getHashForLabels(sortedLabels) chunkfmt, headfmt, err := i.chunkFormatAt(minTs(&pushReqStream)) if err != nil {