diff --git a/CHANGELOG.md b/CHANGELOG.md index f9d2d38fbbfed..0e723e64176c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,8 @@ ##### Enhancements -* [11571](https://github.com/grafana/loki/pull/11571) **MichelHollands**: Add a metrics.go log line for requests from querier to ingester +* [11633](https://github.com/grafana/loki/pull/11633) **cyriltovena**: Add profiling integrations to tracing instrumentation. +* [11571](https://github.com/grafana/loki/pull/11571) **MichelHollands**: Add a metrics.go log line for requests from querier to ingester * [11477](https://github.com/grafana/loki/pull/11477) **MichelHollands**: support GET for /ingester/shutdown * [11363](https://github.com/grafana/loki/pull/11363) **kavirajk**: bugfix(memcached): Make memcached batch fetch truely context aware. * [11319](https://github.com/grafana/loki/pull/11319) **someStrangerFromTheAbyss**: Helm: Add extraContainers to the write pods. @@ -57,6 +58,7 @@ * [11601](https://github.com/grafana/loki/pull/11601) **dannykopping** Ruler: Fixed a panic that can be caused by concurrent read-write access of tenant configs when there are a large amount of rules. * [11606](https://github.com/grafana/loki/pull/11606) **dannykopping** Fixed regression adding newlines to HTTP error response bodies which may break client integrations. * [11657](https://github.com/grafana/loki/pull/11657) **ashwanthgoli** Log results cache: compose empty response based on the request being served to avoid returning incorrect limit or direction. +* [11587](https://github.com/grafana/loki/pull/11587) **trevorwhitney** Fix semantics of label parsing logic of metrics and logs queries. Both only parse the first label if multiple extractions into the same label are requested. ##### Changes diff --git a/cmd/loki/main.go b/cmd/loki/main.go index 845104eee8de5..937a5c16fab80 100644 --- a/cmd/loki/main.go +++ b/cmd/loki/main.go @@ -10,7 +10,9 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/log" + "github.com/grafana/dskit/spanprofiler" "github.com/grafana/dskit/tracing" + "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/version" @@ -84,7 +86,9 @@ func main() { if err != nil { level.Error(util_log.Logger).Log("msg", "error in initializing tracing. tracing will not be enabled", "err", err) } - + if config.Tracing.ProfilingEnabled { + opentracing.SetGlobalTracer(spanprofiler.NewTracer(opentracing.GlobalTracer())) + } defer func() { if trace != nil { if err := trace.Close(); err != nil { diff --git a/integration/client/client.go b/integration/client/client.go index dcf2c036dc9e9..2e5a86aa6b3de 100644 --- a/integration/client/client.go +++ b/integration/client/client.go @@ -479,12 +479,21 @@ type Header struct { Name, Value string } -// RunRangeQuery runs a query and returns an error if anything went wrong +// RunRangeQuery runs a 7d query and returns an error if anything went wrong +// This function is kept for backwards compatibility with existing tests. +// Prefer using (*Client).RunRangeQueryWithStartEnd() instead. func (c *Client) RunRangeQuery(ctx context.Context, query string, extraHeaders ...Header) (*Response, error) { + end := c.Now.Add(time.Second) + start := c.Now.Add(-7 * 24 * time.Hour) + return c.RunRangeQueryWithStartEnd(ctx, query, start, end, extraHeaders...)
+} + +// RunRangeQuery runs a query and returns an error if anything went wrong +func (c *Client) RunRangeQueryWithStartEnd(ctx context.Context, query string, start, end time.Time, extraHeaders ...Header) (*Response, error) { ctx, cancelFunc := context.WithTimeout(ctx, requestTimeout) defer cancelFunc() - buf, statusCode, err := c.run(ctx, c.rangeQueryURL(query), extraHeaders...) + buf, statusCode, err := c.run(ctx, c.rangeQueryURL(query, start, end), extraHeaders...) if err != nil { return nil, err } @@ -555,11 +564,11 @@ func (c *Client) parseResponse(buf []byte, statusCode int) (*Response, error) { return &lokiResp, nil } -func (c *Client) rangeQueryURL(query string) string { +func (c *Client) rangeQueryURL(query string, start, end time.Time) string { v := url.Values{} v.Set("query", query) - v.Set("start", formatTS(c.Now.Add(-7*24*time.Hour))) - v.Set("end", formatTS(c.Now.Add(time.Second))) + v.Set("start", formatTS(start)) + v.Set("end", formatTS(end)) u, err := url.Parse(c.baseURL) if err != nil { diff --git a/integration/cluster/cluster.go b/integration/cluster/cluster.go index 8ddeac00f1782..831da46f2cb99 100644 --- a/integration/cluster/cluster.go +++ b/integration/cluster/cluster.go @@ -43,7 +43,6 @@ server: grpc_server_max_recv_msg_size: 110485813 grpc_server_max_send_msg_size: 110485813 - common: path_prefix: {{.dataPath}} storage: @@ -70,14 +69,25 @@ storage_config: store-1: directory: {{.sharedDataPath}}/fs-store-1 boltdb_shipper: - active_index_directory: {{.dataPath}}/index + active_index_directory: {{.dataPath}}/boltdb-index cache_location: {{.dataPath}}/boltdb-cache tsdb_shipper: active_index_directory: {{.dataPath}}/tsdb-index cache_location: {{.dataPath}}/tsdb-cache + bloom_shipper: + working_directory: {{.dataPath}}/bloom-shipper + blocks_downloading_queue: + workers_count: 1 + +bloom_gateway: + enabled: false + +bloom_compactor: + enabled: false + working_directory: {{.dataPath}}/bloom-compactor compactor: - working_directory: {{.dataPath}}/retention + working_directory: {{.dataPath}}/compactor retention_enabled: true delete_request_store: store-1 @@ -154,14 +164,14 @@ func New(logLevel level.Value, opts ...func(*Cluster)) *Cluster { } resetMetricRegistry() - sharedPath, err := os.MkdirTemp("", "loki-shared-data") + sharedPath, err := os.MkdirTemp("", "loki-shared-data-") if err != nil { panic(err.Error()) } overridesFile := filepath.Join(sharedPath, "loki-overrides.yaml") - err = os.WriteFile(filepath.Join(sharedPath, "loki-overrides.yaml"), []byte(`overrides:`), 0777) + err = os.WriteFile(overridesFile, []byte(`overrides:`), 0777) if err != nil { panic(fmt.Errorf("error creating overrides file: %w", err)) } @@ -318,12 +328,12 @@ func port(addr string) string { func (c *Component) writeConfig() error { var err error - configFile, err := os.CreateTemp("", "loki-config") + configFile, err := os.CreateTemp("", fmt.Sprintf("loki-%s-config-*.yaml", c.name)) if err != nil { return fmt.Errorf("error creating config file: %w", err) } - c.dataPath, err = os.MkdirTemp("", "loki-data") + c.dataPath, err = os.MkdirTemp("", fmt.Sprintf("loki-%s-data-", c.name)) if err != nil { return fmt.Errorf("error creating data path: %w", err) } @@ -408,6 +418,8 @@ func (c *Component) run() error { c.configFile, "-limits.per-user-override-config", c.overridesFile, + "-limits.per-user-override-period", + "1s", ), flagset); err != nil { return err } diff --git a/integration/loki_micro_services_test.go b/integration/loki_micro_services_test.go index a4d03ed10a673..1f7dc836b5ff6 100644 --- 
a/integration/loki_micro_services_test.go +++ b/integration/loki_micro_services_test.go @@ -3,11 +3,14 @@ package integration import ( "context" "encoding/json" + "fmt" + "math/rand" "strings" "sync" "testing" "time" + "github.com/go-kit/log/level" dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" "github.com/prometheus/prometheus/model/labels" @@ -1056,6 +1059,174 @@ func TestCategorizedLabels(t *testing.T) { } } +func TestBloomFiltersEndToEnd(t *testing.T) { + commonFlags := []string{ + "-bloom-compactor.compaction-interval=2s", + "-bloom-compactor.enable-compaction=true", + "-bloom-compactor.enabled=true", + "-bloom-gateway.enable-filtering=true", + "-bloom-gateway.enabled=true", + "-compactor.compaction-interval=1s", + "-frontend.default-validity=0s", + "-ingester.flush-on-shutdown=true", + "-ingester.wal-enabled=false", + "-query-scheduler.use-scheduler-ring=false", + "-store.index-cache-read.embedded-cache.enabled=true", + } + + tenantID := randStringRunes() + + clu := cluster.New( + level.DebugValue(), + cluster.SchemaWithTSDB, + func(c *cluster.Cluster) { c.SetSchemaVer("v13") }, + ) + + defer func() { + assert.NoError(t, clu.Cleanup()) + }() + + var ( + tDistributor = clu.AddComponent( + "distributor", + append( + commonFlags, + "-target=distributor", + )..., + ) + tIndexGateway = clu.AddComponent( + "index-gateway", + append( + commonFlags, + "-target=index-gateway", + )..., + ) + _ = clu.AddComponent( + "bloom-gateway", + append( + commonFlags, + "-target=bloom-gateway", + )..., + ) + ) + require.NoError(t, clu.Run()) + + var ( + tIngester = clu.AddComponent( + "ingester", + append( + commonFlags, + "-target=ingester", + "-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), + )..., + ) + tQueryScheduler = clu.AddComponent( + "query-scheduler", + append( + commonFlags, + "-target=query-scheduler", + "-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), + )..., + ) + tCompactor = clu.AddComponent( + "compactor", + append( + commonFlags, + "-target=compactor", + "-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), + )..., + ) + _ = clu.AddComponent( + "bloom-compactor", + append( + commonFlags, + "-target=bloom-compactor", + "-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), + )..., + ) + ) + require.NoError(t, clu.Run()) + + // finally, run the query-frontend and querier. 
+ var ( + tQueryFrontend = clu.AddComponent( + "query-frontend", + append( + commonFlags, + "-target=query-frontend", + "-frontend.scheduler-address="+tQueryScheduler.GRPCURL(), + "-common.compactor-address="+tCompactor.HTTPURL(), + "-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), + )..., + ) + _ = clu.AddComponent( + "querier", + append( + commonFlags, + "-target=querier", + "-querier.scheduler-address="+tQueryScheduler.GRPCURL(), + "-common.compactor-address="+tCompactor.HTTPURL(), + "-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), + )..., + ) + ) + require.NoError(t, clu.Run()) + + now := time.Now() + + cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL()) + cliDistributor.Now = now + + cliIngester := client.New(tenantID, "", tIngester.HTTPURL()) + cliIngester.Now = now + + cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL()) + cliQueryFrontend.Now = now + + cliIndexGateway := client.New(tenantID, "", tIndexGateway.HTTPURL()) + cliIndexGateway.Now = now + + lineTpl := `caller=loki_micro_services_test.go msg="push log line" id="%s"` + // ingest logs from 10 different pods + // each line contains a random, unique string + // that string is used to verify filtering using bloom gateway + uniqueStrings := make([]string, 600) + for i := 0; i < len(uniqueStrings); i++ { + id := randStringRunes() + id = fmt.Sprintf("%s-%d", id, i) + uniqueStrings[i] = id + pod := fmt.Sprintf("pod-%d", i%10) + line := fmt.Sprintf(lineTpl, id) + err := cliDistributor.PushLogLine(line, now.Add(-1*time.Hour).Add(time.Duration(i-len(uniqueStrings))*time.Second), nil, map[string]string{"pod": pod}) + require.NoError(t, err) + } + + // restart ingester to flush chunks and that there are zero chunks in memory + require.NoError(t, cliIngester.Flush()) + require.NoError(t, tIngester.Restart()) + + // wait for compactor to compact index and for bloom compactor to build bloom filters + time.Sleep(10 * time.Second) + + // use bloom gateway to perform needle in the haystack queries + randIdx := rand.Intn(len(uniqueStrings)) + q := fmt.Sprintf(`{job="varlog"} |= "%s"`, uniqueStrings[randIdx]) + end := now.Add(-1 * time.Second) + start := end.Add(-24 * time.Hour) + resp, err := cliQueryFrontend.RunRangeQueryWithStartEnd(context.Background(), q, start, end) + require.NoError(t, err) + + // verify response + require.Len(t, resp.Data.Stream, 1) + expectedLine := fmt.Sprintf(lineTpl, uniqueStrings[randIdx]) + require.Equal(t, expectedLine, resp.Data.Stream[0].Values[0][1]) + + // TODO(chaudum): + // verify that bloom blocks have actually been used for querying + // atm, we can only verify by logs, so we should add appropriate metrics for + // uploaded/downloaded blocks and metas +} + func getValueFromMF(mf *dto.MetricFamily, lbs []*dto.LabelPair) float64 { for _, m := range mf.Metric { if !assert.ObjectsAreEqualValues(lbs, m.GetLabel()) { diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index 7f999c0ebfad6..a5f1185f57e84 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -51,6 +51,7 @@ import ( "github.com/grafana/loki/pkg/storage" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" chunk_client "github.com/grafana/loki/pkg/storage/chunk/client" + "github.com/grafana/loki/pkg/storage/chunk/client/local" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" 
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper" @@ -166,10 +167,18 @@ func New( return nil, errors.Wrap(err, "create index shipper") } + // The ObjectClient does not expose the key encoder it uses, + // so check the concrete type and set the FSEncoder if needed. + var keyEncoder chunk_client.KeyEncoder + switch objectClient.(type) { + case *local.FSObjectClient: + keyEncoder = chunk_client.FSEncoder + } + c.storeClients[periodicConfig.From] = storeClient{ object: objectClient, index: index_storage.NewIndexStorageClient(objectClient, periodicConfig.IndexTables.PathPrefix), - chunk: chunk_client.NewClient(objectClient, nil, schemaConfig), + chunk: chunk_client.NewClient(objectClient, keyEncoder, schemaConfig), indexShipper: indexShipper, } } @@ -275,7 +284,7 @@ func (c *Compactor) compactTable(ctx context.Context, logger log.Logger, tableNa return fmt.Errorf("index store client not found for period starting at %s", schemaCfg.From.String()) } - _, tenants, err := sc.index.ListFiles(ctx, tableName, false) + _, tenants, err := sc.index.ListFiles(ctx, tableName, true) if err != nil { return fmt.Errorf("failed to list files for table %s: %w", tableName, err) } diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index 403378e016a9f..b0c3251a0843d 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -180,9 +180,8 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, o sharding: shardingStrategy, pendingTasks: makePendingTasks(pendingTasksInitialCap), workerConfig: workerConfig{ - maxWaitTime: 200 * time.Millisecond, - maxItems: 100, - processBlocksSequentially: false, + maxWaitTime: 200 * time.Millisecond, + maxItems: 100, }, workerMetrics: newWorkerMetrics(reg, constants.Loki, metricsSubsystem), queueMetrics: queue.NewMetrics(reg, constants.Loki, metricsSubsystem), @@ -323,7 +322,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk case res := <-resCh: responses = append(responses, res) // log line is helpful for debugging tests - // level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp", uint64(res.Fp), "chunks", res.Removals.Len(), "progress", fmt.Sprintf("%d/%d", len(responses), requestCount)) + level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len(), "progress", fmt.Sprintf("%d/%d", len(responses), requestCount)) // wait for all parts of the full response if len(responses) == requestCount { for _, o := range responses { diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index b34e3d55852a5..183a2aad2190e 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -269,89 +269,74 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { }) t.Run("use fuse queriers to filter chunks", func(t *testing.T) { - for _, tc := range []struct { - name string - value bool - }{ - {"sequentially", true}, - {"callback", false}, - } { - t.Run(tc.name, func(t *testing.T) { - - reg := prometheus.NewRegistry() - gw, err := New(cfg, schemaCfg, storageCfg, limits, ss, cm, logger, reg) - require.NoError(t, err) - - now := mktime("2023-10-03 10:00") - - // replace store implementation and re-initialize workers and sub-services - bqs, data := createBlockQueriers(t, 5, now.Add(-8*time.Hour), now, 0, 1024) - gw.bloomStore = 
newMockBloomStore(bqs) - gw.workerConfig.processBlocksSequentially = tc.value - err = gw.initServices() - require.NoError(t, err) - - t.Log("process blocks in worker sequentially", gw.workerConfig.processBlocksSequentially) - - err = services.StartAndAwaitRunning(context.Background(), gw) - require.NoError(t, err) - t.Cleanup(func() { - err = services.StopAndAwaitTerminated(context.Background(), gw) - require.NoError(t, err) - }) + reg := prometheus.NewRegistry() + gw, err := New(cfg, schemaCfg, storageCfg, limits, ss, cm, logger, reg) + require.NoError(t, err) - chunkRefs := createQueryInputFromBlockData(t, tenantID, data, 100) - - t.Run("no match - return empty response", func(t *testing.T) { - inputChunkRefs := groupRefs(t, chunkRefs) - req := &logproto.FilterChunkRefRequest{ - From: now.Add(-8 * time.Hour), - Through: now, - Refs: inputChunkRefs, - Filters: []syntax.LineFilter{ - {Ty: labels.MatchEqual, Match: "does not match"}, - }, - } - ctx := user.InjectOrgID(context.Background(), tenantID) - res, err := gw.FilterChunkRefs(ctx, req) - require.NoError(t, err) - - expectedResponse := &logproto.FilterChunkRefResponse{ - ChunkRefs: []*logproto.GroupedChunkRefs{}, - } - require.Equal(t, expectedResponse, res) - }) + now := mktime("2023-10-03 10:00") - t.Run("match - return filtered", func(t *testing.T) { - inputChunkRefs := groupRefs(t, chunkRefs) - // hack to get indexed key for a specific series - // the indexed key range for a series is defined as - // i * keysPerSeries ... i * keysPerSeries + keysPerSeries - 1 - // where i is the nth series in a block - // fortunately, i is also used as Checksum for the single chunk of a series - // see mkBasicSeriesWithBlooms() in pkg/storage/bloom/v1/test_util.go - key := inputChunkRefs[0].Refs[0].Checksum*1000 + 500 - - req := &logproto.FilterChunkRefRequest{ - From: now.Add(-8 * time.Hour), - Through: now, - Refs: inputChunkRefs, - Filters: []syntax.LineFilter{ - {Ty: labels.MatchEqual, Match: fmt.Sprintf("series %d", key)}, - }, - } - ctx := user.InjectOrgID(context.Background(), tenantID) - res, err := gw.FilterChunkRefs(ctx, req) - require.NoError(t, err) - - expectedResponse := &logproto.FilterChunkRefResponse{ - ChunkRefs: inputChunkRefs[:1], - } - require.Equal(t, expectedResponse, res) - }) + // replace store implementation and re-initialize workers and sub-services + bqs, data := createBlockQueriers(t, 5, now.Add(-8*time.Hour), now, 0, 1024) + gw.bloomStore = newMockBloomStore(bqs) + err = gw.initServices() + require.NoError(t, err) - }) - } + err = services.StartAndAwaitRunning(context.Background(), gw) + require.NoError(t, err) + t.Cleanup(func() { + err = services.StopAndAwaitTerminated(context.Background(), gw) + require.NoError(t, err) + }) + + chunkRefs := createQueryInputFromBlockData(t, tenantID, data, 100) + + t.Run("no match - return empty response", func(t *testing.T) { + inputChunkRefs := groupRefs(t, chunkRefs) + req := &logproto.FilterChunkRefRequest{ + From: now.Add(-8 * time.Hour), + Through: now, + Refs: inputChunkRefs, + Filters: []syntax.LineFilter{ + {Ty: labels.MatchEqual, Match: "does not match"}, + }, + } + ctx := user.InjectOrgID(context.Background(), tenantID) + res, err := gw.FilterChunkRefs(ctx, req) + require.NoError(t, err) + + expectedResponse := &logproto.FilterChunkRefResponse{ + ChunkRefs: []*logproto.GroupedChunkRefs{}, + } + require.Equal(t, expectedResponse, res) + }) + + t.Run("match - return filtered", func(t *testing.T) { + inputChunkRefs := groupRefs(t, chunkRefs) + // hack to get indexed key for a 
specific series + // the indexed key range for a series is defined as + // i * keysPerSeries ... i * keysPerSeries + keysPerSeries - 1 + // where i is the nth series in a block + // fortunately, i is also used as Checksum for the single chunk of a series + // see mkBasicSeriesWithBlooms() in pkg/storage/bloom/v1/test_util.go + key := inputChunkRefs[0].Refs[0].Checksum*1000 + 500 + + req := &logproto.FilterChunkRefRequest{ + From: now.Add(-8 * time.Hour), + Through: now, + Refs: inputChunkRefs, + Filters: []syntax.LineFilter{ + {Ty: labels.MatchEqual, Match: fmt.Sprintf("series %d", key)}, + }, + } + ctx := user.InjectOrgID(context.Background(), tenantID) + res, err := gw.FilterChunkRefs(ctx, req) + require.NoError(t, err) + + expectedResponse := &logproto.FilterChunkRefResponse{ + ChunkRefs: inputChunkRefs[:1], + } + require.Equal(t, expectedResponse, res) + }) }) } diff --git a/pkg/bloomgateway/worker.go b/pkg/bloomgateway/worker.go index e82a0daea63c1..a8f9c56d50bab 100644 --- a/pkg/bloomgateway/worker.go +++ b/pkg/bloomgateway/worker.go @@ -20,8 +20,6 @@ import ( type workerConfig struct { maxWaitTime time.Duration maxItems int - - processBlocksSequentially bool } type workerMetrics struct { @@ -188,11 +186,7 @@ func (w *worker) running(ctx context.Context) error { blockRefs = append(blockRefs, b.blockRef) } - if w.cfg.processBlocksSequentially { - err = w.processBlocksSequentially(taskCtx, tasks[0].Tenant, day, blockRefs, boundedRefs) - } else { - err = w.processBlocksWithCallback(taskCtx, tasks[0].Tenant, day, blockRefs, boundedRefs) - } + err = w.processBlocksWithCallback(taskCtx, tasks[0].Tenant, day, blockRefs, boundedRefs) if err != nil { for _, t := range tasks { t.ErrCh <- err @@ -227,20 +221,6 @@ func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant strin }) } -func (w *worker) processBlocksSequentially(taskCtx context.Context, tenant string, day time.Time, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error { - storeFetchStart := time.Now() - blockQueriers, err := w.store.GetBlockQueriersForBlockRefs(taskCtx, tenant, blockRefs) - w.metrics.storeAccessLatency.WithLabelValues(w.id, "GetBlockQueriersForBlockRefs").Observe(time.Since(storeFetchStart).Seconds()) - if err != nil { - return err - } - - for i := range blockQueriers { - processBlock(blockQueriers[i].BlockQuerier, day, boundedRefs[i].tasks) - } - return nil -} - func processBlock(blockQuerier *v1.BlockQuerier, day time.Time, tasks []Task) { schema, err := blockQuerier.Schema() if err != nil { diff --git a/pkg/logql/log/parser.go b/pkg/logql/log/parser.go index be059a2831560..c03e7c91cb960 100644 --- a/pkg/logql/log/parser.go +++ b/pkg/logql/log/parser.go @@ -493,11 +493,13 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde return "", false } - if !lbs.ParserLabelHints().ShouldExtract(sanitized) { + _, alwaysExtract := keys[sanitized] + if !alwaysExtract && !lbs.ParserLabelHints().ShouldExtract(sanitized) { return "", false } return sanitized, true }) + if !ok { continue } @@ -530,6 +532,7 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde } } } + if l.strict && l.dec.Err() != nil { addErrLabel(errLogfmt, l.dec.Err(), lbs) return line, true diff --git a/pkg/logql/log/parser_hints.go b/pkg/logql/log/parser_hints.go index cdb61015dd4dd..a8b1f73f3109d 100644 --- a/pkg/logql/log/parser_hints.go +++ b/pkg/logql/log/parser_hints.go @@ -58,10 +58,6 @@ type Hints struct { } func (p *Hints) ShouldExtract(key string) bool { - if 
len(p.requiredLabels) == 0 { - return true - } - for _, l := range p.extracted { if l == key { return false @@ -74,7 +70,7 @@ func (p *Hints) ShouldExtract(key string) bool { } } - return false + return len(p.requiredLabels) == 0 } func (p *Hints) ShouldExtractPrefix(prefix string) bool { @@ -95,19 +91,25 @@ func (p *Hints) NoLabels() bool { } func (p *Hints) RecordExtracted(key string) { - for _, l := range p.requiredLabels { - if l == key { - p.extracted = append(p.extracted, key) - return - } - } + p.extracted = append(p.extracted, key) } func (p *Hints) AllRequiredExtracted() bool { - if len(p.requiredLabels) == 0 { + if len(p.requiredLabels) == 0 || len(p.extracted) < len(p.requiredLabels) { return false } - return len(p.extracted) == len(p.requiredLabels) + + found := 0 + for _, l := range p.requiredLabels { + for _, e := range p.extracted { + if l == e { + found++ + break + } + } + } + + return len(p.requiredLabels) == found } func (p *Hints) Reset() { @@ -172,9 +174,6 @@ func NewParserHint(requiredLabelNames, groups []string, without, noLabels bool, return ph } - ph.requiredLabels = hints - ph.shouldPreserveError = containsError(hints) - return &Hints{requiredLabels: hints, extracted: extracted, shouldPreserveError: containsError(hints)} } diff --git a/pkg/logql/log/parser_hints_test.go b/pkg/logql/log/parser_hints_test.go index ac232bfd871b4..42d0134bc1d8f 100644 --- a/pkg/logql/log/parser_hints_test.go +++ b/pkg/logql/log/parser_hints_test.go @@ -28,7 +28,10 @@ var ( "response": { "status": 204, "latency_seconds": "30.001" - } + }, + "message": { + "message": "foo", + } }`) packedLine = []byte(`{ @@ -58,14 +61,14 @@ func Test_ParserHints(t *testing.T) { jsonLine, true, 1.0, - `{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`, + `{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`, }, { `sum without (request_host,app,cluster) (rate({app="nginx"} | json | __error__="" | response_status = 204 [1m]))`, jsonLine, true, 1.0, - `{cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`, + `{cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`, }, { `sum by (request_host,app) (rate({app="nginx"} | json | __error__="" | response_status = 204 [1m]))`, @@ -114,14 +117,14 @@ func Test_ParserHints(t *testing.T) { jsonLine, true, 30.001, - `{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", 
upstream_addr="10.0.0.1:80"}`, + `{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`, }, { `sum without (request_host,app,cluster)(rate({app="nginx"} | json | response_status = 204 | unwrap response_latency_seconds [1m]))`, jsonLine, true, 30.001, - `{cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`, + `{cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`, }, { `sum(rate({app="nginx"} | logfmt | org_id=3677 | unwrap Ingester_TotalReached[1m]))`, @@ -214,6 +217,13 @@ func Test_ParserHints(t *testing.T) { 0, ``, }, + { + `sum by (message_message,app)(count_over_time({app="nginx"} | json | response_status = 204 and remote_user = "foo"[1m]))`, + jsonLine, + true, + 1, + `{app="nginx", message_message="foo"}`, + }, } { tt := tt t.Run(tt.expr, func(t *testing.T) { diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 0c4a50f813ed9..d4b58fac838f3 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -647,7 +647,7 @@ func (t *Loki) setupModuleManager() error { Read: {QueryFrontend, Querier}, Write: {Ingester, Distributor}, - Backend: {QueryScheduler, Ruler, Compactor, IndexGateway}, + Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomGateway, BloomCompactor}, All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor}, } @@ -694,13 +694,12 @@ func (t *Loki) setupModuleManager() error { } // Add bloom gateway ring in client mode to IndexGateway service dependencies if bloom filtering is enabled. - if t.Cfg.isModuleEnabled(IndexGateway) && t.Cfg.BloomGateway.Enabled { + if t.Cfg.BloomGateway.Enabled { deps[IndexGateway] = append(deps[IndexGateway], BloomGatewayRing) } - //TODO(poyzannur) not sure this is needed for BloomCompactor if t.Cfg.LegacyReadTarget { - deps[Read] = append(deps[Read], QueryScheduler, Ruler, Compactor, IndexGateway, BloomGateway, BloomCompactor) + deps[Read] = append(deps[Read], deps[Backend]...) 
} if t.Cfg.InternalServer.Enable { diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go index 7ab99ea7e3e6e..b189cba390b82 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client.go +++ b/pkg/storage/stores/shipper/bloomshipper/client.go @@ -144,7 +144,7 @@ func (b *BloomClient) GetMetas(ctx context.Context, params MetaSearchParams) ([] return nil, err } if metaRef.MaxFingerprint < uint64(params.MinFingerprint) || uint64(params.MaxFingerprint) < metaRef.MinFingerprint || - metaRef.StartTimestamp.Before(params.StartTimestamp) || metaRef.EndTimestamp.After(params.EndTimestamp) { + metaRef.EndTimestamp.Before(params.StartTimestamp) || metaRef.StartTimestamp.After(params.EndTimestamp) { continue } meta, err := b.downloadMeta(ctx, metaRef, periodClient) diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper.go b/pkg/storage/stores/shipper/bloomshipper/shipper.go index d7038fc13761c..d9d96fcc7783c 100644 --- a/pkg/storage/stores/shipper/bloomshipper/shipper.go +++ b/pkg/storage/stores/shipper/bloomshipper/shipper.go @@ -1,7 +1,6 @@ package bloomshipper import ( - "cmp" "context" "fmt" "math" @@ -15,6 +14,16 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" ) +type fpRange [2]uint64 + +func (r fpRange) minFp() uint64 { + return r[0] +} + +func (r fpRange) maxFp() uint64 { + return r[1] +} + type Shipper struct { client Client config config.Config @@ -43,7 +52,7 @@ func NewShipper(client Client, config config.Config, limits Limits, logger log.L func (s *Shipper) GetBlockRefs(ctx context.Context, tenantID string, from, through model.Time) ([]BlockRef, error) { level.Debug(s.logger).Log("msg", "GetBlockRefs", "tenant", tenantID, "from", from, "through", through) - blockRefs, err := s.getActiveBlockRefs(ctx, tenantID, from, through, []uint64{0, math.MaxUint64}) + blockRefs, err := s.getActiveBlockRefs(ctx, tenantID, from, through, []fpRange{{0, math.MaxUint64}}) if err != nil { return nil, fmt.Errorf("error fetching active block references : %w", err) } @@ -55,30 +64,36 @@ func (s *Shipper) Fetch(ctx context.Context, tenantID string, blocks []BlockRef, defer cancelFunc() blocksChannel, errorsChannel := s.blockDownloader.downloadBlocks(cancelContext, tenantID, blocks) + // track how many blocks are still remaning to be downloaded + remaining := len(blocks) + for { select { case <-ctx.Done(): return fmt.Errorf("failed to fetch blocks: %w", ctx.Err()) - case result, ok := <-blocksChannel: - if !ok { + case result, sentBeforeClosed := <-blocksChannel: + if !sentBeforeClosed { return nil } err := runCallback(callback, result) if err != nil { return err } - case err := <-errorsChannel: - if err != nil { - return fmt.Errorf("error downloading blocks : %w", err) + remaining-- + if remaining == 0 { + return nil } + case err := <-errorsChannel: + return fmt.Errorf("error downloading blocks : %w", err) } } } func runCallback(callback ForEachBlockCallback, block blockWithQuerier) error { - defer func(result blockWithQuerier) { - _ = result.Close() + defer func(b blockWithQuerier) { + _ = b.Close() }(block) + err := callback(block.closableBlockQuerier.BlockQuerier, block.MinFingerprint, block.MaxFingerprint) if err != nil { return fmt.Errorf("error running callback function for block %s err: %w", block.BlockPath, err) @@ -86,17 +101,6 @@ func runCallback(callback ForEachBlockCallback, block blockWithQuerier) error { return nil } -func (s *Shipper) ForEachBlock(ctx context.Context, tenantID string, from, 
through model.Time, fingerprints []uint64, callback ForEachBlockCallback) error { - level.Debug(s.logger).Log("msg", "ForEachBlock", "tenant", tenantID, "from", from, "through", through, "fingerprints", len(fingerprints)) - - blockRefs, err := s.getActiveBlockRefs(ctx, tenantID, from, through, fingerprints) - if err != nil { - return fmt.Errorf("error fetching active block references : %w", err) - } - - return s.Fetch(ctx, tenantID, blockRefs, callback) -} - func (s *Shipper) Stop() { s.client.Stop() s.blockDownloader.stop() @@ -112,18 +116,19 @@ func getFirstLast[T any](s []T) (T, T) { return s[0], s[len(s)-1] } -func (s *Shipper) getActiveBlockRefs(ctx context.Context, tenantID string, from, through model.Time, fingerprints []uint64) ([]BlockRef, error) { - minFingerprint, maxFingerprint := getFirstLast(fingerprints) +func (s *Shipper) getActiveBlockRefs(ctx context.Context, tenantID string, from, through model.Time, fingerprints []fpRange) ([]BlockRef, error) { + minFpRange, maxFpRange := getFirstLast(fingerprints) metas, err := s.client.GetMetas(ctx, MetaSearchParams{ TenantID: tenantID, - MinFingerprint: model.Fingerprint(minFingerprint), - MaxFingerprint: model.Fingerprint(maxFingerprint), + MinFingerprint: model.Fingerprint(minFpRange.minFp()), + MaxFingerprint: model.Fingerprint(maxFpRange.maxFp()), StartTimestamp: from, EndTimestamp: through, }) if err != nil { return []BlockRef{}, fmt.Errorf("error fetching meta.json files: %w", err) } + level.Debug(s.logger).Log("msg", "dowloaded metas", "count", len(metas)) activeBlocks := s.findBlocks(metas, from, through, fingerprints) slices.SortStableFunc(activeBlocks, func(a, b BlockRef) int { if a.MinFingerprint < b.MinFingerprint { @@ -138,7 +143,7 @@ func (s *Shipper) getActiveBlockRefs(ctx context.Context, tenantID string, from, return activeBlocks, nil } -func (s *Shipper) findBlocks(metas []Meta, startTimestamp, endTimestamp model.Time, fingerprints []uint64) []BlockRef { +func (s *Shipper) findBlocks(metas []Meta, startTimestamp, endTimestamp model.Time, fingerprints []fpRange) []BlockRef { outdatedBlocks := make(map[string]interface{}) for _, meta := range metas { for _, tombstone := range meta.Tombstones { @@ -164,39 +169,29 @@ func (s *Shipper) findBlocks(metas []Meta, startTimestamp, endTimestamp model.Ti return blockRefs } -// getPosition returns the smallest index of element v in slice s where v > s[i] -// TODO(chaudum): Use binary search to find index instead of iteration. -func getPosition[S ~[]E, E cmp.Ordered](s S, v E) int { - for i := range s { - if v > s[i] { - continue - } - return i - } - return len(s) -} - -func isOutsideRange(b *BlockRef, startTimestamp, endTimestamp model.Time, fingerprints []uint64) bool { +// isOutsideRange tests if a given BlockRef b is outside of search boundaries +// defined by min/max timestamp and min/max fingerprint. +// Fingerprint ranges must be sorted in ascending order. +func isOutsideRange(b *BlockRef, startTimestamp, endTimestamp model.Time, fingerprints []fpRange) bool { // First, check time range if b.EndTimestamp < startTimestamp || b.StartTimestamp > endTimestamp { return true } // Then, check if outside of min/max of fingerprint slice - minFp, maxFp := getFirstLast(fingerprints) - if b.MaxFingerprint < minFp || b.MinFingerprint > maxFp { + minFpRange, maxFpRange := getFirstLast(fingerprints) + if b.MaxFingerprint < minFpRange.minFp() || b.MinFingerprint > maxFpRange.maxFp() { return true } - // Check if the block range is inside a "gap" in the fingerprint slice - // e.g. 
- // fingerprints = [1, 2, 6, 7, 8] - // block = [3, 4, 5] - idx := getPosition[[]uint64](fingerprints, b.MinFingerprint) - // in case b.MinFingerprint is outside of the fingerprints range, return true - // this is already covered in the range check above, but I keep it as a second gate - if idx > len(fingerprints)-1 { - return true + prev := fpRange{0, 0} + for i := 0; i < len(fingerprints); i++ { + fpr := fingerprints[i] + if b.MinFingerprint > prev.maxFp() && b.MaxFingerprint < fpr.minFp() { + return true + } + prev = fpr } - return b.MaxFingerprint < fingerprints[idx] + + return false } diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go index 83c9379cd44c6..859aa38c82a61 100644 --- a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go @@ -4,6 +4,7 @@ import ( "fmt" "math" "testing" + "time" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" @@ -40,7 +41,7 @@ func Test_Shipper_findBlocks(t *testing.T) { } shipper := &Shipper{} - blocks := shipper.findBlocks(metas, 300, 400, []uint64{100, 200}) + blocks := shipper.findBlocks(metas, model.Now().Add(-2*time.Hour), model.Now().Add(-1*time.Hour), []fpRange{{100, 200}}) expectedBlockRefs := []BlockRef{ createMatchingBlockRef("block2"), @@ -53,8 +54,8 @@ func Test_Shipper_findBlocks(t *testing.T) { tests := map[string]struct { minFingerprint uint64 maxFingerprint uint64 - startTimestamp int64 - endTimestamp int64 + startTimestamp model.Time + endTimestamp model.Time filtered bool }{ "expected block not to be filtered out if minFingerprint and startTimestamp are within range": { @@ -94,7 +95,7 @@ func Test_Shipper_findBlocks(t *testing.T) { t.Run(name, func(t *testing.T) { shipper := &Shipper{} ref := createBlockRef("fake-block", data.minFingerprint, data.maxFingerprint, data.startTimestamp, data.endTimestamp) - blocks := shipper.findBlocks([]Meta{{Blocks: []BlockRef{ref}}}, 300, 400, []uint64{100, 110, 120, 130, 140, 150, 160, 170, 180, 190, 200}) + blocks := shipper.findBlocks([]Meta{{Blocks: []BlockRef{ref}}}, 300, 400, []fpRange{{100, 200}}) if data.filtered { require.Empty(t, blocks) return @@ -105,94 +106,83 @@ func Test_Shipper_findBlocks(t *testing.T) { } } -func TestGetPosition(t *testing.T) { - for i, tc := range []struct { - s []int - v int - exp int - }{ - {s: []int{}, v: 1, exp: 0}, - {s: []int{1, 2, 3}, v: 0, exp: 0}, - {s: []int{1, 2, 3}, v: 2, exp: 1}, - {s: []int{1, 2, 3}, v: 4, exp: 3}, - {s: []int{1, 2, 4, 5}, v: 3, exp: 2}, - } { - tc := tc - name := fmt.Sprintf("case-%d", i) - t.Run(name, func(t *testing.T) { - got := getPosition[[]int](tc.s, tc.v) - require.Equal(t, tc.exp, got) - }) - } -} - func TestIsOutsideRange(t *testing.T) { + startTs := model.Time(1000) + endTs := model.Time(2000) + t.Run("is outside if startTs > through", func(t *testing.T) { - b := createBlockRef("block", 0, math.MaxUint64, 100, 200) - isOutside := isOutsideRange(&b, 0, 90, []uint64{}) + b := createBlockRef("block", 0, math.MaxUint64, startTs, endTs) + isOutside := isOutsideRange(&b, 0, 900, []fpRange{}) require.True(t, isOutside) }) t.Run("is outside if endTs < from", func(t *testing.T) { - b := createBlockRef("block", 0, math.MaxUint64, 100, 200) - isOutside := isOutsideRange(&b, 210, 300, []uint64{}) + b := createBlockRef("block", 0, math.MaxUint64, startTs, endTs) + isOutside := isOutsideRange(&b, 2100, 3000, []fpRange{}) require.True(t, isOutside) }) t.Run("is outside if endFp < 
first fingerprint", func(t *testing.T) { - b := createBlockRef("block", 0, 90, 100, 200) - isOutside := isOutsideRange(&b, 100, 200, []uint64{100, 200}) + b := createBlockRef("block", 0, 90, startTs, endTs) + isOutside := isOutsideRange(&b, startTs, endTs, []fpRange{{100, 199}}) require.True(t, isOutside) }) t.Run("is outside if startFp > last fingerprint", func(t *testing.T) { - b := createBlockRef("block", 210, math.MaxUint64, 100, 200) - isOutside := isOutsideRange(&b, 100, 200, []uint64{100, 200}) + b := createBlockRef("block", 200, math.MaxUint64, startTs, endTs) + isOutside := isOutsideRange(&b, startTs, endTs, []fpRange{{0, 49}, {100, 149}}) require.True(t, isOutside) }) t.Run("is outside if within gaps in fingerprints", func(t *testing.T) { - b := createBlockRef("block", 100, 200, 100, 200) - isOutside := isOutsideRange(&b, 100, 200, []uint64{0, 99, 201, 300}) + b := createBlockRef("block", 100, 199, startTs, endTs) + isOutside := isOutsideRange(&b, startTs, endTs, []fpRange{{0, 99}, {200, 299}}) require.True(t, isOutside) }) t.Run("is not outside if within fingerprints 1", func(t *testing.T) { - b := createBlockRef("block", 100, 200, 100, 200) - isOutside := isOutsideRange(&b, 100, 200, []uint64{0, 100, 200, 300}) + b := createBlockRef("block", 10, 90, startTs, endTs) + isOutside := isOutsideRange(&b, startTs, endTs, []fpRange{{0, 99}, {200, 299}}) require.False(t, isOutside) }) t.Run("is not outside if within fingerprints 2", func(t *testing.T) { - b := createBlockRef("block", 100, 150, 100, 200) - isOutside := isOutsideRange(&b, 100, 200, []uint64{0, 100, 200, 300}) + b := createBlockRef("block", 210, 290, startTs, endTs) + isOutside := isOutsideRange(&b, startTs, endTs, []fpRange{{0, 99}, {200, 299}}) + require.False(t, isOutside) + }) + + t.Run("is not outside if spans across multiple fingerprint ranges", func(t *testing.T) { + b := createBlockRef("block", 50, 250, startTs, endTs) + isOutside := isOutsideRange(&b, startTs, endTs, []fpRange{{0, 99}, {200, 299}}) require.False(t, isOutside) }) - t.Run("is not outside if within fingerprints 3", func(t *testing.T) { - b := createBlockRef("block", 150, 200, 100, 200) - isOutside := isOutsideRange(&b, 100, 200, []uint64{0, 100, 200, 300}) + t.Run("is not outside if fingerprint range and time range are larger than block", func(t *testing.T) { + b := createBlockRef("block", math.MaxUint64/3, math.MaxUint64/3*2, startTs, endTs) + isOutside := isOutsideRange(&b, 0, 3000, []fpRange{{0, math.MaxUint64}}) require.False(t, isOutside) }) } func createMatchingBlockRef(blockPath string) BlockRef { - return createBlockRef(blockPath, 0, uint64(math.MaxUint64), 0, math.MaxInt) + return createBlockRef(blockPath, 0, math.MaxUint64, model.Time(0), model.Now()) } func createBlockRef( blockPath string, minFingerprint, maxFingerprint uint64, - startTimestamp, endTimestamp int64, + startTimestamp, endTimestamp model.Time, ) BlockRef { + day := startTimestamp.Unix() / int64(24*time.Hour/time.Second) return BlockRef{ Ref: Ref{ TenantID: "fake", - TableName: "16600", + TableName: fmt.Sprintf("%d", day), MinFingerprint: minFingerprint, MaxFingerprint: maxFingerprint, - StartTimestamp: model.Time(startTimestamp), - EndTimestamp: model.Time(endTimestamp), + StartTimestamp: startTimestamp, + EndTimestamp: endTimestamp, Checksum: 0, }, // block path is unique, and it's used to distinguish the blocks so the rest of the fields might be skipped in this test diff --git a/pkg/storage/stores/shipper/bloomshipper/store.go 
b/pkg/storage/stores/shipper/bloomshipper/store.go index 06e1d7a4675bf..40c23658e9a1a 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store.go +++ b/pkg/storage/stores/shipper/bloomshipper/store.go @@ -2,7 +2,6 @@ package bloomshipper import ( "context" - "sort" "time" "github.com/prometheus/common/model" @@ -14,7 +13,6 @@ type ForEachBlockCallback func(bq *v1.BlockQuerier, minFp, maxFp uint64) error type ReadShipper interface { GetBlockRefs(ctx context.Context, tenant string, from, through model.Time) ([]BlockRef, error) - ForEachBlock(ctx context.Context, tenant string, from, through model.Time, fingerprints []uint64, callback ForEachBlockCallback) error Fetch(ctx context.Context, tenant string, blocks []BlockRef, callback ForEachBlockCallback) error } @@ -30,8 +28,6 @@ type BlockQuerierWithFingerprintRange struct { type Store interface { GetBlockRefs(ctx context.Context, tenant string, from, through time.Time) ([]BlockRef, error) - GetBlockQueriers(ctx context.Context, tenant string, from, through time.Time, fingerprints []uint64) ([]BlockQuerierWithFingerprintRange, error) - GetBlockQueriersForBlockRefs(ctx context.Context, tenant string, blocks []BlockRef) ([]BlockQuerierWithFingerprintRange, error) ForEach(ctx context.Context, tenant string, blocks []BlockRef, callback ForEachBlockCallback) error Stop() } @@ -60,40 +56,6 @@ func (bs *BloomStore) ForEach(ctx context.Context, tenant string, blocks []Block return bs.shipper.Fetch(ctx, tenant, blocks, callback) } -// GetQueriersForBlocks implements Store -func (bs *BloomStore) GetBlockQueriersForBlockRefs(ctx context.Context, tenant string, blocks []BlockRef) ([]BlockQuerierWithFingerprintRange, error) { - bqs := make([]BlockQuerierWithFingerprintRange, 0, 32) - err := bs.shipper.Fetch(ctx, tenant, blocks, func(bq *v1.BlockQuerier, minFp uint64, maxFp uint64) error { - bqs = append(bqs, BlockQuerierWithFingerprintRange{ - BlockQuerier: bq, - MinFp: model.Fingerprint(minFp), - MaxFp: model.Fingerprint(maxFp), - }) - return nil - }) - sort.Slice(bqs, func(i, j int) bool { - return bqs[i].MinFp < bqs[j].MinFp - }) - return bqs, err -} - -// BlockQueriers implements Store -func (bs *BloomStore) GetBlockQueriers(ctx context.Context, tenant string, from, through time.Time, fingerprints []uint64) ([]BlockQuerierWithFingerprintRange, error) { - bqs := make([]BlockQuerierWithFingerprintRange, 0, 32) - err := bs.shipper.ForEachBlock(ctx, tenant, toModelTime(from), toModelTime(through), fingerprints, func(bq *v1.BlockQuerier, minFp uint64, maxFp uint64) error { - bqs = append(bqs, BlockQuerierWithFingerprintRange{ - BlockQuerier: bq, - MinFp: model.Fingerprint(minFp), - MaxFp: model.Fingerprint(maxFp), - }) - return nil - }) - sort.Slice(bqs, func(i, j int) bool { - return bqs[i].MinFp < bqs[j].MinFp - }) - return bqs, err -} - func toModelTime(t time.Time) model.Time { return model.TimeFromUnixNano(t.UnixNano()) } diff --git a/pkg/storage/stores/shipper/indexshipper/indexgateway/gateway.go b/pkg/storage/stores/shipper/indexshipper/indexgateway/gateway.go index 1040bd6c1b565..8b0f186386bdf 100644 --- a/pkg/storage/stores/shipper/indexshipper/indexgateway/gateway.go +++ b/pkg/storage/stores/shipper/indexshipper/indexgateway/gateway.go @@ -204,7 +204,7 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ return nil, err } - predicate := chunk.NewPredicate(matchers, *(&req.Filters)) + predicate := chunk.NewPredicate(matchers, req.Filters) chunks, _, err := g.indexQuerier.GetChunks(ctx, instanceID, req.From, 
req.Through, predicate) if err != nil { return nil, err @@ -219,8 +219,11 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ } } + initialChunkCount := len(result.Refs) + // Return unfiltered results if there is no bloom querier (Bloom Gateway disabled) or if there are not filters. if g.bloomQuerier == nil || len(req.Filters) == 0 { + level.Info(g.log).Log("msg", "chunk filtering is not enabled or there is no line filter", "filters", len(req.Filters)) return result, nil } @@ -234,6 +237,7 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ } result.Refs = chunkRefs + level.Info(g.log).Log("msg", "return filtered chunk refs", "unfiltered", initialChunkCount, "filtered", len(result.Refs)) return result, nil } diff --git a/pkg/tracing/config.go b/pkg/tracing/config.go index 1c97d88a845df..f9faefa6a7303 100644 --- a/pkg/tracing/config.go +++ b/pkg/tracing/config.go @@ -5,7 +5,8 @@ import ( ) type Config struct { - Enabled bool `yaml:"enabled"` + Enabled bool `yaml:"enabled"` + ProfilingEnabled bool `yaml:"profiling_enabled" category:"experimental" doc:"hidden"` } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { @@ -14,4 +15,5 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.BoolVar(&cfg.Enabled, prefix+"tracing.enabled", true, "Set to false to disable tracing.") + f.BoolVar(&cfg.ProfilingEnabled, prefix+"tracing.profiling-enabled", true, "Set to true to enable profiling integration.") } diff --git a/production/docker/config/loki.yaml b/production/docker/config/loki.yaml index 6e4541164a235..0a124e5ccfaae 100644 --- a/production/docker/config/loki.yaml +++ b/production/docker/config/loki.yaml @@ -9,19 +9,20 @@ server: common: path_prefix: /loki - storage: - s3: - endpoint: minio:9000 - insecure: true - bucketnames: loki-data - access_key_id: loki - secret_access_key: supersecret - s3forcepathstyle: true - compactor_address: http://loki-write:3100 + compactor_address: http://loki-backend:3100 replication_factor: 3 +storage_config: + aws: + endpoint: minio:9000 + insecure: true + bucketnames: loki-data + access_key_id: loki + secret_access_key: supersecret + s3forcepathstyle: true + memberlist: - join_members: ["loki-read", "loki-write"] + join_members: ["loki-read", "loki-write", "loki-backend"] dead_node_reclaim_time: 30s gossip_to_dead_nodes_time: 15s left_ingesters_timeout: 30s @@ -54,6 +55,10 @@ ruler: enable_sharding: true wal: dir: /loki/ruler-wal + evaluation: + mode: remote + query_frontend: + address: dns:///loki-read:9095 storage: type: local local: @@ -85,6 +90,13 @@ schema_config: index: prefix: index_ period: 24h + - from: 2024-01-10 + store: tsdb + object_store: s3 + schema: v12 + index: + prefix: index_ + period: 24h limits_config: diff --git a/production/docker/config/prometheus.yaml b/production/docker/config/prometheus.yaml index 9bb03bb209047..3369106f94001 100644 --- a/production/docker/config/prometheus.yaml +++ b/production/docker/config/prometheus.yaml @@ -11,6 +11,7 @@ scrape_configs: - names: - loki-read - loki-write + - loki-backend type: A port: 3100 - job_name: 'promtail' diff --git a/production/docker/docker-compose.yaml b/production/docker/docker-compose.yaml index 5c1b93f829173..a4f74c7bb1182 100644 --- a/production/docker/docker-compose.yaml +++ b/production/docker/docker-compose.yaml @@ -89,7 +89,7 @@ services: - | mkdir -p /data/loki-data && \ mkdir -p /data/loki-ruler && - minio server /data + minio server 
--address "0.0.0.0:9000" --console-address "0.0.0.0:9001" /data environment: - MINIO_ROOT_USER=loki - MINIO_ROOT_PASSWORD=supersecret @@ -97,6 +97,7 @@ services: - MINIO_UPDATE=off ports: - "9000:9000" + - "9001:9001" volumes: - ./.data/minio:/data networks: @@ -116,7 +117,6 @@ services: image: *lokiImage volumes: - ./config:/etc/loki/ - - ./rules:/loki/rules:ro # only needed for interactive debugging with dlv # cap_add: # - SYS_PTRACE @@ -127,7 +127,7 @@ services: - "7946" # uncomment to use interactive debugging # - "40000-40002:40000" # makes the replicas available on ports 40000, 40001, 40002 - command: "-config.file=/etc/loki/loki.yaml -target=read" + command: "-config.file=/etc/loki/loki.yaml -target=read -legacy-read-mode=false" networks: - loki restart: always @@ -161,6 +161,7 @@ services: image: *lokiImage volumes: - ./config:/etc/loki/ + - ./rules:/loki/rules:ro # only needed for interactive debugging with dlv # cap_add: # - SYS_PTRACE diff --git a/vendor/github.com/grafana/dskit/spanprofiler/README.md b/vendor/github.com/grafana/dskit/spanprofiler/README.md new file mode 100644 index 0000000000000..a415985f6649e --- /dev/null +++ b/vendor/github.com/grafana/dskit/spanprofiler/README.md @@ -0,0 +1,104 @@ +# Span Profiler for OpenTracing-Go + +## Overview + +The Span Profiler for OpenTracing-Go is a package that seamlessly integrates `opentracing-go` instrumentation with +profiling through the use of pprof labels. + +Accessing trace span profiles is made convenient through the Grafana Explore view. You can find a complete example setup +with Grafana Tempo in the [Pyroscope repository](https://github.com/grafana/pyroscope/tree/main/examples/tracing/tempo): + +![image](https://github.com/grafana/otel-profiling-go/assets/12090599/31e33cd1-818b-4116-b952-c9ec7b1fb593) + +## Usage + +There are two primary ways to use the Span Profiler: + +### 1. Wrap the Global Tracer. + +You can wrap the global tracer using `spanprofiler.NewTracer`: + +```go +import ( + "github.com/opentracing/opentracing-go" + "github.com/grafana/dskit/spanprofiler" +) + +func main() { + // Initialize your OpenTracing tracer + tracer := opentracing.GlobalTracer() + // Wrap it with the tracer-profiler + wrappedTracer := spanprofiler.NewTracer(tracer) + // Use the wrapped tracer in your application + opentracing.SetGlobalTracer(wrappedTracer) + + // Or, as an oneliner: + // opentracing.SetGlobalTracer(spanprofiler.NewTracer(opentracing.GlobalTracer())) + + // Your application logic here +} +``` + +For efficiency, the tracer selectively records profiles for _root_ spans — the initial _local_ span in a process — since +a trace may encompass thousands of spans. All stack trace samples accumulated during the execution of their child spans +contribute to the root span's profile. In practical terms, this signifies that, for instance, an HTTP request results +in a singular profile, irrespective of the numerous spans within the trace. It's important to note that these profiles +don't extend beyond the boundaries of a single process. + +The limitation of this approach is that only spans created within the same goroutine, or its children, as the parent are +taken into account. Consequently, in scenarios involving asynchronous execution, where the parent span context is passed +to another goroutine, explicit profiling becomes necessary using `spanprofiler.StartSpanFromContext`. + +### 2. Profile individual spans. 
+ + The `spanprofiler.StartSpanFromContext` function allows you to granularly control which spans to profile: + +```go +func YourOperationName(ctx context.Context) { + // Start a span and enable profiling for it + span, ctx := spanprofiler.StartSpanFromContext(ctx, "YourOperationName") + defer span.Finish() // Finish the span when done + + // Use the span in your application logic +} +``` + +The function guarantees that the returned span will be profiled. + +Both methods can be employed either in conjunction or independently. Our recommendation is to utilize the tracer for +seamless integration, reserving explicit span profiling only for cases where spans are spawned in detached goroutines. + +## Implementation details + +When a new trace span is created, and is eligible for profiling, the tracer sets `span_id` and `span_name` [pprof labels](https://github.com/google/pprof/blob/master/doc/README.md#tag-filtering) +that point to the respective span. These labels are stored in the goroutine's local storage and inherited by any +subsequent child goroutines. + +`span_name` is available as a regular label and can be used in query expressions. For example, the following query +will show you the profile for code that is not covered by traces: +``` +{service_name="my-service",span_name=""} +``` + +Additionally, trace spans are identified by the `pyroscope.profile.id` attribute, indicating the associated profile. +This allows finding such spans in the trace view (in the screenshot) and fetching profiles for specific spans. + +It's important to note that the presence of this attribute does not guarantee profile availability; stack trace samples +might not be collected if the CPU time utilized falls below the sample interval (10ms). + +It is crucial to understand that this module doesn't directly control the pprof profiler; its initialization is still +necessary for profile collection. This initialization can be achieved through the `runtime/pprof` package, or using the +[Pyroscope client](https://github.com/grafana/pyroscope-go). + +Limitations: + - Only CPU profiling is fully supported at the moment. + - Only the [Jaeger tracer](https://github.com/jaegertracing/jaeger-client-go) implementation is supported. + +## Performance implications + +The typical performance impact is generally imperceptible and primarily arises from the cost of pprof labeling. However, +intensive use of pprof labels may have a negative impact on the profiled application. + +In the case of the tracer provided by this package, the `StartSpan` method wrapper introduces an approximate 20% increase +in CPU time compared to the original call. In the vast majority of cases, the overhead constitutes less than 0.01% of the total +CPU time and is considered safe for deployment in production systems. diff --git a/vendor/github.com/grafana/dskit/spanprofiler/spanprofiler.go b/vendor/github.com/grafana/dskit/spanprofiler/spanprofiler.go new file mode 100644 index 0000000000000..8481d04498d5a --- /dev/null +++ b/vendor/github.com/grafana/dskit/spanprofiler/spanprofiler.go @@ -0,0 +1,107 @@ +package spanprofiler + +import ( + "context" + "runtime/pprof" + + "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go" +) + +// StartSpanFromContext starts and returns a Span with `operationName`, using +// any Span found within `ctx` as a ChildOfRef. If no such parent could be +// found, StartSpanFromContext creates a root (parentless) Span.
+//
+// The call sets `operationName` as the `span_name` pprof label, and the new
+// span identifier as the `span_id` pprof label, if the trace is sampled.
+//
+// The second return value is a context.Context object built around the
+// returned Span.
+//
+// Example usage:
+//
+//	func SomeFunction(ctx context.Context, ...) {
+//		sp, ctx := spanprofiler.StartSpanFromContext(ctx, "SomeFunction")
+//		defer sp.Finish()
+//		...
+//	}
+func StartSpanFromContext(ctx context.Context, operationName string, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) {
+	return StartSpanFromContextWithTracer(ctx, opentracing.GlobalTracer(), operationName, opts...)
+}
+
+// StartSpanFromContextWithTracer starts and returns a span with `operationName`
+// using a span found within the context as a ChildOfRef. If that doesn't exist
+// it creates a root span. It also returns a context.Context object built
+// around the returned span.
+//
+// The call sets `operationName` as the `span_name` pprof label, and the new
+// span identifier as the `span_id` pprof label, if the trace is sampled.
+//
+// Its behavior is identical to StartSpanFromContext except that it takes an
+// explicit tracer as opposed to using the global tracer.
func StartSpanFromContextWithTracer(ctx context.Context, tracer opentracing.Tracer, operationName string, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) {
+	span, ctx := opentracing.StartSpanFromContextWithTracer(ctx, tracer, operationName, opts...)
+	spanCtx, ok := span.Context().(jaeger.SpanContext)
+	if ok {
+		span = wrapJaegerSpanWithGoroutineLabels(ctx, span, operationName, sampledSpanID(spanCtx))
+	}
+	return span, ctx
+}
+
+func wrapJaegerSpanWithGoroutineLabels(
+	parentCtx context.Context,
+	span opentracing.Span,
+	operationName string,
+	spanID string,
+) *spanWrapper {
+	// Note that pprof labels are propagated through the goroutine's local
+	// storage and are always copied to child goroutines. This way, stack
+	// trace samples collected during execution of child spans will be taken
+	// into account at the root.
+	var ctx context.Context
+	if spanID != "" {
+		ctx = pprof.WithLabels(parentCtx, pprof.Labels(
+			spanNameLabelName, operationName,
+			spanIDLabelName, spanID))
+	} else {
+		// Even if the trace has not been sampled, we still need to keep track
+		// of samples that belong to the span (all spans with the given name).
+		ctx = pprof.WithLabels(parentCtx, pprof.Labels(
+			spanNameLabelName, operationName))
+	}
+	// Goroutine labels should be set as early as possible,
+	// in order to capture the overhead of the function call.
+	pprof.SetGoroutineLabels(ctx)
+	// We create a span wrapper to ensure we remove the newly attached pprof
+	// labels when the span finishes. The need for this wrapper is debatable:
+	// as we do not have the original context, we could leave the goroutine
+	// labels – normally, a span is finished at the very end of the goroutine's
+	// lifetime, so no significant side effects should take place.
+	w := spanWrapper{
+		parentPprofCtx:  parentCtx,
+		currentPprofCtx: ctx,
+	}
+	w.Span = span.SetTag(profileIDTagKey, spanID)
+	return &w
+}
+
+type spanWrapper struct {
+	parentPprofCtx  context.Context
+	currentPprofCtx context.Context
+	opentracing.Span
+}
+
+func (s *spanWrapper) Finish() {
+	s.Span.Finish()
+	pprof.SetGoroutineLabels(s.parentPprofCtx)
+	s.currentPprofCtx = s.parentPprofCtx
+}
+
+// sampledSpanID returns the span ID, if the span is sampled,
+// otherwise an empty string is returned.
+func sampledSpanID(spanCtx jaeger.SpanContext) string {
+	if spanCtx.IsSampled() {
+		return spanCtx.SpanID().String()
+	}
+	return ""
+}
diff --git a/vendor/github.com/grafana/dskit/spanprofiler/tracer.go b/vendor/github.com/grafana/dskit/spanprofiler/tracer.go
new file mode 100644
index 0000000000000..c28b52b11d444
--- /dev/null
+++ b/vendor/github.com/grafana/dskit/spanprofiler/tracer.go
@@ -0,0 +1,109 @@
+package spanprofiler
+
+import (
+	"context"
+	"unsafe"
+
+	"github.com/opentracing/opentracing-go"
+	"github.com/uber/jaeger-client-go"
+)
+
+const (
+	profileIDTagKey = "pyroscope.profile.id"
+
+	spanIDLabelName   = "span_id"
+	spanNameLabelName = "span_name"
+)
+
+type tracer struct{ opentracing.Tracer }
+
+// NewTracer creates a new opentracing.Tracer with the span profiler integrated.
+//
+// For efficiency, the tracer selectively records profiles for _root_ spans
+// — the initial _local_ span in a process — since a trace may encompass
+// thousands of spans. All stack trace samples accumulated during the execution
+// of their child spans contribute to the root span's profile. In practical
+// terms, this signifies that, for instance, an HTTP request results in a
+// singular profile, irrespective of the numerous spans within the trace. It's
+// important to note that these profiles don't extend beyond the boundaries of
+// a single process.
+//
+// The limitation of this approach is that only spans created within the same
+// goroutine as the parent, or in its child goroutines, are taken into account.
+// Consequently, in scenarios involving asynchronous execution, where the parent
+// span context is passed to another goroutine, explicit profiling becomes
+// necessary using `spanprofiler.StartSpanFromContext`.
+func NewTracer(tr opentracing.Tracer) opentracing.Tracer { return &tracer{tr} }
+
+func (t *tracer) StartSpan(operationName string, opts ...opentracing.StartSpanOption) opentracing.Span {
+	span := t.Tracer.StartSpan(operationName, opts...)
+	spanCtx, ok := span.Context().(jaeger.SpanContext)
+	if !ok {
+		return span
+	}
+	// pprof labels are attached only once, at the span root level.
+	if !isRootSpan(opts...) {
+		return span
+	}
+	// The pprof label API assumes that pairs of labels are passed through the
+	// context. Unfortunately, the opentracing Tracer API doesn't match this
+	// concept: this makes it impossible to save an existing pprof context and
+	// all the original pprof labels associated with the goroutine.
+	ctx := context.Background()
+	return wrapJaegerSpanWithGoroutineLabels(ctx, span, operationName, sampledSpanID(spanCtx))
+}
+
+// isRootSpan reports whether the span is a root span.
+//
+// There are only two valid cases: if the span is the first span in the trace,
+// or is the first _local_ span in the trace.
+//
+// An exception is made for the FollowsFrom reference: spans without an explicit
+// ChildOf parent are considered root spans.
+func isRootSpan(opts ...opentracing.StartSpanOption) bool {
+	parent, ok := parentSpanContextFromRef(opts...)
+	return !ok || isRemoteSpan(parent)
+}
+
+// parentSpanContextFromRef returns the first parent reference.
+func parentSpanContextFromRef(options ...opentracing.StartSpanOption) (sc jaeger.SpanContext, ok bool) {
+	var sso opentracing.StartSpanOptions
+	for _, option := range options {
+		option.Apply(&sso)
+	}
+	for _, ref := range sso.References {
+		if ref.Type == opentracing.ChildOfRef && ref.ReferencedContext != nil {
+			sc, ok = ref.ReferencedContext.(jaeger.SpanContext)
+			return sc, ok
+		}
+	}
+	return sc, ok
+}
+
+// isRemoteSpan reports whether the span context represents a remote parent.
+//
+// NOTE(kolesnikovae): this is ugly, but the only reliable method I found.
+// The opentracing-go package and Jaeger client are not meant to change as
+// both are deprecated.
+func isRemoteSpan(c jaeger.SpanContext) bool {
+	jaegerCtx := *(*jaegerSpanCtx)(unsafe.Pointer(&c))
+	return jaegerCtx.remote
+}
+
+// jaegerSpanCtx represents memory layout of the jaeger.SpanContext type.
+type jaegerSpanCtx struct {
+	traceID  [16]byte   // TraceID
+	spanID   [8]byte    // SpanID
+	parentID [8]byte    // SpanID
+	baggage  uintptr    // map[string]string
+	debugID  [2]uintptr // string
+
+	// samplingState is a pointer to a struct that has "localRootSpan" member,
+	// which we could probably use: that would allow omitting quite expensive
+	// parentSpanContextFromRef call. However, interpreting the pointer and
+	// the complex struct memory layout is more complicated and dangerous.
+	samplingState uintptr
+
+	// remote indicates that span context represents a remote parent
+	remote bool
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 8ce4557f461ba..b69a2f1e5315d 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -894,6 +894,7 @@ github.com/grafana/dskit/server
 github.com/grafana/dskit/services
 github.com/grafana/dskit/signals
 github.com/grafana/dskit/spanlogger
+github.com/grafana/dskit/spanprofiler
 github.com/grafana/dskit/tenant
 github.com/grafana/dskit/test
 github.com/grafana/dskit/tracing
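The vendored README above notes that the span profiler only attaches pprof labels and that the pprof profiler itself still has to be started separately. The following is a minimal sketch, not part of this change, of how the two halves could be wired together in a standalone program; it assumes a Jaeger-backed global tracer has already been initialized, and it uses `runtime/pprof` with a hypothetical `cpu.pprof` output file rather than the Pyroscope client.

```go
package main

import (
	"context"
	"log"
	"os"
	"runtime/pprof"

	"github.com/grafana/dskit/spanprofiler"
	"github.com/opentracing/opentracing-go"
)

func main() {
	// The span profiler only sets pprof labels; the CPU profiler itself must be
	// started separately (runtime/pprof here, or the Pyroscope client instead).
	f, err := os.Create("cpu.pprof") // hypothetical output path
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	if err := pprof.StartCPUProfile(f); err != nil {
		log.Fatal(err)
	}
	defer pprof.StopCPUProfile()

	// Assumes a Jaeger tracer has already been registered as the global tracer;
	// wrapping it makes root spans carry span_id/span_name pprof labels.
	opentracing.SetGlobalTracer(spanprofiler.NewTracer(opentracing.GlobalTracer()))

	// Explicitly profiled span, e.g. for work running in a detached goroutine.
	span, ctx := spanprofiler.StartSpanFromContext(context.Background(), "example-operation")
	defer span.Finish()
	_ = ctx // pass ctx on to child operations so their samples count toward this span
}
```

Whether samples are actually recorded for the span still depends on the sampling decision and on the 10ms sample interval mentioned in the README.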