Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add variants() to LogQL #14578

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cmd/loki/loki-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ query_range:
enabled: true
max_size_mb: 100

# querier:
# query_ingesters_within: 10m

ingester:
max_chunk_age: 5m
# query_store_max_look_back_period: 10m

schema_config:
configs:
- from: 2020-10-24
Expand All @@ -38,7 +45,6 @@ schema_config:
pattern_ingester:
enabled: true
metric_aggregation:
enabled: true
loki_address: localhost:3100

ruler:
Expand Down
2 changes: 2 additions & 0 deletions pkg/chunkenc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,6 @@ type Block interface {
Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator
// SampleIterator returns a sample iterator for the block.
SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator
// MultiExtractorSampleIterator returns a sample iterator for the block that can do multiple extractions per line.
MultiExtractorSampleIterator(ctx context.Context, extractors []log.StreamSampleExtractor) iter.SampleIterator
}
8 changes: 8 additions & 0 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
return iter.NewSortEntryIterator(blockItrs, direction), nil
}

// TODO(twhitney): do I need a multi extractor iterator here as well?
// Iterator implements Chunk.
func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator {
mint, maxt := from.UnixNano(), through.UnixNano()
Expand Down Expand Up @@ -1193,6 +1194,13 @@ func (b encBlock) SampleIterator(ctx context.Context, extractor log.StreamSample
return newSampleIterator(ctx, compression.GetReaderPool(b.enc), b.b, b.format, extractor, b.symbolizer)
}

func (b encBlock) MultiExtractorSampleIterator(ctx context.Context, extractors []log.StreamSampleExtractor) iter.SampleIterator {
if len(b.b) == 0 {
return iter.NoopSampleIterator
}
return newMultiExtractorSampleIterator(ctx, compression.GetReaderPool(b.enc), b.b, b.format, extractors, b.symbolizer)
}

func (b block) Offset() int {
return b.offset
}
Expand Down
127 changes: 127 additions & 0 deletions pkg/chunkenc/variants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package chunkenc

import (
"context"
"sort"
"strconv"

"github.com/cespare/xxhash/v2"
"github.com/grafana/loki/v3/pkg/compression"
"github.com/grafana/loki/v3/pkg/iter"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/prometheus/prometheus/model/labels"
)

func newMultiExtractorSampleIterator(ctx context.Context, pool compression.ReaderPool, b []byte, format byte, extractors []log.StreamSampleExtractor, symbolizer *symbolizer) iter.SampleIterator {
return &multiExtractorSampleBufferedIterator{
bufferedIterator: newBufferedIterator(ctx, pool, b, format, symbolizer),
extractors: extractors,
stats: stats.FromContext(ctx),
}
}

type multiExtractorSampleBufferedIterator struct {
*bufferedIterator

extractors []log.StreamSampleExtractor
stats *stats.Context

cur []logproto.Sample
currLabels []log.LabelsResult
currBaseLabels []log.LabelsResult
}

func (e *multiExtractorSampleBufferedIterator) Next() bool {
if len(e.cur) > 1 {
e.cur = e.cur[1:]
e.currLabels = e.currLabels[1:]
e.currBaseLabels = e.currBaseLabels[1:]

return true
}

if len(e.cur) == 1 {
e.cur = e.cur[:0]
e.currLabels = e.currLabels[:0]
e.currBaseLabels = e.currBaseLabels[:0]
}

// TDOO(twhitney): this loops never stops
for e.bufferedIterator.Next() {
for i, extractor := range e.extractors {
val, lbls, ok := extractor.Process(e.currTs, e.currLine, e.currStructuredMetadata...)
if !ok {
continue
}

e.stats.AddPostFilterLines(1)

streamLbls := lbls.Stream()
streamLbls = append(streamLbls, labels.Label{
Name: "__variant__",
Value: strconv.FormatInt(int64(i), 10),
})

builder := log.NewBaseLabelsBuilder().ForLabels(streamLbls, streamLbls.Hash())
builder.Add(log.StructuredMetadataLabel, lbls.StructuredMetadata()...)
builder.Add(log.ParsedLabel, lbls.Parsed()...)
e.currLabels = append(e.currLabels, builder.LabelsResult())

// TODO: is it enough to add __variant__ to result labels? Do the base labels need it too?
e.currBaseLabels = append(e.currBaseLabels, extractor.BaseLabels())
e.cur = append(e.cur, logproto.Sample{
Value: val,
Hash: xxhash.Sum64(e.currLine),
Timestamp: e.currTs,
})
}

// catch the case where no extractors were ok
if len(e.cur) <= 1 {
continue
}

return true
}

return false
}

func flattenLabels(buf labels.Labels, many ...labels.Labels) labels.Labels {
var size int
for _, lbls := range many {
size += len(lbls)
}

if buf == nil || cap(buf) < size {
buf = make(labels.Labels, 0, size)
} else {
buf = buf[:0]
}

for _, lbls := range many {
buf = append(buf, lbls...)
}
sort.Sort(buf)
return buf
}

func (e *multiExtractorSampleBufferedIterator) Close() error {
for _, extractor := range e.extractors {
if extractor.ReferencedStructuredMetadata() {
e.stats.SetQueryReferencedStructuredMetadata()
}
}

return e.bufferedIterator.Close()
}

func (e *multiExtractorSampleBufferedIterator) Labels() string { return e.currLabels[0].String() }

func (e *multiExtractorSampleBufferedIterator) StreamHash() uint64 { return e.currBaseLabels[0].Hash() }

func (e *multiExtractorSampleBufferedIterator) At() logproto.Sample {
return e.cur[0]
}
4 changes: 4 additions & 0 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,10 @@ type testStore struct {
onPut func(ctx context.Context, chunks []chunk.Chunk) error
}

func (t *testStore) SelectVariants(ctx context.Context, req logql.SelectVariantsParams) (iter.SampleIterator, error) {
panic("TODO(twhitney): SelectVariants not implemented on testStore") // TODO: Implement
}

// Note: the ingester New() function creates it's own WAL first which we then override if specified.
// Because of this, ensure any WAL directories exist/are cleaned up even when overriding the wal.
// This is an ugly hook for testing :(
Expand Down
5 changes: 5 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1651,3 +1651,8 @@ func (i *Ingester) getDetectedLabels(ctx context.Context, req *logproto.Detected
}
return &logproto.LabelToValuesResponse{Labels: result}, nil
}

