From 6631c0f01d193fdb97e6875cd5a63d2b18b30fdc Mon Sep 17 00:00:00 2001 From: Ed Welch Date: Fri, 19 Jul 2024 09:36:54 -0400 Subject: [PATCH] chore: shard limited queries with a fixed sharding factor (#13576) Signed-off-by: Edward Welch --- pkg/querier/queryrange/roundtrip.go | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 44aa0b0f4a0b5..8ea588d11fd6f 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -206,7 +206,7 @@ func NewMiddleware( return nil, nil, err } - limitedTripperware, err := NewLimitedTripperware(cfg, log, limits, schema, metrics, codec, iqo, metricsNamespace) + limitedTripperware, err := NewLimitedTripperware(cfg, engineOpts, log, limits, schema, metrics, codec, iqo, indexStatsTripperware, metricsNamespace) if err != nil { return nil, nil, err } @@ -615,14 +615,35 @@ func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Lo } // NewLimitedTripperware creates a new frontend tripperware responsible for handling log requests which are label matcher only, no filter expression. -func NewLimitedTripperware(cfg Config, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, merger base.Merger, iqo util.IngesterQueryOptions, metricsNamespace string) (base.Middleware, error) { +func NewLimitedTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, merger base.Merger, iqo util.IngesterQueryOptions, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) { return base.MiddlewareFunc(func(next base.Handler) base.Handler { + statsHandler := indexStatsTripperware.Wrap(next) + queryRangeMiddleware := []base.Middleware{ StatsCollectorMiddleware(), NewLimitsMiddleware(limits), base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), SplitByIntervalMiddleware(schema.Configs, WithMaxParallelism(limits, limitedQuerySplits), merger, newDefaultSplitter(limits, iqo), metrics.SplitByMetrics), } + + if cfg.ShardedQueries { + queryRangeMiddleware = append(queryRangeMiddleware, + NewQueryShardMiddleware( + log, + schema.Configs, + engineOpts, + metrics.InstrumentMiddlewareMetrics, // instrumentation is included in the sharding middleware + metrics.MiddlewareMapperMetrics.shardMapper, + limits, + // Too many shards on limited queries results in slowing down this type of query + // and overwhelming the frontend, therefore we fix the number of shards to prevent this. + 32, // 0 is unlimited shards + statsHandler, + cfg.ShardAggregations, + ), + ) + } + if cfg.MaxRetries > 0 { queryRangeMiddleware = append( queryRangeMiddleware, base.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics),