Skip to content

Commit

Permalink
clarity
Browse files Browse the repository at this point in the history
  • Loading branch information
benclive committed Aug 5, 2024
1 parent 9734141 commit db71e74
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 13 deletions.
1 change: 0 additions & 1 deletion pkg/ingester-rf1/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,6 @@ func (i *Ingester) periodicStreamMaintenance() {
func (i *Ingester) cleanIdleStreams() {
for _, instance := range i.getInstances() {
_ = instance.streams.ForEach(func(s *stream) (bool, error) {
instance.index.Delete(s.labels, s.fp)
if time.Since(s.highestTs) > i.cfg.StreamRetainPeriod {
instance.streams.Delete(s)
}
Expand Down
15 changes: 3 additions & 12 deletions pkg/ingester-rf1/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/distributor/writefailures"
"github.com/grafana/loki/v3/pkg/ingester/index"
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
Expand Down Expand Up @@ -61,7 +60,6 @@ type instance struct {
buf []byte // buffer used to compute fps.
streams *streamsMap

index *index.Multi
mapper *FpMapper // using of mapper no longer needs mutex because reading from streams is lock-free

instanceID string
Expand Down Expand Up @@ -140,18 +138,13 @@ func newInstance(
customStreamsTracker push.UsageTracker,
logger log.Logger,
) (*instance, error) {
invertedIndex, err := index.NewMultiInvertedIndex(periodConfigs, uint32(cfg.IndexShards))
if err != nil {
return nil, err
}
streams := newStreamsMap()
ownedStreamsSvc := newOwnedStreamService(instanceID, limiter)
c := config.SchemaConfig{Configs: periodConfigs}
i := &instance{
cfg: cfg,
streams: streams,
buf: make([]byte, 0, 1024),
index: invertedIndex,
instanceID: instanceID,
//
streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID),
Expand Down Expand Up @@ -182,7 +175,7 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre
// reducing the stream limits, for instance.
var err error

labels, err := syntax.ParseLabels(pushReqStream.Labels)
sortedLabels, err := syntax.ParseLabels(pushReqStream.Labels)
if err != nil {
if i.configs.LogStreamCreation(i.instanceID) {
level.Debug(util_log.Logger).Log(
Expand All @@ -196,12 +189,10 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre
}

if err != nil {
return i.onStreamCreationError(ctx, pushReqStream, err, labels)
return i.onStreamCreationError(ctx, pushReqStream, err, sortedLabels)
}

fp := i.getHashForLabels(labels)

sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp)
fp := i.getHashForLabels(sortedLabels)

chunkfmt, headfmt, err := i.chunkFormatAt(minTs(&pushReqStream))
if err != nil {
Expand Down

0 comments on commit db71e74

Please sign in to comment.