// QuerySample the ingesters for series from logs matching a set of matchers.
func (i *Ingester) QueryVariants(req *logproto.VariantsQueryRequest, queryServer logproto.Querier_QueryVariantsServer) error {
panic("TODO(twhitney): QueryVariants not implemented on Ingester")
}
4 changes: 4 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,10 @@ type mockStore struct {
chunks map[string][]chunk.Chunk
}

func (m *mockStore) SelectVariants(ctx context.Context, req logql.SelectVariantsParams) (iter.SampleIterator, error) {
panic("TODO(twhitney): SelectVariants not implemented on mockStore") // TODO: Implement
}

func (s *mockStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
s.mtx.Lock()
defer s.mtx.Unlock()
Expand Down
18 changes: 12 additions & 6 deletions pkg/iter/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ type StreamIterator[T logprotoType] interface {
StreamHash() uint64
}

type EntryIterator StreamIterator[logproto.Entry]
type SampleIterator StreamIterator[logproto.Sample]
type (
EntryIterator StreamIterator[logproto.Entry]
SampleIterator StreamIterator[logproto.Sample]
)

// noOpIterator implements StreamIterator
type noOpIterator[T logprotoType] struct{}
Expand All @@ -33,8 +35,10 @@ func (noOpIterator[T]) Labels() string { return "" }
func (noOpIterator[T]) StreamHash() uint64 { return 0 }
func (noOpIterator[T]) Close() error { return nil }

var NoopEntryIterator = noOpIterator[logproto.Entry]{}
var NoopSampleIterator = noOpIterator[logproto.Sample]{}
var (
NoopEntryIterator = noOpIterator[logproto.Entry]{}
NoopSampleIterator = noOpIterator[logproto.Sample]{}
)

// errorIterator implements StreamIterator
type errorIterator[T logprotoType] struct{}
Expand All @@ -46,5 +50,7 @@ func (errorIterator[T]) Labels() string { return "" }
func (errorIterator[T]) StreamHash() uint64 { return 0 }
func (errorIterator[T]) Close() error { return errors.New("close") }

var ErrorEntryIterator = errorIterator[logproto.Entry]{}
var ErrorSampleIterator = errorIterator[logproto.Sample]{}
var (
ErrorEntryIterator = errorIterator[logproto.Entry]{}
ErrorSampleIterator = errorIterator[logproto.Sample]{}
)
4 changes: 4 additions & 0 deletions pkg/logcli/client/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ type querier struct {
labels labels.Labels
}

func (q *querier) SelectVariants(_ context.Context, _ logql.SelectVariantsParams) (iter.SampleIterator, error) {
panic("TODO(twhitney): SelectVariants not implemented on logcli client.Querier") // TODO: Implement
}

func (q *querier) SelectLogs(_ context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) {
expr, err := params.LogSelector()
if err != nil {
Expand Down
Loading
Loading