Skip to content

Commit

Permalink
Merge branch 'main' into operand-loki-v2.9.6
Browse files Browse the repository at this point in the history
  • Loading branch information
periklis authored Mar 27, 2024
2 parents a63f493 + ca667aa commit 30e3cfe
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 52 deletions.
24 changes: 11 additions & 13 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
Expand All @@ -61,7 +62,6 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/constants"
"github.com/grafana/loki/pkg/util/spanlogger"
)

var errGatewayUnhealthy = errors.New("bloom-gateway is unhealthy in the ring")
Expand Down Expand Up @@ -200,15 +200,13 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return nil, err
}

sp, ctx := spanlogger.NewWithLogger(
ctx,
log.With(g.logger, "tenant", tenantID),
"bloomgateway.FilterChunkRefs",
)
logger := log.With(g.logger, "tenant", tenantID)

sp, ctx := opentracing.StartSpanFromContext(ctx, "bloomgateway.FilterChunkRefs")
stats, ctx := ContextWithEmptyStats(ctx)
defer func() {
level.Info(sp).Log(stats.KVArgs()...)
level.Info(logger).Log(stats.KVArgs()...)
sp.LogKV(stats.KVArgs()...)
sp.Finish()
}()

Expand Down Expand Up @@ -249,7 +247,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
}, nil
}

sp.Log(
sp.LogKV(
"filters", len(filters),
"days", len(seriesByDay),
"series_requested", len(req.Refs),
Expand Down Expand Up @@ -279,7 +277,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
for _, task := range tasks {
task := task
task.enqueueTime = time.Now()
level.Info(sp).Log("msg", "enqueue task", "task", task.ID, "table", task.table, "series", len(task.series))
level.Info(logger).Log("msg", "enqueue task", "task", task.ID, "table", task.table, "series", len(task.series))

// TODO(owen-d): gracefully handle full queues
if err := g.queue.Enqueue(tenantID, nil, task, func() {
Expand All @@ -293,7 +291,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
go g.consumeTask(ctx, task, tasksCh)
}

sp.Log("msg", "enqueued tasks", "duration", time.Since(queueStart).String())
sp.LogKV("msg", "enqueued tasks", "duration", time.Since(queueStart).String())

remaining := len(tasks)

Expand All @@ -310,7 +308,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
stats.Status = "cancel"
return nil, errors.Wrap(ctx.Err(), "request failed")
case task := <-tasksCh:
level.Info(sp).Log("msg", "task done", "task", task.ID, "err", task.Err())
level.Info(logger).Log("msg", "task done", "task", task.ID, "err", task.Err())
if task.Err() != nil {
stats.Status = labelFailure
return nil, errors.Wrap(task.Err(), "request failed")
Expand All @@ -320,7 +318,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
}
}

sp.Log("msg", "received all responses")
sp.LogKV("msg", "received all responses")

start := time.Now()
filtered := filterChunkRefs(req, responses)
Expand Down Expand Up @@ -348,7 +346,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
stats.ChunksRequested = preFilterChunks
stats.ChunksFiltered = preFilterChunks - postFilterChunks

sp.Log("msg", "return filtered chunk refs")
sp.LogKV("msg", "return filtered chunk refs")

return &logproto.FilterChunkRefResponse{ChunkRefs: filtered}, nil
}
Expand Down
35 changes: 18 additions & 17 deletions pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,19 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config.
return err
}

return p.processBlocks(ctx, bqs, data)
start = time.Now()
res := p.processBlocks(ctx, bqs, data)
duration = time.Since(start)

for _, t := range tasks {
FromContext(t.ctx).AddProcessingTime(duration)
}

return res
}

