Skip to content

Commit

Permalink
Merge branch 'grafana:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
Gordejj authored Jan 29, 2024
2 parents 6ea09ac + c01a823 commit e51586a
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 64 deletions.
2 changes: 1 addition & 1 deletion docs/sources/setup/install/helm/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ false
<td>string</td>
<td></td>
<td><pre lang="json">
"v1.8.4"
"v1.8.6"
</pre>
</td>
</tr>
Expand Down
79 changes: 31 additions & 48 deletions pkg/bloomgateway/multiplexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,64 +84,47 @@ func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task {
}
}

// taskMergeIterator implements v1.Iterator
type taskMergeIterator struct {
curr v1.Request
heap *v1.HeapIterator[v1.IndexedValue[*logproto.GroupedChunkRefs]]
tasks []Task
day model.Time
tokenizer *v1.NGramTokenizer
err error
func (t Task) RequestIter(tokenizer *v1.NGramTokenizer) v1.Iterator[v1.Request] {
return &requestIterator{
series: v1.NewSliceIter(t.series),
searches: convertToSearches(t.filters, tokenizer),
channel: t.ResCh,
curr: v1.Request{},
}
}

func newTaskMergeIterator(day model.Time, tokenizer *v1.NGramTokenizer, tasks ...Task) v1.PeekingIterator[v1.Request] {
it := &taskMergeIterator{
tasks: tasks,
curr: v1.Request{},
day: day,
tokenizer: tokenizer,
}
it.init()
return v1.NewPeekingIter[v1.Request](it)
var _ v1.Iterator[v1.Request] = &requestIterator{}

type requestIterator struct {
series v1.Iterator[*logproto.GroupedChunkRefs]
searches [][]byte
channel chan<- v1.Output
curr v1.Request
}

func (it *taskMergeIterator) init() {
sequences := make([]v1.PeekingIterator[v1.IndexedValue[*logproto.GroupedChunkRefs]], 0, len(it.tasks))
for i := range it.tasks {
iter := v1.NewSliceIterWithIndex(it.tasks[i].series, i)
sequences = append(sequences, iter)
}
it.heap = v1.NewHeapIterator(
func(i, j v1.IndexedValue[*logproto.GroupedChunkRefs]) bool {
return i.Value().Fingerprint < j.Value().Fingerprint
},
sequences...,
)
it.err = nil
// At implements v1.Iterator.
func (it *requestIterator) At() v1.Request {

return it.curr
}

// Err implements v1.Iterator.
func (it *requestIterator) Err() error {
return nil
}

func (it *taskMergeIterator) Next() bool {
ok := it.heap.Next()
// Next implements v1.Iterator.
func (it *requestIterator) Next() bool {
ok := it.series.Next()
if !ok {
return false
}

group := it.heap.At()
task := it.tasks[group.Index()]

group := it.series.At()
it.curr = v1.Request{
Fp: model.Fingerprint(group.Value().Fingerprint),
Chks: convertToChunkRefs(group.Value().Refs),
Searches: convertToSearches(task.filters, it.tokenizer),
Response: task.ResCh,
Fp: model.Fingerprint(group.Fingerprint),
Chks: convertToChunkRefs(group.Refs),
Searches: it.searches,
Response: it.channel,
}
return true
}

func (it *taskMergeIterator) At() v1.Request {
return it.curr
}

func (it *taskMergeIterator) Err() error {
return it.err
}
24 changes: 18 additions & 6 deletions pkg/bloomgateway/multiplexing_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package bloomgateway

import (
"math"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

Expand All @@ -32,7 +34,6 @@ func TestTask(t *testing.T) {
from, through := task.Bounds()
require.Equal(t, ts.Add(-1*time.Hour), from)
require.Equal(t, ts, through)
require.Equal(t, truncateDay(ts), task.day)
})
}

Expand All @@ -50,14 +51,18 @@ func createTasksForRequests(t *testing.T, tenant string, requests ...*logproto.F
return tasks
}

func TestTaskMergeIterator(t *testing.T) {
func TestTask_RequestIterator(t *testing.T) {
ts := mktime("2024-01-24 12:00")
day := truncateDay(ts)
tenant := "fake"
tokenizer := v1.NewNGramTokenizer(4, 0)

t.Run("empty requests result in empty iterator", func(t *testing.T) {
it := newTaskMergeIterator(day, tokenizer)
t.Run("empty request yields empty iterator", func(t *testing.T) {
swb := seriesWithBounds{
bounds: model.Interval{Start: 0, End: math.MaxInt64},
series: []*logproto.GroupedChunkRefs{},
}
task, _ := NewTask(tenant, swb, []syntax.LineFilter{})
it := task.RequestIter(tokenizer)
// nothing to iterate over
require.False(t, it.Next())
})
Expand Down Expand Up @@ -97,7 +102,14 @@ func TestTaskMergeIterator(t *testing.T) {
}

tasks := createTasksForRequests(t, tenant, r1, r2, r3)
it := newTaskMergeIterator(day, tokenizer, tasks...)

iters := make([]v1.PeekingIterator[v1.Request], 0, len(tasks))
for _, task := range tasks {
iters = append(iters, v1.NewPeekingIter(task.RequestIter(tokenizer)))
}

// merge the request iterators using the heap sort iterator
it := v1.NewHeapIterator[v1.Request](func(r1, r2 v1.Request) bool { return r1.Fp < r2.Fp }, iters...)

// first item
require.True(t, it.Next())
Expand Down
16 changes: 10 additions & 6 deletions pkg/bloomgateway/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (w *worker) running(ctx context.Context) error {
blockRefs = append(blockRefs, b.blockRef)
}

err = w.processBlocksWithCallback(taskCtx, tasks[0].Tenant, day, blockRefs, tasksForBlocks)
err = w.processBlocksWithCallback(taskCtx, tasks[0].Tenant, blockRefs, tasksForBlocks)
if err != nil {
for _, t := range tasks {
t.ErrCh <- err
Expand All @@ -215,26 +215,30 @@ func (w *worker) stopping(err error) error {
return nil
}

func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, day model.Time, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error {
func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error {
return w.shipper.Fetch(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, minFp, maxFp uint64) error {
for _, b := range boundedRefs {
if b.blockRef.MinFingerprint == minFp && b.blockRef.MaxFingerprint == maxFp {
return w.processBlock(bq, day, b.tasks)
return w.processBlock(bq, b.tasks)
}
}
return nil
})
}

func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, day model.Time, tasks []Task) error {
func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, tasks []Task) error {
schema, err := blockQuerier.Schema()
if err != nil {
return err
}

tokenizer := v1.NewNGramTokenizer(schema.NGramLen(), 0)
it := newTaskMergeIterator(day, tokenizer, tasks...)
fq := blockQuerier.Fuse([]v1.PeekingIterator[v1.Request]{it})
iters := make([]v1.PeekingIterator[v1.Request], 0, len(tasks))
for _, task := range tasks {
it := v1.NewPeekingIter(task.RequestIter(tokenizer))
iters = append(iters, it)
}
fq := blockQuerier.Fuse(iters)

start := time.Now()
err = fq.Run()
Expand Down
1 change: 1 addition & 0 deletions production/helm/loki/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Entries should include a reference to the pull request that introduced the chang

## 5.42.0

- [CHANGE] Changed versions of Loki v2.9.4 and GEL v1.8.6
- [ENHANCEMENT] Bumped "grafana-agent-operator" depenency chart version to it's latest version

## 5.41.8
Expand Down
2 changes: 1 addition & 1 deletion production/helm/loki/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apiVersion: v2
name: loki
description: Helm chart for Grafana Loki in simple, scalable mode
type: application
appVersion: 2.9.3
appVersion: 2.9.4
version: 5.42.0
home: https://grafana.github.io/helm-charts
sources:
Expand Down
2 changes: 1 addition & 1 deletion production/helm/loki/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# loki

![Version: 5.42.0](https://img.shields.io/badge/Version-5.42.0-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.9.3](https://img.shields.io/badge/AppVersion-2.9.3-informational?style=flat-square)
![Version: 5.42.0](https://img.shields.io/badge/Version-5.42.0-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.9.4](https://img.shields.io/badge/AppVersion-2.9.4-informational?style=flat-square)

Helm chart for Grafana Loki in simple, scalable mode

Expand Down
2 changes: 1 addition & 1 deletion production/helm/loki/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ enterprise:
# Enable enterprise features, license must be provided
enabled: false
# Default verion of GEL to deploy
version: v1.8.4
version: v1.8.6
# -- Optional name of the GEL cluster, otherwise will use .Release.Name
# The cluster name must match what is in your GEL license
cluster_name: null
Expand Down

0 comments on commit e51586a

Please sign in to comment.