Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney committed Nov 21, 2024
1 parent 4f99f11 commit 3e74b80
Show file tree
Hide file tree
Showing 15 changed files with 179 additions and 110 deletions.
7 changes: 7 additions & 0 deletions cmd/loki/loki-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ query_range:
enabled: true
max_size_mb: 100

# querier:
# query_ingesters_within: 10m

ingester:
max_chunk_age: 5m
# query_store_max_look_back_period: 10m

schema_config:
configs:
- from: 2020-10-24
Expand Down
1 change: 1 addition & 0 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
return iter.NewSortEntryIterator(blockItrs, direction), nil
}

// TODO(twhitney): do I need a multi extractor iterator here as well?
// Iterator implements Chunk.
func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator {
mint, maxt := from.UnixNano(), through.UnixNano()
Expand Down
23 changes: 19 additions & 4 deletions pkg/chunkenc/variants.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,21 @@ type multiExtractorSampleBufferedIterator struct {
}

func (e *multiExtractorSampleBufferedIterator) Next() bool {
if len(e.cur) > 0 {
if len(e.cur) > 1 {
e.cur = e.cur[1:]
e.currLabels = e.currLabels[1:]
e.currBaseLabels = e.currBaseLabels[1:]

return true
}

if len(e.cur) == 1 {
e.cur = e.cur[:0]
e.currLabels = e.currLabels[:0]
e.currBaseLabels = e.currBaseLabels[:0]
}

// TDOO(twhitney): this loops never stops
for e.bufferedIterator.Next() {
for i, extractor := range e.extractors {
val, lbls, ok := extractor.Process(e.currTs, e.currLine, e.currStructuredMetadata...)
Expand All @@ -62,17 +69,23 @@ func (e *multiExtractorSampleBufferedIterator) Next() bool {
builder.Add(log.ParsedLabel, lbls.Parsed()...)
e.currLabels = append(e.currLabels, builder.LabelsResult())

//TODO: is it enough to add __variant__ to result labels? Do the base labels need it to?
// TODO: is it enough to add __variant__ to result labels? Do the base labels need it too?
e.currBaseLabels = append(e.currBaseLabels, extractor.BaseLabels())
e.cur = append(e.cur, logproto.Sample{
Value: val,
Hash: xxhash.Sum64(e.currLine),
Timestamp: e.currTs,
})
}

return true
// catch the case where no extractors were ok
if len(e.cur) <= 1 {
continue
}

return true
}

return false
}

Expand Down Expand Up @@ -109,4 +122,6 @@ func (e *multiExtractorSampleBufferedIterator) Labels() string { return e.currLa

func (e *multiExtractorSampleBufferedIterator) StreamHash() uint64 { return e.currBaseLabels[0].Hash() }

func (e *multiExtractorSampleBufferedIterator) At() logproto.Sample { return e.cur[0] }
func (e *multiExtractorSampleBufferedIterator) At() logproto.Sample {
return e.cur[0]
}
2 changes: 1 addition & 1 deletion pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ type testStore struct {
}

func (t *testStore) SelectVariants(ctx context.Context, req logql.SelectVariantsParams) (iter.SampleIterator, error) {
panic("TODO(twhitney): SelectVariants not implemented on testStore") // TODO: Implement
panic("TODO(twhitney): SelectVariants not implemented on testStore") // TODO: Implement
}

// Note: the ingester New() function creates it's own WAL first which we then override if specified.
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1654,5 +1654,5 @@ func (i *Ingester) getDetectedLabels(ctx context.Context, req *logproto.Detected

// QuerySample the ingesters for series from logs matching a set of matchers.
func (i *Ingester) QueryVariants(req *logproto.VariantsQueryRequest, queryServer logproto.Querier_QueryVariantsServer) error {
panic("TODO(twhitney): QueryVariants not implemented on Ingester")
panic("TODO(twhitney): QueryVariants not implemented on Ingester")
}
2 changes: 1 addition & 1 deletion pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ type mockStore struct {
}

func (m *mockStore) SelectVariants(ctx context.Context, req logql.SelectVariantsParams) (iter.SampleIterator, error) {
panic("TODO(twhitney): SelectVariants not implemented on mockStore") // TODO: Implement
panic("TODO(twhitney): SelectVariants not implemented on mockStore") // TODO: Implement
}

func (s *mockStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
Expand Down
10 changes: 9 additions & 1 deletion pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,7 @@ func (q *query) evalVariant(
}
defer util.LogErrorWithContext(ctx, "closing VariantsExpr", stepEvaluator.Close)

//TODO: this never returns
next, _, r := stepEvaluator.Next()
if stepEvaluator.Error() != nil {
return nil, stepEvaluator.Error()
Expand All @@ -668,7 +669,14 @@ func (q *query) evalVariant(
switch vec := r.(type) {
//TODO(twhitney): need case for a log query
case SampleVector:
return nil, fmt.Errorf("unsupported result type: %T", vec)
maxSeriesCapture := func(id string) int { return q.limits.MaxQuerySeries(ctx, id) }
maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture)
//TDOO: what is merge first last for?
mfl := false
// if rae, ok := expr.(*syntax.RangeAggregationExpr); ok && (rae.Operation == syntax.OpRangeTypeFirstWithTimestamp || rae.Operation == syntax.OpRangeTypeLastWithTimestamp) {
// mfl = true
// }
return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries, mfl)
default:
return nil, fmt.Errorf("unsupported result type: %T", r)
}
Expand Down
85 changes: 31 additions & 54 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,10 +440,25 @@ func (ev *DefaultEvaluator) NewVariantsStepEvaluator(
StoreChunks: q.GetStoreChunks(),
},
})
// variant := e.Variants()[0]
// it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{
// &logproto.SampleQueryRequest{
// Start: q.Start().Add(-logRange.Interval).Add(-logRange.Offset),
// End: q.End().Add(-logRange.Offset).Add(time.Nanosecond),
// // intentionally send the vector for reducing labels.
// Selector: variant.String(),
// Shards: q.Shards(),
// Plan: &plan.QueryPlan{
// AST: variant,
// },
// StoreChunks: q.GetStoreChunks(),
// },
// })
if err != nil {
return nil, err
}
return ev.newVariantsEvaluator(ctx, iter.NewPeekingSampleIterator(it), e, q)
// return newRangeAggEvaluator(iter.NewPeekingSampleIterator(it), variant.(*syntax.RangeAggregationExpr), q, 0)
default:
return nil, EvaluatorUnsupportedType(e, ev)
}
Expand Down Expand Up @@ -748,6 +763,7 @@ func newRangeAggEvaluator(
iter: iter,
}, nil
default:
// TODO(twhitney): this is the case we match
iter, err := newRangeVectorIterator(
it, expr,
expr.Left.Interval.Nanoseconds(),
Expand Down Expand Up @@ -1449,14 +1465,8 @@ func (ev *DefaultEvaluator) newVariantsEvaluator(
variantEvaluators[i] = variantEvaluator
}

logRange := expr.LogRange()
return &VariantsEvaluator{
selRange: logRange.Interval.Nanoseconds(),
step: q.Step().Nanoseconds(),
end: q.End().UnixNano(),
current: q.Start().UnixNano() - q.Step().Nanoseconds(),
offset: logRange.Offset.Nanoseconds(),

current: q.Start().UnixNano() - q.Step().Nanoseconds(),
variantEvaluators: variantEvaluators,
}, nil
}
Expand Down Expand Up @@ -1574,7 +1584,7 @@ func (it *bufferedVariantsIteratorWrapper) Next() bool {
// VariantsEvaluator is responsible for making sure the window is loaded from all
// evaluators for all variants
type VariantsEvaluator struct {
selRange, step, end, current, offset int64
current int64

variantEvaluators []StepEvaluator
currentSamples SampleVector
Expand All @@ -1592,58 +1602,25 @@ func (it *VariantsEvaluator) Explain(_ Node) {
}

func (it *VariantsEvaluator) Next() (bool, int64, StepResult) {
if !it.loadNextWindow() {
return false, it.current, SampleVector{}
}

return true, it.current, it.currentSamples
}

func (it *VariantsEvaluator) loadNextWindow() bool {
it.current += it.step
if it.current > it.end {
return false
}

rangeStart := it.current - it.selRange
rangeEnd := it.current

// store samples for each variant
samples := it.currentSamples[:0]
for _, variantEval := range it.variantEvaluators {
samples = append(samples, it.loadSamplesForRange(variantEval, rangeStart, rangeEnd)...)
}

it.currentSamples = samples
return true
}

func (it *VariantsEvaluator) loadSamplesForRange(
variantEval StepEvaluator,
start, end int64,
) []promql.Sample {
var samples []promql.Sample

// convert to milliseconds, as thats what the evalutors Next() will return
start = start / 1e+6
end = end / 1e+6

// Next() (ok bool, ts int64, r StepResult)
for ok, ts, result := variantEval.Next(); ok; {
// upper bound is inclusive for step evaluation
if ts > end {
break
}
hasNext := false

// the lower bound of the range is not inclusive
if ts > start {
for _, sample := range result.SampleVector() {
samples = append(samples, sample)
for _, variantEval := range it.variantEvaluators {
if ok, ts, result := variantEval.Next(); ok {
hasNext = true
samples = append(samples, result.SampleVector()...)
if ts > it.current {
it.current = ts
}
}
}

return samples
if !hasNext {
return false, 0, SampleVector{}
}

it.currentSamples = samples
return true, it.current, it.currentSamples
}

func (it *VariantsEvaluator) Close() error {
Expand Down
15 changes: 13 additions & 2 deletions pkg/logql/syntax/visit.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,21 @@ type DepthFirstTraversal struct {
VisitRangeAggregationFn func(v RootVisitor, e *RangeAggregationExpr)
VisitVectorFn func(v RootVisitor, e *VectorExpr)
VisitVectorAggregationFn func(v RootVisitor, e *VectorAggregationExpr)
VisiVectorAggregationFn func(v RootVisitor, e *VectorAggregationExpr)
VisitVariantsFn func(v RootVisitor, e *MultiVariantExpr)
}

func (d *DepthFirstTraversal) VisitVariants(_ *MultiVariantExpr) {
panic("not implemented") // TODO: Implement
// TODO: this is what's getting triggered
func (v *DepthFirstTraversal) VisitVariants(e *MultiVariantExpr) {
if e == nil {
return
}

if v.VisitVariantsFn != nil {
v.VisitVariantsFn(v, e)
} else {
e.LogRange().Left.Accept(v)
}
}

// VisitBinOp implements RootVisitor.
Expand Down
16 changes: 16 additions & 0 deletions pkg/querier/ingester_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,22 @@ func (q *IngesterQuerier) DetectedLabel(ctx context.Context, req *logproto.Detec
return &logproto.LabelToValuesResponse{Labels: mergedResult}, nil
}

func (q *IngesterQuerier) SelectVariants(ctx context.Context, req logql.SelectVariantsParams) ([]iter.SampleIterator, error) {
resps, err := q.forAllIngesters(ctx, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
return client.QueryVariants(ctx, req.VariantsQueryRequest)
})

if err != nil {
return nil, err
}

iterators := make([]iter.SampleIterator, len(resps))
// for i := range resps {
// iterators[i] = iter.NewSampleQueryClientIterator(resps[i].response.(logproto.Querier_QuerySampleClient))
// }
return iterators, nil
}

func convertMatchersToString(matchers []*labels.Matcher) string {
out := strings.Builder{}
out.WriteRune('{')
Expand Down
43 changes: 21 additions & 22 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ func (q *SingleTenantQuerier) calculateIngesterMaxLookbackPeriod() time.Duration
}

func (q *SingleTenantQuerier) buildQueryIntervals(queryStart, queryEnd time.Time) (*interval, *interval) {
queryStart, queryEnd = queryStart.In(time.UTC), queryEnd.In(time.UTC)

// limitQueryInterval is a flag for whether store queries should be limited to start time of ingester queries.
limitQueryInterval := false
// ingesterMLB having -1 means query ingester for whole duration.
Expand Down Expand Up @@ -1453,31 +1455,28 @@ func (q *SingleTenantQuerier) SelectVariants(
level.Error(spanlogger.FromContext(ctx)).Log("msg", "failed loading deletes for user", "err", err)
}

_, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End)
ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End)

iters := []iter.SampleIterator{}
// TODO(twhitney): deal with ingesters later
// if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil {
// // Make a copy of the request before modifying
// // because the initial request is used below to query stores
// queryRequestCopy := *params.VariantsQueryRequest
// newParams := logql.SelectVariantsParams{
// VariantsQueryRequest: &queryRequestCopy,
// }
// newParams.Start = ingesterQueryInterval.start
// newParams.End = ingesterQueryInterval.end

// ingesterIters, err := q.ingesterQuerier.SelectVariants(ctx, newParams)
// if err != nil {
// return nil, err
// }

// for _, iter := range ingesterIters {
// for iter.Next() {
// iters = append(iters, iter.At())
// }
// }
// }
if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil {
// Make a copy of the request before modifying
// because the initial request is used below to query stores
// TOTO(twhitney): implement iterators
// queryRequestCopy := *params.VariantsQueryRequest
// newParams := logql.SelectVariantsParams{
// VariantsQueryRequest: &queryRequestCopy,
// }
// newParams.Start = ingesterQueryInterval.start
// newParams.End = ingesterQueryInterval.end

// ingesterIters, err := q.ingesterQuerier.SelectVariants(ctx, newParams)
// if err != nil {
// return nil, err
// }

// iters = append(iters, ingesterIters...)
}

if !q.cfg.QueryIngesterOnly && storeQueryInterval != nil {
params.Start = storeQueryInterval.start
Expand Down
Loading

0 comments on commit 3e74b80

Please sign in to comment.