From 3e74b802d24719a31f718433d9d25b5f04f94b0f Mon Sep 17 00:00:00 2001
From: Trevor Whitney
Date: Wed, 6 Nov 2024 14:39:18 -0700
Subject: [PATCH] wip

---
 cmd/loki/loki-local-config.yaml |  7 +++
 pkg/chunkenc/memchunk.go        |  1 +
 pkg/chunkenc/variants.go        | 23 +++++++--
 pkg/ingester/flush_test.go      |  2 +-
 pkg/ingester/ingester.go        |  2 +-
 pkg/ingester/ingester_test.go   |  2 +-
 pkg/logql/engine.go             | 10 +++-
 pkg/logql/evaluator.go          | 85 ++++++++++++---------
 pkg/logql/syntax/visit.go       | 15 +++++-
 pkg/querier/ingester_querier.go | 16 +++++++
 pkg/querier/querier.go          | 43 ++++++++---------
 pkg/storage/batch.go            | 62 ++++++++++++++++--------
 pkg/storage/batch_test.go       |  3 +-
 pkg/storage/lazy_chunk.go       |  2 +-
 pkg/storage/store.go            | 16 ++++++-
 15 files changed, 179 insertions(+), 110 deletions(-)

diff --git a/cmd/loki/loki-local-config.yaml b/cmd/loki/loki-local-config.yaml
index c593b14a252c0..9c35f7bd4157c 100644
--- a/cmd/loki/loki-local-config.yaml
+++ b/cmd/loki/loki-local-config.yaml
@@ -25,6 +25,13 @@ query_range:
         enabled: true
         max_size_mb: 100
 
+# querier:
+#   query_ingesters_within: 10m
+
+ingester:
+  max_chunk_age: 5m
+  # query_store_max_look_back_period: 10m
+
 schema_config:
   configs:
     - from: 2020-10-24
diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go
index 498073e116459..294a4abfae00c 100644
--- a/pkg/chunkenc/memchunk.go
+++ b/pkg/chunkenc/memchunk.go
@@ -1064,6 +1064,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
 	return iter.NewSortEntryIterator(blockItrs, direction), nil
 }
 
+// TODO(twhitney): do I need a multi extractor iterator here as well?
 // Iterator implements Chunk.
 func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator {
 	mint, maxt := from.UnixNano(), through.UnixNano()
diff --git a/pkg/chunkenc/variants.go b/pkg/chunkenc/variants.go
index f0d46239a8dea..f27c2ba6bf855 100644
--- a/pkg/chunkenc/variants.go
+++ b/pkg/chunkenc/variants.go
@@ -34,7 +34,7 @@ type multiExtractorSampleBufferedIterator struct {
 }
 
 func (e *multiExtractorSampleBufferedIterator) Next() bool {
-	if len(e.cur) > 0 {
+	if len(e.cur) > 1 {
 		e.cur = e.cur[1:]
 		e.currLabels = e.currLabels[1:]
 		e.currBaseLabels = e.currBaseLabels[1:]
@@ -42,6 +42,13 @@ func (e *multiExtractorSampleBufferedIterator) Next() bool {
 		return true
 	}
 
+	if len(e.cur) == 1 {
+		e.cur = e.cur[:0]
+		e.currLabels = e.currLabels[:0]
+		e.currBaseLabels = e.currBaseLabels[:0]
+	}
+
+	// TODO(twhitney): this loop never stops
 	for e.bufferedIterator.Next() {
 		for i, extractor := range e.extractors {
 			val, lbls, ok := extractor.Process(e.currTs, e.currLine, e.currStructuredMetadata...)
@@ -62,17 +69,23 @@ func (e *multiExtractorSampleBufferedIterator) Next() bool {
 			builder.Add(log.ParsedLabel, lbls.Parsed()...)
 
 			e.currLabels = append(e.currLabels, builder.LabelsResult())
-			//TODO: is it enough to add __variant__ to result labels? Do the base labels need it to?
+			// TODO: is it enough to add __variant__ to result labels? Do the base labels need it too?
 			e.currBaseLabels = append(e.currBaseLabels, extractor.BaseLabels())
 			e.cur = append(e.cur, logproto.Sample{
 				Value:     val,
 				Hash:      xxhash.Sum64(e.currLine),
 				Timestamp: e.currTs,
 			})
+		}
 
-		return true
+		// catch the case where no extractors were ok
+		if len(e.cur) <= 1 {
+			continue
 		}
+
+		return true
 	}
+
 	return false
 }
 
@@ -109,4 +122,6 @@ func (e *multiExtractorSampleBufferedIterator) Labels() string { return e.currLa
 
 func (e *multiExtractorSampleBufferedIterator) StreamHash() uint64 { return e.currBaseLabels[0].Hash() }
 
-func (e *multiExtractorSampleBufferedIterator) At() logproto.Sample { return e.cur[0] }
+func (e *multiExtractorSampleBufferedIterator) At() logproto.Sample {
+	return e.cur[0]
+}
diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go
index 69cb04ce0d5bd..90407ec1a90b2 100644
--- a/pkg/ingester/flush_test.go
+++ b/pkg/ingester/flush_test.go
@@ -381,7 +381,7 @@ type testStore struct {
 }
 
 func (t *testStore) SelectVariants(ctx context.Context, req logql.SelectVariantsParams) (iter.SampleIterator, error) {
-	panic("TODO(twhitney): SelectVariants not implemented on testStore") // TODO: Implement
+	panic("TODO(twhitney): SelectVariants not implemented on testStore") // TODO: Implement
 }
 
 // Note: the ingester New() function creates it's own WAL first which we then override if specified.
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index dfdb3ac043b26..e58aa9e5570ef 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -1654,5 +1654,5 @@ func (i *Ingester) getDetectedLabels(ctx context.Context, req *logproto.Detected
 
 // QuerySample the ingesters for series from logs matching a set of matchers.
 func (i *Ingester) QueryVariants(req *logproto.VariantsQueryRequest, queryServer logproto.Querier_QueryVariantsServer) error {
-	panic("TODO(twhitney): QueryVariants not implemented on Ingester")
+	panic("TODO(twhitney): QueryVariants not implemented on Ingester")
 }
diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go
index 93bee8df37241..7208df2255e39 100644
--- a/pkg/ingester/ingester_test.go
+++ b/pkg/ingester/ingester_test.go
@@ -436,7 +436,7 @@ type mockStore struct {
 }
 
 func (m *mockStore) SelectVariants(ctx context.Context, req logql.SelectVariantsParams) (iter.SampleIterator, error) {
-	panic("TODO(twhitney): SelectVariants not implemented on mockStore") // TODO: Implement
+	panic("TODO(twhitney): SelectVariants not implemented on mockStore") // TODO: Implement
 }
 
 func (s *mockStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go
index c9dec0b3446be..675d5fc7a9060 100644
--- a/pkg/logql/engine.go
+++ b/pkg/logql/engine.go
@@ -659,6 +659,7 @@ func (q *query) evalVariant(
 	}
 	defer util.LogErrorWithContext(ctx, "closing VariantsExpr", stepEvaluator.Close)
 
+	//TODO: this never returns
 	next, _, r := stepEvaluator.Next()
 	if stepEvaluator.Error() != nil {
 		return nil, stepEvaluator.Error()
@@ -668,7 +669,14 @@ func (q *query) evalVariant(
 	switch vec := r.(type) {
 	//TODO(twhitney): need case for a log query
 	case SampleVector:
-		return nil, fmt.Errorf("unsupported result type: %T", vec)
+		maxSeriesCapture := func(id string) int { return q.limits.MaxQuerySeries(ctx, id) }
+		maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture)
+		//TODO: what is merge first/last for?
+		mfl := false
+		// if rae, ok := expr.(*syntax.RangeAggregationExpr); ok && (rae.Operation == syntax.OpRangeTypeFirstWithTimestamp || rae.Operation == syntax.OpRangeTypeLastWithTimestamp) {
+		// 	mfl = true
+		// }
+		return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries, mfl)
 	default:
 		return nil, fmt.Errorf("unsupported result type: %T", r)
 	}
diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go
index cce07edd7edaf..284a480f6e32b 100644
--- a/pkg/logql/evaluator.go
+++ b/pkg/logql/evaluator.go
@@ -440,10 +440,25 @@ func (ev *DefaultEvaluator) NewVariantsStepEvaluator(
 				StoreChunks: q.GetStoreChunks(),
 			},
 		})
+		// variant := e.Variants()[0]
+		// it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{
+		// 	&logproto.SampleQueryRequest{
+		// 		Start: q.Start().Add(-logRange.Interval).Add(-logRange.Offset),
+		// 		End:   q.End().Add(-logRange.Offset).Add(time.Nanosecond),
+		// 		// intentionally send the vector for reducing labels.
+		// 		Selector: variant.String(),
+		// 		Shards:   q.Shards(),
+		// 		Plan: &plan.QueryPlan{
+		// 			AST: variant,
+		// 		},
+		// 		StoreChunks: q.GetStoreChunks(),
+		// 	},
+		// })
 		if err != nil {
 			return nil, err
 		}
 		return ev.newVariantsEvaluator(ctx, iter.NewPeekingSampleIterator(it), e, q)
+		// return newRangeAggEvaluator(iter.NewPeekingSampleIterator(it), variant.(*syntax.RangeAggregationExpr), q, 0)
 	default:
 		return nil, EvaluatorUnsupportedType(e, ev)
 	}
@@ -748,6 +763,7 @@ func newRangeAggEvaluator(
 			iter: iter,
 		}, nil
 	default:
+		// TODO(twhitney): this is the case we match
 		iter, err := newRangeVectorIterator(
 			it, expr,
 			expr.Left.Interval.Nanoseconds(),
@@ -1449,14 +1465,8 @@ func (ev *DefaultEvaluator) newVariantsEvaluator(
 		variantEvaluators[i] = variantEvaluator
 	}
 
-	logRange := expr.LogRange()
 	return &VariantsEvaluator{
-		selRange: logRange.Interval.Nanoseconds(),
-		step:     q.Step().Nanoseconds(),
-		end:      q.End().UnixNano(),
-		current:  q.Start().UnixNano() - q.Step().Nanoseconds(),
-		offset:   logRange.Offset.Nanoseconds(),
-
+		current:           q.Start().UnixNano() - q.Step().Nanoseconds(),
 		variantEvaluators: variantEvaluators,
 	}, nil
 }
@@ -1574,7 +1584,7 @@ func (it *bufferedVariantsIteratorWrapper) Next() bool {
 // VariantsEvaluator is responsible for making sure the window is loaded from all
 // evaluators for all variants
 type VariantsEvaluator struct {
-	selRange, step, end, current, offset int64
+	current int64
 
 	variantEvaluators []StepEvaluator
 	currentSamples    SampleVector
@@ -1592,58 +1602,25 @@ func (it *VariantsEvaluator) Explain(_ Node) {
 }
 
 func (it *VariantsEvaluator) Next() (bool, int64, StepResult) {
-	if !it.loadNextWindow() {
-		return false, it.current, SampleVector{}
-	}
-
-	return true, it.current, it.currentSamples
-}
-
-func (it *VariantsEvaluator) loadNextWindow() bool {
-	it.current += it.step
-	if it.current > it.end {
-		return false
-	}
-
-	rangeStart := it.current - it.selRange
-	rangeEnd := it.current
-
-	// store samples for each variant
 	samples := it.currentSamples[:0]
-	for _, variantEval := range it.variantEvaluators {
-		samples = append(samples, it.loadSamplesForRange(variantEval, rangeStart, rangeEnd)...)
-	}
-
-	it.currentSamples = samples
-	return true
-}
-
-func (it *VariantsEvaluator) loadSamplesForRange(
-	variantEval StepEvaluator,
-	start, end int64,
-) []promql.Sample {
-	var samples []promql.Sample
-
-	// convert to milliseconds, as thats what the evalutors Next() will return
-	start = start / 1e+6
-	end = end / 1e+6
-
-	// Next() (ok bool, ts int64, r StepResult)
-	for ok, ts, result := variantEval.Next(); ok; {
-		// upper bound is inclusive for step evaluation
-		if ts > end {
-			break
-		}
+	hasNext := false
 
-		// the lower bound of the range is not inclusive
-		if ts > start {
-			for _, sample := range result.SampleVector() {
-				samples = append(samples, sample)
+	for _, variantEval := range it.variantEvaluators {
+		if ok, ts, result := variantEval.Next(); ok {
+			hasNext = true
+			samples = append(samples, result.SampleVector()...)
+			if ts > it.current {
+				it.current = ts
 			}
 		}
 	}
 
-	return samples
+	if !hasNext {
+		return false, 0, SampleVector{}
+	}
+
+	it.currentSamples = samples
+	return true, it.current, it.currentSamples
 }
 
 func (it *VariantsEvaluator) Close() error {
diff --git a/pkg/logql/syntax/visit.go b/pkg/logql/syntax/visit.go
index 3e4321fb876d6..060e336f0e400 100644
--- a/pkg/logql/syntax/visit.go
+++ b/pkg/logql/syntax/visit.go
@@ -70,10 +70,20 @@ type DepthFirstTraversal struct {
 	VisitRangeAggregationFn  func(v RootVisitor, e *RangeAggregationExpr)
 	VisitVectorFn            func(v RootVisitor, e *VectorExpr)
 	VisitVectorAggregationFn func(v RootVisitor, e *VectorAggregationExpr)
+	VisitVariantsFn          func(v RootVisitor, e *MultiVariantExpr)
 }
 
-func (d *DepthFirstTraversal) VisitVariants(_ *MultiVariantExpr) {
-	panic("not implemented") // TODO: Implement
+// TODO: this is what's getting triggered
+func (v *DepthFirstTraversal) VisitVariants(e *MultiVariantExpr) {
+	if e == nil {
+		return
+	}
+
+	if v.VisitVariantsFn != nil {
+		v.VisitVariantsFn(v, e)
+	} else {
+		e.LogRange().Left.Accept(v)
+	}
 }
 
 // VisitBinOp implements RootVisitor.
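
A minimal sketch (not part of the diff) of how the new VisitVariantsFn hook might be used by a caller; it assumes an already-parsed syntax.Expr, and collectVariants is a hypothetical helper:

// collectVariants walks expr depth-first and gathers every MultiVariantExpr it visits.
func collectVariants(expr syntax.Expr) []*syntax.MultiVariantExpr {
	var found []*syntax.MultiVariantExpr
	v := &syntax.DepthFirstTraversal{
		VisitVariantsFn: func(_ syntax.RootVisitor, e *syntax.MultiVariantExpr) {
			found = append(found, e)
		},
	}
	expr.Accept(v)
	return found
}

When VisitVariantsFn is left nil, the rewritten VisitVariants falls back to descending into the variant's log range selector via e.LogRange().Left.Accept(v), so existing traversals keep working.
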
diff --git a/pkg/querier/ingester_querier.go b/pkg/querier/ingester_querier.go
index cc076be1faefd..deaf7b0d2233a 100644
--- a/pkg/querier/ingester_querier.go
+++ b/pkg/querier/ingester_querier.go
@@ -447,6 +447,22 @@ func (q *IngesterQuerier) DetectedLabel(ctx context.Context, req *logproto.Detec
 	return &logproto.LabelToValuesResponse{Labels: mergedResult}, nil
 }
 
+func (q *IngesterQuerier) SelectVariants(ctx context.Context, req logql.SelectVariantsParams) ([]iter.SampleIterator, error) {
+	resps, err := q.forAllIngesters(ctx, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
+		return client.QueryVariants(ctx, req.VariantsQueryRequest)
+	})
+
+	if err != nil {
+		return nil, err
+	}
+
+	iterators := make([]iter.SampleIterator, len(resps))
+	// for i := range resps {
+	// 	iterators[i] = iter.NewSampleQueryClientIterator(resps[i].response.(logproto.Querier_QuerySampleClient))
+	// }
+	return iterators, nil
+}
+
 func convertMatchersToString(matchers []*labels.Matcher) string {
 	out := strings.Builder{}
 	out.WriteRune('{')
diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go
index 039b50b161150..961537f4c27d3 100644
--- a/pkg/querier/querier.go
+++ b/pkg/querier/querier.go
@@ -311,6 +311,8 @@ func (q *SingleTenantQuerier) calculateIngesterMaxLookbackPeriod() time.Duration
 }
 
 func (q *SingleTenantQuerier) buildQueryIntervals(queryStart, queryEnd time.Time) (*interval, *interval) {
+	queryStart, queryEnd = queryStart.In(time.UTC), queryEnd.In(time.UTC)
+
 	// limitQueryInterval is a flag for whether store queries should be limited to start time of ingester queries.
 	limitQueryInterval := false
 	// ingesterMLB having -1 means query ingester for whole duration.
@@ -1453,31 +1455,28 @@ func (q *SingleTenantQuerier) SelectVariants(
 		level.Error(spanlogger.FromContext(ctx)).Log("msg", "failed loading deletes for user", "err", err)
 	}
 
-	_, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End)
+	ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End)
 
 	iters := []iter.SampleIterator{}
 	// TODO(twhitney): deal with ingesters later
-	// if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil {
-	// 	// Make a copy of the request before modifying
-	// 	// because the initial request is used below to query stores
-	// 	queryRequestCopy := *params.VariantsQueryRequest
-	// 	newParams := logql.SelectVariantsParams{
-	// 		VariantsQueryRequest: &queryRequestCopy,
-	// 	}
-	// 	newParams.Start = ingesterQueryInterval.start
-	// 	newParams.End = ingesterQueryInterval.end
-
-	// 	ingesterIters, err := q.ingesterQuerier.SelectVariants(ctx, newParams)
-	// 	if err != nil {
-	// 		return nil, err
-	// 	}
-
-	// 	for _, iter := range ingesterIters {
-	// 		for iter.Next() {
-	// 			iters = append(iters, iter.At())
-	// 		}
-	// 	}
-	// }
+	if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil {
+		// Make a copy of the request before modifying
+		// because the initial request is used below to query stores
+		// TODO(twhitney): implement iterators
+		// queryRequestCopy := *params.VariantsQueryRequest
+		// newParams := logql.SelectVariantsParams{
+		// 	VariantsQueryRequest: &queryRequestCopy,
+		// }
+		// newParams.Start = ingesterQueryInterval.start
+		// newParams.End = ingesterQueryInterval.end
+
+		// ingesterIters, err := q.ingesterQuerier.SelectVariants(ctx, newParams)
+		// if err != nil {
+		// 	return nil, err
+		// }
+
+		// iters = append(iters, ingesterIters...)
+	}
 
 	if !q.cfg.QueryIngesterOnly && storeQueryInterval != nil {
 		params.Start = storeQueryInterval.start
diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go
index 2bdcfe02d3bea..f04ccf91289c4 100644
--- a/pkg/storage/batch.go
+++ b/pkg/storage/batch.go
@@ -464,9 +464,9 @@ type sampleBatchIterator struct {
 	curr iter.SampleIterator
 	err  error
 
-	ctx       context.Context
-	cancel    context.CancelFunc
-	extractor syntax.SampleExtractor
+	ctx        context.Context
+	cancel     context.CancelFunc
+	extractors []syntax.SampleExtractor
 }
 
 func newSampleBatchIterator(
@@ -476,16 +476,27 @@ func newSampleBatchIterator(
 	chunks []*LazyChunk,
 	batchSize int,
 	matchers []*labels.Matcher,
-	extractor syntax.SampleExtractor,
+	extractors []syntax.SampleExtractor,
 	start, end time.Time,
 	chunkFilterer chunk.Filterer,
 ) (iter.SampleIterator, error) {
 	ctx, cancel := context.WithCancel(ctx)
 	return &sampleBatchIterator{
-		extractor: extractor,
-		ctx:       ctx,
-		cancel:    cancel,
-		batchChunkIterator: newBatchChunkIterator(ctx, schemas, chunks, batchSize, logproto.FORWARD, start, end, metrics, matchers, chunkFilterer),
+		extractors: extractors,
+		ctx:        ctx,
+		cancel:     cancel,
+		batchChunkIterator: newBatchChunkIterator(
+			ctx,
+			schemas,
+			chunks,
+			batchSize,
+			logproto.FORWARD,
+			start,
+			end,
+			metrics,
+			matchers,
+			chunkFilterer,
+		),
 	}, nil
 }
 
@@ -564,8 +575,18 @@ func (it *sampleBatchIterator) buildIterators(chks map[model.Fingerprint][][]*La
 	result := make([]iter.SampleIterator, 0, len(chks))
 	for _, chunks := range chks {
 		if len(chunks) != 0 && len(chunks[0]) != 0 {
-			streamExtractor := it.extractor.ForStream(labels.NewBuilder(chunks[0][0].Chunk.Metric).Del(labels.MetricName).Labels())
-			iterator, err := it.buildHeapIterator(chunks, from, through, streamExtractor, nextChunk)
+			extractors := make([]log.StreamSampleExtractor, 0, len(it.extractors))
+			for _, extractor := range it.extractors {
+				extractors = append(
+					extractors,
+					extractor.ForStream(
+						labels.NewBuilder(chunks[0][0].Chunk.Metric).
+							Del(labels.MetricName).
+							Labels(),
+					),
+				)
+			}
+			iterator, err := it.buildHeapIterator(chunks, from, through, extractors, nextChunk)
 			if err != nil {
 				return nil, err
 			}
@@ -576,7 +597,7 @@ func (it *sampleBatchIterator) buildIterators(chks map[model.Fingerprint][][]*La
 	return result, nil
 }
 
-func (it *sampleBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, streamExtractor log.StreamSampleExtractor, nextChunk *LazyChunk) (iter.SampleIterator, error) {
+func (it *sampleBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, streamExtractors []log.StreamSampleExtractor, nextChunk *LazyChunk) (iter.SampleIterator, error) {
 	result := make([]iter.SampleIterator, 0, len(chks))
 
 	for i := range chks {
@@ -589,7 +610,7 @@ func (it *sampleBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, thro
 			it.ctx,
 			from,
 			through,
-			[]log.StreamSampleExtractor{streamExtractor},
+			streamExtractors,
 			nextChunk,
 		)
 		if err != nil {
@@ -623,9 +644,10 @@ func newMultiExtractorSampleBatchIterator(
 	ctx, cancel := context.WithCancel(ctx)
 	return &multiExtractorSampleIterator{
 		sampleBatchIterator: &sampleBatchIterator{
-			extractor: nil,
-			ctx:       ctx,
-			cancel:    cancel,
+			extractors: nil,
+			ctx:        ctx,
+			cancel:     cancel,
+
 			batchChunkIterator: newBatchChunkIterator(
 				ctx,
 				schemas,
@@ -702,13 +724,13 @@ func (it *multiExtractorSampleIterator) buildIterators(
 						Labels(),
 					),
 				)
+			}
 
-			iterator, err := it.buildHeapIterator(chunks, from, through, extractors, nextChunk)
-			if err != nil {
-				return nil, err
-			}
-			result = append(result, iterator)
+		iterator, err := it.buildHeapIterator(chunks, from, through, extractors, nextChunk)
+		if err != nil {
+			return nil, err
 		}
+		result = append(result, iterator)
 		}
 	}
 
diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go
index 115e625a28af7..e03cfcd4f3a8c 100644
--- a/pkg/storage/batch_test.go
+++ b/pkg/storage/batch_test.go
@@ -18,6 +18,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/logql"
 	"github.com/grafana/loki/v3/pkg/logql/log"
+	"github.com/grafana/loki/v3/pkg/logql/syntax"
 	"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
 	"github.com/grafana/loki/v3/pkg/storage/config"
 )
@@ -1418,7 +1419,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
 			ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false)
 			require.NoError(t, err)
 
-			it, err := newSampleBatchIterator(context.Background(), s, NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), ex, tt.start, tt.end, nil)
+			it, err := newSampleBatchIterator(context.Background(), s, NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), []syntax.SampleExtractor{ex}, tt.start, tt.end, nil)
 			require.NoError(t, err)
 			series, _, err := iter.ReadSampleBatch(it, 1000)
 			_ = it.Close()
diff --git a/pkg/storage/lazy_chunk.go b/pkg/storage/lazy_chunk.go
index c65ee22328df3..aa6ab2012003e 100644
--- a/pkg/storage/lazy_chunk.go
+++ b/pkg/storage/lazy_chunk.go
@@ -141,7 +141,7 @@ func (c *LazyChunk) SampleIterator(
 		var blockSampleIterator iter.SampleIterator
 		if len(extractors) == 0 {
 			return nil, errors.New("no extractors provided")
-		} else if len(extractors) > 0 {
+		} else if len(extractors) > 1 {
 			blockSampleIterator = b.MultiExtractorSampleIterator(ctx, extractors)
 		} else {
 			blockSampleIterator = b.SampleIterator(ctx, extractors[0])
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index ed633044e56ea..abd32dc21f8fe 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -25,6 +25,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/iter"
"github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql" + "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" "github.com/grafana/loki/v3/pkg/querier/astmapper" "github.com/grafana/loki/v3/pkg/storage/chunk" @@ -164,7 +165,7 @@ func (s *LokiStore) SelectVariants( chunkFilterer = s.chunkFilterer.ForRequest(ctx) } - return newMultiExtractorSampleBatchIterator( + return newSampleBatchIterator( ctx, s.schemaCfg, s.chunkMetrics, @@ -651,7 +652,18 @@ func (s *LokiStore) SelectSamples(ctx context.Context, req logql.SelectSamplePar chunkFilterer = s.chunkFilterer.ForRequest(ctx) } - return newSampleBatchIterator(ctx, s.schemaCfg, s.chunkMetrics, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, extractor, req.Start, req.End, chunkFilterer) + return newSampleBatchIterator( + ctx, + s.schemaCfg, + s.chunkMetrics, + lazyChunks, + s.cfg.MaxChunkBatchSize, + matchers, + []syntax.SampleExtractor{extractor}, + req.Start, + req.End, + chunkFilterer, + ) } func (s *LokiStore) GetSchemaConfigs() []config.PeriodConfig {