func (p *processor) processBlocks(ctx context.Context, bqs []*bloomshipper.CloseableBlockQuerier, data []blockWithTasks) error {

defer func() {
for i := range bqs {
if bqs[i] == nil {
Expand Down Expand Up @@ -162,22 +171,12 @@ func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerie
tokenizer := v1.NewNGramTokenizer(schema.NGramLen(), 0)
iters := make([]v1.PeekingIterator[v1.Request], 0, len(tasks))

// collect spans & run single defer to avoid blowing call stack
// if there are many tasks
spans := make([]opentracing.Span, 0, len(tasks))
defer func() {
for _, sp := range spans {
sp.Finish()
}
}()

for _, task := range tasks {
// add spans for each task context for this block
sp, _ := opentracing.StartSpanFromContext(task.ctx, "bloomgateway.ProcessBlock")
spans = append(spans, sp)
md, _ := blockQuerier.Metadata()
blk := bloomshipper.BlockRefFrom(task.Tenant, task.table.String(), md)
sp.LogKV("block", blk.String())
if sp := opentracing.SpanFromContext(task.ctx); sp != nil {
md, _ := blockQuerier.Metadata()
blk := bloomshipper.BlockRefFrom(task.Tenant, task.table.String(), md)
sp.LogKV("process block", blk.String())
}

it := v1.NewPeekingIter(task.RequestIter(tokenizer))
iters = append(iters, it)
Expand All @@ -196,7 +195,9 @@ func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerie
}

for _, task := range tasks {
FromContext(task.ctx).AddProcessingTime(duration)
stats := FromContext(task.ctx)
stats.AddTotalProcessingTime(duration)
stats.IncProcessedBlocks()
}

return err
Expand Down
4 changes: 4 additions & 0 deletions pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/go-kit/log"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -88,6 +89,9 @@ func (s *dummyStore) FetchBlocks(_ context.Context, refs []bloomshipper.BlockRef

func TestProcessor(t *testing.T) {
ctx := context.Background()
sp, ctx := opentracing.StartSpanFromContext(ctx, "TestProcessor")
t.Cleanup(sp.Finish)

tenant := "fake"
now := mktime("2024-01-27 12:00")
metrics := newWorkerMetrics(prometheus.NewPedanticRegistry(), constants.Loki, "bloom_gatway")
Expand Down
40 changes: 35 additions & 5 deletions pkg/bloomgateway/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,15 @@ import (
)

type Stats struct {
Status string
NumTasks, NumFilters int
ChunksRequested, ChunksFiltered, SeriesRequested, SeriesFiltered int
QueueTime, MetasFetchTime, BlocksFetchTime, ProcessingTime, PostProcessingTime atomic.Duration
Status string
NumTasks, NumFilters int
ChunksRequested, ChunksFiltered int
SeriesRequested, SeriesFiltered int
QueueTime *atomic.Duration
MetasFetchTime, BlocksFetchTime *atomic.Duration
ProcessingTime, TotalProcessingTime *atomic.Duration
PostProcessingTime *atomic.Duration
ProcessedBlocks *atomic.Int32
}

type statsKey int
Expand All @@ -20,7 +25,16 @@ var ctxKey = statsKey(0)

// ContextWithEmptyStats returns a context with empty stats.
func ContextWithEmptyStats(ctx context.Context) (*Stats, context.Context) {
stats := &Stats{Status: "unknown"}
stats := &Stats{
Status: "unknown",
ProcessedBlocks: atomic.NewInt32(0),
QueueTime: atomic.NewDuration(0),
MetasFetchTime: atomic.NewDuration(0),
BlocksFetchTime: atomic.NewDuration(0),
ProcessingTime: atomic.NewDuration(0),
TotalProcessingTime: atomic.NewDuration(0),
PostProcessingTime: atomic.NewDuration(0),
}
ctx = context.WithValue(ctx, ctxKey, stats)
return stats, ctx
}
Expand Down Expand Up @@ -56,6 +70,8 @@ func (s *Stats) KVArgs() []any {
"msg", "stats-report",
"status", s.Status,
"tasks", s.NumTasks,
"filters", s.NumFilters,
"blocks_processed", s.ProcessedBlocks.Load(),
"series_requested", s.SeriesRequested,
"series_filtered", s.SeriesFiltered,
"chunks_requested", s.ChunksRequested,
Expand Down Expand Up @@ -99,9 +115,23 @@ func (s *Stats) AddProcessingTime(t time.Duration) {
s.ProcessingTime.Add(t)
}

// AddTotalProcessingTime adds t to the TotalProcessingTime counter.
// Calling it on a nil *Stats is a no-op.
func (s *Stats) AddTotalProcessingTime(t time.Duration) {
	if s != nil {
		s.TotalProcessingTime.Add(t)
	}
}

// AddPostProcessingTime adds t to the PostProcessingTime counter.
// Calling it on a nil *Stats is a no-op.
func (s *Stats) AddPostProcessingTime(t time.Duration) {
	if s != nil {
		s.PostProcessingTime.Add(t)
	}
}

// IncProcessedBlocks increments the ProcessedBlocks counter by one.
// Calling it on a nil *Stats is a no-op.
func (s *Stats) IncProcessedBlocks() {
	if s != nil {
		s.ProcessedBlocks.Inc()
	}
}
25 changes: 8 additions & 17 deletions pkg/storage/stores/shipper/indexshipper/indexgateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -355,8 +356,8 @@ func (g *Gateway) GetVolume(ctx context.Context, req *logproto.VolumeRequest) (*

func (g *Gateway) GetShards(request *logproto.ShardsRequest, server logproto.IndexGateway_GetShardsServer) error {
ctx := server.Context()
log, _ := spanlogger.New(context.Background(), "IndexGateway.GetShards")
defer log.Finish()
sp, ctx := opentracing.StartSpanFromContext(ctx, "indexgateway.GetShards")
defer sp.Finish()

instanceID, err := tenant.TenantID(ctx)
if err != nil {
Expand Down Expand Up @@ -412,11 +413,8 @@ func (g *Gateway) getShardsWithBlooms(
// as getting it _very_ wrong could harm some cache locality benefits on the bloom-gws by
// sending multiple requests to the entire keyspace).

sp, ctx := spanlogger.NewWithLogger(
ctx,
log.With(g.log, "tenant", instanceID),
"indexgateway.getShardsWithBlooms",
)
logger := log.With(g.log, "tenant", instanceID)
sp, ctx := opentracing.StartSpanFromContext(ctx, "indexgateway.getShardsWithBlooms")
defer sp.Finish()

// 1) for all bounds, get chunk refs
Expand Down Expand Up @@ -473,17 +471,10 @@ func (g *Gateway) getShardsWithBlooms(
resp.Shards = shards
}

level.Debug(g.log).Log(
"msg", "shards response",
"total_chunks", statistics.Index.TotalChunks,
"post_filter_chunks", statistics.Index.PostFilterChunks,
"shards", len(resp.Shards),
"query", req.Query,
"target_bytes_per_shard", datasize.ByteSize(req.TargetBytesPerShard).HumanReadable(),
)
sp.LogKV("msg", "send shards response", "shards", len(resp.Shards))

level.Debug(sp).Log(
"msg", "shards response",
level.Debug(logger).Log(
"msg", "send shards response",
"total_chunks", statistics.Index.TotalChunks,
"post_filter_chunks", statistics.Index.PostFilterChunks,
"shards", len(resp.Shards),
Expand Down

0 comments on commit 30e3cfe

Please sign in to comment.