diff --git a/.gitignore b/.gitignore index 66eb0a8cefeb2..83ab9c808d348 100644 --- a/.gitignore +++ b/.gitignore @@ -27,8 +27,8 @@ cmd/querytee/querytee dlv rootfs/ dist -coverage.txt -test_results.txt +*coverage.txt +*test_results.txt .DS_Store .aws-sam .idea diff --git a/CHANGELOG.md b/CHANGELOG.md index 8abd9a846458b..fa8861228407f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ ##### Enhancements +* [11814](https://github.com/grafana/loki/pull/11814) **kavirajk**: feat: Support split align and caching for instant metric query results * [11851](https://github.com/grafana/loki/pull/11851) **elcomtik**: Helm: Allow the definition of resources for GrafanaAgent pods. * [11819](https://github.com/grafana/loki/pull/11819) **jburnham**: Ruler: Add the ability to disable the `X-Scope-OrgId` tenant identification header in remote write requests. * [11633](https://github.com/grafana/loki/pull/11633) **cyriltovena**: Add profiling integrations to tracing instrumentation. @@ -70,7 +71,7 @@ * [11657](https://github.com/grafana/loki/pull/11657) **ashwanthgoli** Log results cache: compose empty response based on the request being served to avoid returning incorrect limit or direction. * [11587](https://github.com/grafana/loki/pull/11587) **trevorwhitney** Fix semantics of label parsing logic of metrics and logs queries. Both only parse the first label if multiple extractions into the same label are requested. * [11776](https://github.com/grafana/loki/pull/11776) **ashwanthgoli** Background Cache: Fixes a bug that is causing the background queue size to be incremented twice for each enqueued item. -* [11921](https://github.com/grafana/loki/pull/11921) **paul1r**: Parsing: String array elements were not being parsed correctly in JSON processing +* [11921](https://github.com/grafana/loki/pull/11921) **paul1r**: Parsing: String array elements were not being parsed correctly in JSON processing ##### Changes diff --git a/cmd/loki/loki-local-with-memcached.yaml b/cmd/loki/loki-local-with-memcached.yaml index d1b0ae1c2493c..a2f4336cdd484 100644 --- a/cmd/loki/loki-local-with-memcached.yaml +++ b/cmd/loki/loki-local-with-memcached.yaml @@ -22,6 +22,17 @@ query_range: cache_results: true cache_volume_results: true cache_series_results: true + cache_instant_metric_results: true + instant_metric_query_split_align: true + instant_metric_results_cache: + cache: + default_validity: 12h + memcached_client: + consistent_hash: true + addresses: "dns+localhost:11211" + max_idle_conns: 16 + timeout: 500ms + update_interval: 1m series_results_cache: cache: default_validity: 12h diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index d3c5593b4da23..70891a0448419 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -886,6 +886,28 @@ volume_results_cache: # CLI flag: -frontend.volume-results-cache.compression [compression: | default = ""] +# Cache instant metric query results. +# CLI flag: -querier.cache-instant-metric-results +[cache_instant_metric_results: | default = false] + +# If a cache config is not specified and cache_instant_metric_results is true, +# the config for the results cache is used. +instant_metric_results_cache: + # The cache block configures the cache backend. + # The CLI flags prefix for this block configuration is: + # frontend.instant-metric-results-cache + [cache: ] + + # Use compression in cache. The default is an empty value '', which disables + # compression. Supported values are: 'snappy' and ''. + # CLI flag: -frontend.instant-metric-results-cache.compression + [compression: | default = ""] + +# Whether to align the splits of instant metric query with splitByInterval and +# query's exec time. Useful when instant_metric_cache is enabled +# CLI flag: -querier.instant-metric-query-split-align +[instant_metric_query_split_align: | default = false] + # Cache series query results. # CLI flag: -querier.cache-series-results [cache_series_results: | default = false] @@ -2935,6 +2957,13 @@ The `limits_config` block configures global and per-tenant limits in Loki. # CLI flag: -experimental.querier.recent-metadata-query-window [recent_metadata_query_window: | default = 0s] +# Split instant metric queries by a time interval and execute in parallel. The +# value 0 disables splitting instant metric queries by time. This also +# determines how cache keys are chosen when instant metric query result caching +# is enabled. +# CLI flag: -querier.split-instant-metric-queries-by-interval +[split_instant_metric_queries_by_interval: | default = 1h] + # Interval to use for time-based splitting when a request is within the # `query_ingesters_within` window; defaults to `split-queries-by-interval` by # setting to 0. @@ -4403,6 +4432,7 @@ The cache block configures the cache backend. The supported CLI flags `` - `bloom.metas-cache` - `frontend` - `frontend.index-stats-results-cache` +- `frontend.instant-metric-results-cache` - `frontend.label-results-cache` - `frontend.series-results-cache` - `frontend.volume-results-cache` diff --git a/pkg/logql/downstream.go b/pkg/logql/downstream.go index 33d945f11b923..6946c06e54a09 100644 --- a/pkg/logql/downstream.go +++ b/pkg/logql/downstream.go @@ -636,6 +636,10 @@ func NewResultStepEvaluator(res logqlmodel.Result, params Params) (StepEvaluator step = params.Step() ) + if res.Data == nil { + return nil, fmt.Errorf("data in the passed result is nil (res.Data), cannot be processed by stepevaluator") + } + switch data := res.Data.(type) { case promql.Vector: return NewVectorStepEvaluator(start, data), nil diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 40fbece82d87d..b55e9840a4758 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -94,7 +94,8 @@ func RecordRangeAndInstantQueryMetrics( ) { var ( logger = fixLogger(ctx, log) - rt = string(GetRangeType(p)) + rangeType = GetRangeType(p) + rt = string(rangeType) latencyType = latencyTypeFast returnedLines = 0 ) @@ -103,6 +104,12 @@ func RecordRangeAndInstantQueryMetrics( level.Warn(logger).Log("msg", "error parsing query type", "err", err) } + resultCache := stats.Caches.Result + + if queryType == QueryTypeMetric && rangeType == InstantType { + resultCache = stats.Caches.InstantMetricResult + } + // Tag throughput metric by latency type based on a threshold. // Latency below the threshold is fast, above is slow. if stats.Summary.ExecTime > slowQueryThresholdSecond { @@ -162,10 +169,10 @@ func RecordRangeAndInstantQueryMetrics( "cache_volume_results_req", stats.Caches.VolumeResult.EntriesRequested, "cache_volume_results_hit", stats.Caches.VolumeResult.EntriesFound, "cache_volume_results_download_time", stats.Caches.VolumeResult.CacheDownloadTime(), - "cache_result_req", stats.Caches.Result.EntriesRequested, - "cache_result_hit", stats.Caches.Result.EntriesFound, - "cache_result_download_time", stats.Caches.Result.CacheDownloadTime(), - "cache_result_query_length_served", stats.Caches.Result.CacheQueryLengthServed(), + "cache_result_req", resultCache.EntriesRequested, + "cache_result_hit", resultCache.EntriesFound, + "cache_result_download_time", resultCache.CacheDownloadTime(), + "cache_result_query_length_served", resultCache.CacheQueryLengthServed(), }...) logValues = append(logValues, tagsToKeyValues(queryTags)...) diff --git a/pkg/logql/rangemapper.go b/pkg/logql/rangemapper.go index 975f63f4c9523..14cf76f1475a5 100644 --- a/pkg/logql/rangemapper.go +++ b/pkg/logql/rangemapper.go @@ -57,6 +57,20 @@ type RangeMapper struct { splitByInterval time.Duration metrics *MapperMetrics stats *MapperStats + + splitAlignTs time.Time +} + +// NewRangeMapperWithSplitAlign is similar to `NewRangeMapper` except it accepts additonal `splitAlign` argument and used to +// align the subqueries generated according to that. Look at `rangeSplitAlign` method for more information. +func NewRangeMapperWithSplitAlign(interval time.Duration, splitAlign time.Time, metrics *MapperMetrics, stats *MapperStats) (RangeMapper, error) { + rm, err := NewRangeMapper(interval, metrics, stats) + if err != nil { + return RangeMapper{}, err + } + rm.splitAlignTs = splitAlign + + return rm, nil } // NewRangeMapper creates a new RangeMapper instance with the given duration as @@ -327,6 +341,77 @@ func (m RangeMapper) getOriginalOffset(expr syntax.SampleExpr) (offset time.Dura // rangeInterval should be greater than m.splitByInterval, otherwise the resultant expression // will have an unnecessary aggregation operation func (m RangeMapper) mapConcatSampleExpr(expr syntax.SampleExpr, rangeInterval time.Duration, recorder *downstreamRecorder) syntax.SampleExpr { + if m.splitAlignTs.IsZero() { + return m.rangeSplit(expr, rangeInterval, recorder) + } + return m.rangeSplitAlign(expr, rangeInterval, recorder) +} + +// rangeSplitAlign try to split given `rangeInterval` into units of `m.splitByInterval` by making sure `rangeInterval` is aligned with `m.splitByInterval` for as much as the units as possible. +// Consider following example with real use case. +// Instant Query: `sum(rate({foo="bar"}[3h])` +// execTs: 12:34:00 +// splitBy: 1h +// Given above parameters, queries will be split into following +// 1. sum(rate({foo="bar"}[34m])) +// 2. sum(rate({foo="bar"}[1h] offset 34m)) +// 3. sum(rate({foo="bar"}[1h] offset 1h34m)) +// 4. sum(rate({foo="bar"}[26m] offset 2h34m)) +func (m RangeMapper) rangeSplitAlign( + expr syntax.SampleExpr, rangeInterval time.Duration, recorder *downstreamRecorder, +) syntax.SampleExpr { + if rangeInterval <= m.splitByInterval { + return expr + } + + originalOffset, err := m.getOriginalOffset(expr) + if err != nil { + return expr + } + + align := m.splitAlignTs.Sub(m.splitAlignTs.Truncate(m.splitByInterval)) // say, 12:34:00 - 12:00:00(truncated) = 34m + + if align == 0 { + return m.rangeSplit(expr, rangeInterval, recorder) // Don't have to align + } + + var ( + newRng = align + + // TODO(kavi): If the originalOffset is non-zero, there may be a edge case, where subqueries generated won't be aligned correctly. Handle this edge case in separate PR. + newOffset = originalOffset + downstreams *ConcatSampleExpr + pendingRangeInterval = rangeInterval + splits = 0 + ) + + // first subquery + downstreams = appendDownstream(downstreams, expr, newRng, newOffset) + splits++ + + newOffset += align // e.g: offset 34m + pendingRangeInterval -= newRng + newRng = m.splitByInterval // [1h] + + // Rest of the subqueries. + for pendingRangeInterval > 0 { + if pendingRangeInterval < m.splitByInterval { + newRng = pendingRangeInterval // last subquery + } + downstreams = appendDownstream(downstreams, expr, newRng, newOffset) + newOffset += m.splitByInterval + pendingRangeInterval -= newRng + splits++ + } + + // update stats and metrics + m.stats.AddSplitQueries(splits) + recorder.Add(splits, MetricsKey) + + return downstreams +} + +func (m RangeMapper) rangeSplit(expr syntax.SampleExpr, rangeInterval time.Duration, recorder *downstreamRecorder) syntax.SampleExpr { splitCount := int(math.Ceil(float64(rangeInterval) / float64(m.splitByInterval))) if splitCount <= 1 { return expr diff --git a/pkg/logql/rangemapper_test.go b/pkg/logql/rangemapper_test.go index 562ac0cd168e9..5e95486a8c8e2 100644 --- a/pkg/logql/rangemapper_test.go +++ b/pkg/logql/rangemapper_test.go @@ -93,6 +93,84 @@ func Test_SplitRangeInterval(t *testing.T) { } } +func Test_RangeMapperSplitAlign(t *testing.T) { + cases := []struct { + name string + expr string + queryTime time.Time + splityByInterval time.Duration + expected string + expectedSplits int + }{ + { + name: "query_time_aligned_with_split_by", + expr: `bytes_over_time({app="foo"}[3m])`, + expected: `sum without() ( + downstream> + ++ downstream> + ++ downstream> + )`, + queryTime: time.Unix(60, 0), // 1970 00:01:00 + splityByInterval: 1 * time.Minute, + expectedSplits: 3, + }, + { + name: "query_time_aligned_with_split_by_with_original_offset", + expr: `bytes_over_time({app="foo"}[3m] offset 20m10s)`, // NOTE: original query has offset, which should be considered in all the splits subquery + expected: `sum without() ( + downstream> + ++ downstream> + ++ downstream> + )`, + queryTime: time.Unix(60, 0), // 1970 00:01:00 + splityByInterval: 1 * time.Minute, + expectedSplits: 3, + }, + { + name: "query_time_not_aligned_with_split_by", + expr: `bytes_over_time({app="foo"}[3h])`, + expected: `sum without() ( + downstream> + ++ downstream> + ++ downstream> + ++ downstream> + )`, + queryTime: time.Date(0, 0, 0, 12, 54, 0, 0, time.UTC), // 1970 12:54:00 + splityByInterval: 1 * time.Hour, + expectedSplits: 4, + }, + { + name: "query_time_not_aligned_with_split_by_with_original_offset", + expr: `bytes_over_time({app="foo"}[3h] offset 1h2m20s)`, // NOTE: original query has offset, which should be considered in all the splits subquery + expected: `sum without() ( + downstream> + ++ downstream> + ++ downstream> + ++ downstream> + )`, + queryTime: time.Date(0, 0, 0, 12, 54, 0, 0, time.UTC), // 1970 12:54:00 + splityByInterval: 1 * time.Hour, + expectedSplits: 4, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + mapperStats := NewMapperStats() + rvm, err := NewRangeMapperWithSplitAlign(tc.splityByInterval, tc.queryTime, nilShardMetrics, mapperStats) + require.NoError(t, err) + + noop, mappedExpr, err := rvm.Parse(syntax.MustParseExpr(tc.expr)) + require.NoError(t, err) + + require.Equal(t, removeWhiteSpace(tc.expected), removeWhiteSpace(mappedExpr.String())) + require.Equal(t, tc.expectedSplits, mapperStats.GetSplitQueries()) + require.False(t, noop) + + }) + } +} + func Test_SplitRangeVectorMapping(t *testing.T) { for _, tc := range []struct { expr string @@ -1675,7 +1753,7 @@ func Test_SplitRangeVectorMapping(t *testing.T) { // Non-splittable vector aggregators - should go deeper in the AST { `topk(2, count_over_time({app="foo"}[3m]))`, - `topk(2, + `topk(2, sum without () ( downstream> ++ downstream> @@ -1713,7 +1791,7 @@ func Test_SplitRangeVectorMapping(t *testing.T) { ++ downstream> ++ downstream> ) - ), + ), "x", "$1", "a", "(.*)" )`, 3, @@ -1727,7 +1805,7 @@ func Test_SplitRangeVectorMapping(t *testing.T) { ++ downstream> ++ downstream> ) - / 180), + / 180), "foo", "$1", "service", "(.*):.*" )`, 3, diff --git a/pkg/logqlmodel/stats/context.go b/pkg/logqlmodel/stats/context.go index 4fbddc790b8b2..41a96ca24c75a 100644 --- a/pkg/logqlmodel/stats/context.go +++ b/pkg/logqlmodel/stats/context.go @@ -55,17 +55,18 @@ type Context struct { type CacheType string const ( - ChunkCache CacheType = "chunk" //nolint:staticcheck - IndexCache CacheType = "index" //nolint:staticcheck - ResultCache CacheType = "result" //nolint:staticcheck - StatsResultCache CacheType = "stats-result" //nolint:staticcheck - VolumeResultCache CacheType = "volume-result" //nolint:staticcheck - WriteDedupeCache CacheType = "write-dedupe" //nolint:staticcheck - SeriesResultCache CacheType = "series-result" //nolint:staticcheck - LabelResultCache CacheType = "label-result" //nolint:staticcheck - BloomFilterCache CacheType = "bloom-filter" //nolint:staticcheck - BloomBlocksCache CacheType = "bloom-blocks" //nolint:staticcheck - BloomMetasCache CacheType = "bloom-metas" //nolint:staticcheck + ChunkCache CacheType = "chunk" //nolint:staticcheck + IndexCache CacheType = "index" //nolint:staticcheck + ResultCache CacheType = "result" //nolint:staticcheck + StatsResultCache CacheType = "stats-result" //nolint:staticcheck + VolumeResultCache CacheType = "volume-result" //nolint:staticcheck + InstantMetricResultsCache CacheType = "instant-metric-result" // nolint:staticcheck + WriteDedupeCache CacheType = "write-dedupe" //nolint:staticcheck + SeriesResultCache CacheType = "series-result" //nolint:staticcheck + LabelResultCache CacheType = "label-result" //nolint:staticcheck + BloomFilterCache CacheType = "bloom-filter" //nolint:staticcheck + BloomBlocksCache CacheType = "bloom-blocks" //nolint:staticcheck + BloomMetasCache CacheType = "bloom-metas" //nolint:staticcheck ) // NewContext creates a new statistics context @@ -98,13 +99,14 @@ func (c *Context) Ingester() Ingester { // Caches returns the cache statistics accumulated so far. func (c *Context) Caches() Caches { return Caches{ - Chunk: c.caches.Chunk, - Index: c.caches.Index, - Result: c.caches.Result, - StatsResult: c.caches.StatsResult, - VolumeResult: c.caches.VolumeResult, - SeriesResult: c.caches.SeriesResult, - LabelResult: c.caches.LabelResult, + Chunk: c.caches.Chunk, + Index: c.caches.Index, + Result: c.caches.Result, + StatsResult: c.caches.StatsResult, + VolumeResult: c.caches.VolumeResult, + SeriesResult: c.caches.SeriesResult, + LabelResult: c.caches.LabelResult, + InstantMetricResult: c.caches.InstantMetricResult, } } @@ -222,6 +224,7 @@ func (c *Caches) Merge(m Caches) { c.VolumeResult.Merge(m.VolumeResult) c.SeriesResult.Merge(m.SeriesResult) c.LabelResult.Merge(m.LabelResult) + c.InstantMetricResult.Merge(m.InstantMetricResult) } func (c *Cache) Merge(m Cache) { @@ -470,6 +473,8 @@ func (c *Context) getCacheStatsByType(t CacheType) *Cache { stats = &c.caches.SeriesResult case LabelResultCache: stats = &c.caches.LabelResult + case InstantMetricResultsCache: + stats = &c.caches.InstantMetricResult default: return nil } @@ -571,6 +576,12 @@ func (c Caches) Log(log log.Logger) { "Cache.Result.EntriesStored", c.Result.EntriesStored, "Cache.Result.BytesSent", humanize.Bytes(uint64(c.Result.BytesSent)), "Cache.Result.BytesReceived", humanize.Bytes(uint64(c.Result.BytesReceived)), - "Cache.Result.DownloadTime", c.Result.CacheDownloadTime(), + "Cache.InstantMetricResult.Requests", c.InstantMetricResult.Requests, + "Cache.InstantMetricResult.EntriesRequested", c.InstantMetricResult.EntriesRequested, + "Cache.InstantMetricResult.EntriesFound", c.InstantMetricResult.EntriesFound, + "Cache.InstantMetricResult.EntriesStored", c.InstantMetricResult.EntriesStored, + "Cache.InstantMetricResult.BytesSent", humanize.Bytes(uint64(c.InstantMetricResult.BytesSent)), + "Cache.InstantMetricResult.BytesReceived", humanize.Bytes(uint64(c.InstantMetricResult.BytesReceived)), + "Cache.InstantMetricResult.DownloadTime", c.InstantMetricResult.CacheDownloadTime(), ) } diff --git a/pkg/logqlmodel/stats/stats.pb.go b/pkg/logqlmodel/stats/stats.pb.go index 75be704020c97..65f8f0f642381 100644 --- a/pkg/logqlmodel/stats/stats.pb.go +++ b/pkg/logqlmodel/stats/stats.pb.go @@ -95,13 +95,14 @@ func (m *Result) GetCaches() Caches { } type Caches struct { - Chunk Cache `protobuf:"bytes,1,opt,name=chunk,proto3" json:"chunk"` - Index Cache `protobuf:"bytes,2,opt,name=index,proto3" json:"index"` - Result Cache `protobuf:"bytes,3,opt,name=result,proto3" json:"result"` - StatsResult Cache `protobuf:"bytes,4,opt,name=statsResult,proto3" json:"statsResult"` - VolumeResult Cache `protobuf:"bytes,5,opt,name=volumeResult,proto3" json:"volumeResult"` - SeriesResult Cache `protobuf:"bytes,6,opt,name=seriesResult,proto3" json:"seriesResult"` - LabelResult Cache `protobuf:"bytes,7,opt,name=labelResult,proto3" json:"labelResult"` + Chunk Cache `protobuf:"bytes,1,opt,name=chunk,proto3" json:"chunk"` + Index Cache `protobuf:"bytes,2,opt,name=index,proto3" json:"index"` + Result Cache `protobuf:"bytes,3,opt,name=result,proto3" json:"result"` + StatsResult Cache `protobuf:"bytes,4,opt,name=statsResult,proto3" json:"statsResult"` + VolumeResult Cache `protobuf:"bytes,5,opt,name=volumeResult,proto3" json:"volumeResult"` + SeriesResult Cache `protobuf:"bytes,6,opt,name=seriesResult,proto3" json:"seriesResult"` + LabelResult Cache `protobuf:"bytes,7,opt,name=labelResult,proto3" json:"labelResult"` + InstantMetricResult Cache `protobuf:"bytes,8,opt,name=instantMetricResult,proto3" json:"instantMetricResult"` } func (m *Caches) Reset() { *m = Caches{} } @@ -185,6 +186,13 @@ func (m *Caches) GetLabelResult() Cache { return Cache{} } +func (m *Caches) GetInstantMetricResult() Cache { + if m != nil { + return m.InstantMetricResult + } + return Cache{} +} + // Summary is the summary of a query statistics. type Summary struct { // Total bytes processed per second. @@ -773,83 +781,85 @@ func init() { func init() { proto.RegisterFile("pkg/logqlmodel/stats/stats.proto", fileDescriptor_6cdfe5d2aea33ebb) } var fileDescriptor_6cdfe5d2aea33ebb = []byte{ - // 1215 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x57, 0x4d, 0x6f, 0xe3, 0x54, - 0x17, 0x8e, 0x27, 0xaf, 0x93, 0xce, 0xed, 0xe7, 0xdc, 0x76, 0xde, 0xc9, 0x80, 0x64, 0x97, 0xc0, - 0x88, 0x22, 0x50, 0x23, 0x3e, 0x24, 0x04, 0x62, 0x24, 0xe4, 0x0e, 0x95, 0x2a, 0x75, 0x44, 0x39, - 0x81, 0x0d, 0x3b, 0xc7, 0xbe, 0x4d, 0xa2, 0x3a, 0x76, 0x6a, 0x5f, 0x97, 0xe9, 0x0a, 0x7e, 0x02, - 0x3f, 0x83, 0x0d, 0x2b, 0x56, 0x48, 0x88, 0x0d, 0x9b, 0x59, 0x76, 0x39, 0x2b, 0x8b, 0xa6, 0x1b, - 0xe4, 0xd5, 0x48, 0xfc, 0x01, 0x74, 0xcf, 0xbd, 0xf1, 0x57, 0x9c, 0x99, 0x6e, 0xe2, 0x7b, 0x9e, - 0xf3, 0x3c, 0xe7, 0x7e, 0x9e, 0x73, 0x6f, 0xc8, 0xee, 0xf4, 0x6c, 0xd8, 0xf3, 0x82, 0xe1, 0xb9, - 0x37, 0x09, 0x5c, 0xe6, 0xf5, 0x22, 0x6e, 0xf3, 0x48, 0xfe, 0xee, 0x4f, 0xc3, 0x80, 0x07, 0x54, - 0x47, 0xe3, 0x8d, 0x9d, 0x61, 0x30, 0x0c, 0x10, 0xe9, 0x89, 0x96, 0x74, 0x76, 0xff, 0xd5, 0x48, - 0x0b, 0x58, 0x14, 0x7b, 0x9c, 0x7e, 0x46, 0xda, 0x51, 0x3c, 0x99, 0xd8, 0xe1, 0x65, 0x47, 0xdb, - 0xd5, 0xf6, 0x56, 0x3f, 0xda, 0xd8, 0x97, 0x61, 0xfa, 0x12, 0xb5, 0x36, 0x9f, 0x27, 0x66, 0x23, - 0x4d, 0xcc, 0x39, 0x0d, 0xe6, 0x0d, 0x21, 0x3d, 0x8f, 0x59, 0x38, 0x66, 0x61, 0xe7, 0x4e, 0x49, - 0xfa, 0x8d, 0x44, 0x73, 0xa9, 0xa2, 0xc1, 0xbc, 0x41, 0x1f, 0x93, 0x95, 0xb1, 0x3f, 0x64, 0x11, - 0x67, 0x61, 0xa7, 0x89, 0xda, 0x4d, 0xa5, 0x3d, 0x52, 0xb0, 0xb5, 0xa5, 0xc4, 0x19, 0x11, 0xb2, - 0x16, 0xfd, 0x84, 0xb4, 0x1c, 0xdb, 0x19, 0xb1, 0xa8, 0xf3, 0x3f, 0x14, 0xaf, 0x2b, 0xf1, 0x01, - 0x82, 0xd6, 0xba, 0x92, 0xea, 0x48, 0x02, 0xc5, 0xed, 0xfe, 0xd9, 0x24, 0x2d, 0xc9, 0xa0, 0x1f, - 0x12, 0xdd, 0x19, 0xc5, 0xfe, 0x99, 0x9a, 0xf3, 0x5a, 0x51, 0x5f, 0x90, 0x0b, 0x0a, 0xc8, 0x8f, - 0x90, 0x8c, 0x7d, 0x97, 0x3d, 0x53, 0x73, 0x5d, 0x22, 0x41, 0x0a, 0xc8, 0x8f, 0x18, 0x66, 0x88, - 0xab, 0xac, 0xe6, 0x58, 0xd6, 0x6c, 0x28, 0x8d, 0xe2, 0x80, 0xfa, 0xd2, 0x03, 0xb2, 0x8a, 0x34, - 0xb9, 0x41, 0x6a, 0x86, 0x65, 0xe9, 0xb6, 0x92, 0x16, 0x89, 0x50, 0x34, 0xe8, 0x21, 0x59, 0xbb, - 0x08, 0xbc, 0x78, 0xc2, 0x54, 0x14, 0xbd, 0x26, 0xca, 0x8e, 0x8a, 0x52, 0x62, 0x42, 0xc9, 0x12, - 0x71, 0x22, 0xb1, 0x65, 0xf3, 0xd1, 0xb4, 0x5e, 0x15, 0xa7, 0xc8, 0x84, 0x92, 0x25, 0x26, 0xe5, - 0xd9, 0x03, 0xe6, 0xa9, 0x30, 0xed, 0x57, 0x4d, 0xaa, 0x40, 0x84, 0xa2, 0xd1, 0xfd, 0xbd, 0x45, - 0xda, 0xea, 0x58, 0xd2, 0xef, 0xc8, 0x83, 0xc1, 0x25, 0x67, 0xd1, 0x49, 0x18, 0x38, 0x2c, 0x8a, - 0x98, 0x7b, 0xc2, 0xc2, 0x3e, 0x73, 0x02, 0xdf, 0xc5, 0x3d, 0x6d, 0x5a, 0x6f, 0xa6, 0x89, 0xb9, - 0x8c, 0x02, 0xcb, 0x1c, 0x22, 0xac, 0x37, 0xf6, 0x6b, 0xc3, 0xde, 0xc9, 0xc3, 0x2e, 0xa1, 0xc0, - 0x32, 0x07, 0x3d, 0x22, 0xdb, 0x3c, 0xe0, 0xb6, 0x67, 0x95, 0xba, 0xc5, 0x63, 0xd1, 0xb4, 0x1e, - 0xa4, 0x89, 0x59, 0xe7, 0x86, 0x3a, 0x30, 0x0b, 0x75, 0x5c, 0xea, 0x0a, 0x8f, 0x49, 0x31, 0x54, - 0xd9, 0x0d, 0x75, 0x20, 0xdd, 0x23, 0x2b, 0xec, 0x19, 0x73, 0xbe, 0x1d, 0x4f, 0x18, 0x1e, 0x10, - 0xcd, 0x5a, 0x13, 0x09, 0x37, 0xc7, 0x20, 0x6b, 0xd1, 0xf7, 0xc9, 0xdd, 0xf3, 0x98, 0xc5, 0x0c, - 0xa9, 0x2d, 0xa4, 0xae, 0xa7, 0x89, 0x99, 0x83, 0x90, 0x37, 0xe9, 0x3e, 0x21, 0x51, 0x3c, 0x90, - 0xa9, 0x1e, 0xe1, 0x56, 0x37, 0xad, 0x8d, 0x34, 0x31, 0x0b, 0x28, 0x14, 0xda, 0xf4, 0x98, 0xec, - 0xe0, 0xe8, 0xbe, 0xf2, 0xb9, 0x3c, 0x31, 0x3c, 0x0e, 0x7d, 0xe6, 0x76, 0x56, 0x50, 0xd9, 0x49, - 0x13, 0xb3, 0xd6, 0x0f, 0xb5, 0x28, 0xed, 0x92, 0x56, 0x34, 0xf5, 0xc6, 0x3c, 0xea, 0xdc, 0x45, - 0x3d, 0x11, 0x29, 0x26, 0x11, 0x50, 0x5f, 0xe4, 0x8c, 0xec, 0xd0, 0x8d, 0x3a, 0xa4, 0xc0, 0x41, - 0x04, 0xd4, 0x37, 0x1b, 0xd5, 0x49, 0x10, 0xf1, 0xc3, 0xb1, 0xc7, 0x59, 0x88, 0xab, 0xd7, 0x59, - 0xad, 0x8c, 0xaa, 0xe2, 0x87, 0x5a, 0x94, 0xfe, 0x48, 0x1e, 0x21, 0xde, 0xe7, 0x61, 0xec, 0xf0, - 0x38, 0x64, 0xee, 0x53, 0xc6, 0x6d, 0xd7, 0xe6, 0x76, 0xe5, 0x48, 0xac, 0x61, 0xf8, 0xf7, 0xd2, - 0xc4, 0xbc, 0x9d, 0x00, 0x6e, 0x47, 0xeb, 0x7e, 0x41, 0xda, 0xaa, 0x2c, 0x8b, 0x4a, 0x16, 0xf1, - 0x20, 0x64, 0x95, 0xe2, 0xd7, 0x17, 0x58, 0x5e, 0xc9, 0x90, 0x02, 0xf2, 0xd3, 0xfd, 0xf5, 0x0e, - 0x59, 0x39, 0xca, 0xab, 0xef, 0x1a, 0xf6, 0x09, 0x4c, 0xe4, 0xad, 0xcc, 0x37, 0xdd, 0xda, 0x12, - 0x15, 0xa0, 0x88, 0x43, 0xc9, 0xa2, 0x87, 0x84, 0xa2, 0x7d, 0x20, 0xaa, 0x69, 0xf4, 0xd4, 0xe6, - 0xa8, 0x95, 0x49, 0xf5, 0xff, 0x34, 0x31, 0x6b, 0xbc, 0x50, 0x83, 0x65, 0xbd, 0x5b, 0x68, 0x47, - 0x2a, 0x87, 0xf2, 0xde, 0x15, 0x0e, 0x25, 0x8b, 0x7e, 0x4e, 0x36, 0xf2, 0x0c, 0xe8, 0x33, 0x9f, - 0xab, 0x84, 0xa1, 0x69, 0x62, 0x56, 0x3c, 0x50, 0xb1, 0xf3, 0xf5, 0xd2, 0x6f, 0xbd, 0x5e, 0x7f, - 0x34, 0x89, 0x8e, 0xfe, 0xac, 0x63, 0x39, 0x09, 0x60, 0xa7, 0xaa, 0x3c, 0xe5, 0x1d, 0x67, 0x1e, - 0xa8, 0xd8, 0xf4, 0x6b, 0x72, 0xbf, 0x80, 0x3c, 0x09, 0x7e, 0xf0, 0xbd, 0xc0, 0x76, 0xb3, 0x55, - 0x7b, 0x98, 0x26, 0x66, 0x3d, 0x01, 0xea, 0x61, 0xb1, 0x07, 0x4e, 0x09, 0xc3, 0x7c, 0x6e, 0xe6, - 0x7b, 0xb0, 0xe8, 0x85, 0x1a, 0x8c, 0x3a, 0xe4, 0xa1, 0x48, 0xde, 0x4b, 0x60, 0xa7, 0x2c, 0x64, - 0xbe, 0xc3, 0xdc, 0xfc, 0xfc, 0x75, 0xd6, 0x77, 0xb5, 0xbd, 0x15, 0xeb, 0x51, 0x9a, 0x98, 0x6f, - 0x2d, 0x25, 0xcd, 0x0f, 0x29, 0x2c, 0x8f, 0x93, 0xdf, 0xd1, 0x95, 0x1b, 0x50, 0x60, 0x4b, 0xee, - 0xe8, 0xf9, 0xfc, 0x80, 0x9d, 0x46, 0x87, 0x8c, 0x3b, 0xa3, 0xac, 0xb4, 0x15, 0xe7, 0x57, 0xf2, - 0x42, 0x0d, 0xd6, 0xfd, 0x4d, 0x27, 0x3a, 0xf6, 0x23, 0xb6, 0x6f, 0xc4, 0x6c, 0x57, 0x76, 0x2a, - 0x32, 0xaa, 0x78, 0x6e, 0xca, 0x1e, 0xa8, 0xd8, 0x25, 0xad, 0xac, 0x1d, 0x7a, 0x8d, 0x56, 0x56, - 0x8d, 0x8a, 0x4d, 0x0f, 0xc8, 0x3d, 0x97, 0x39, 0xc1, 0x64, 0x1a, 0x62, 0xfa, 0xca, 0xae, 0x5b, - 0x28, 0xbf, 0x9f, 0x26, 0xe6, 0xa2, 0x13, 0x16, 0xa1, 0x6a, 0x10, 0x39, 0x86, 0x76, 0x7d, 0x10, - 0x39, 0x8c, 0x45, 0x88, 0x3e, 0x26, 0x9b, 0xd5, 0x71, 0xc8, 0xc2, 0xbc, 0x9d, 0x26, 0x66, 0xd5, - 0x05, 0x55, 0x40, 0xc8, 0xf1, 0x2c, 0x3e, 0x89, 0xa7, 0xde, 0xd8, 0xb1, 0x85, 0xfc, 0x6e, 0x2e, - 0xaf, 0xb8, 0xa0, 0x0a, 0x08, 0xf9, 0xb4, 0x52, 0x80, 0x49, 0x2e, 0xaf, 0xb8, 0xa0, 0x0a, 0xd0, - 0x29, 0xd9, 0xcd, 0x16, 0x76, 0x49, 0x89, 0x54, 0x05, 0xfd, 0x9d, 0x34, 0x31, 0x5f, 0xcb, 0x85, - 0xd7, 0x32, 0xe8, 0x25, 0x79, 0xbb, 0xb8, 0x86, 0xcb, 0x3a, 0x95, 0x65, 0xfe, 0xdd, 0x34, 0x31, - 0x6f, 0x43, 0x87, 0xdb, 0x90, 0xba, 0x7f, 0x35, 0x89, 0x8e, 0x4f, 0x29, 0x51, 0x23, 0x99, 0xbc, - 0x16, 0x0f, 0x83, 0xd8, 0x2f, 0x55, 0xe8, 0x22, 0x0e, 0x25, 0x8b, 0x7e, 0x49, 0xb6, 0xd8, 0xfc, - 0x32, 0x3d, 0x8f, 0x45, 0xad, 0x97, 0x95, 0x46, 0xb7, 0x76, 0xd2, 0xc4, 0x5c, 0xf0, 0xc1, 0x02, - 0x42, 0x3f, 0x25, 0xeb, 0x0a, 0xc3, 0xe2, 0x27, 0x1f, 0x38, 0xba, 0x75, 0x2f, 0x4d, 0xcc, 0xb2, - 0x03, 0xca, 0xa6, 0x10, 0xe2, 0x8b, 0x0c, 0x98, 0xc3, 0xc6, 0x17, 0xd9, 0x73, 0x06, 0x85, 0x25, - 0x07, 0x94, 0x4d, 0xf1, 0x30, 0x41, 0x00, 0x4b, 0xba, 0x4c, 0x2f, 0x7c, 0x98, 0x64, 0x20, 0xe4, - 0x4d, 0xf1, 0xde, 0x09, 0xe5, 0x58, 0x65, 0x2e, 0xe9, 0xf2, 0xbd, 0x33, 0xc7, 0x20, 0x6b, 0x89, - 0x05, 0x74, 0x8b, 0x25, 0xb2, 0x9d, 0x5f, 0x32, 0x45, 0x1c, 0x4a, 0x96, 0xc8, 0x37, 0x2c, 0x67, - 0xc7, 0xcc, 0x1f, 0xf2, 0x51, 0x9f, 0x85, 0x17, 0xd9, 0x2b, 0x06, 0xf3, 0x6d, 0xc1, 0x09, 0x8b, - 0x90, 0x35, 0xb8, 0xba, 0x36, 0x1a, 0x2f, 0xae, 0x8d, 0xc6, 0xcb, 0x6b, 0x43, 0xfb, 0x69, 0x66, - 0x68, 0xbf, 0xcc, 0x0c, 0xed, 0xf9, 0xcc, 0xd0, 0xae, 0x66, 0x86, 0xf6, 0xf7, 0xcc, 0xd0, 0xfe, - 0x99, 0x19, 0x8d, 0x97, 0x33, 0x43, 0xfb, 0xf9, 0xc6, 0x68, 0x5c, 0xdd, 0x18, 0x8d, 0x17, 0x37, - 0x46, 0xe3, 0xfb, 0x0f, 0x86, 0x63, 0x3e, 0x8a, 0x07, 0xfb, 0x4e, 0x30, 0xe9, 0x0d, 0x43, 0xfb, - 0xd4, 0xf6, 0xed, 0x9e, 0x17, 0x9c, 0x8d, 0x7b, 0x75, 0x7f, 0x14, 0x07, 0x2d, 0xfc, 0x1b, 0xf8, - 0xf1, 0x7f, 0x01, 0x00, 0x00, 0xff, 0xff, 0xa8, 0xe8, 0xef, 0xe7, 0x47, 0x0e, 0x00, 0x00, + // 1241 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x57, 0x4b, 0x6f, 0xe3, 0x54, + 0x14, 0x8e, 0x27, 0xe3, 0xa4, 0xbd, 0x7d, 0xce, 0x6d, 0x87, 0xc9, 0x30, 0x92, 0x5d, 0x02, 0x23, + 0x8a, 0x40, 0x8d, 0x78, 0x48, 0x08, 0xc4, 0x48, 0xc8, 0x1d, 0x2a, 0x55, 0x6a, 0x45, 0x39, 0x81, + 0x0d, 0xac, 0x1c, 0xfb, 0x36, 0xb1, 0xea, 0xd8, 0xa9, 0x7d, 0x5d, 0xa6, 0x2b, 0xf8, 0x09, 0xec, + 0xf9, 0x03, 0x6c, 0x58, 0xb1, 0x42, 0x62, 0xc7, 0x66, 0x96, 0x5d, 0xce, 0xca, 0xa2, 0xe9, 0x06, + 0x79, 0x35, 0x12, 0x7f, 0x00, 0xdd, 0x47, 0x6c, 0x5f, 0xc7, 0x99, 0xe9, 0x26, 0xbe, 0xe7, 0x3b, + 0xdf, 0x77, 0xee, 0xc3, 0xe7, 0x1c, 0xdf, 0xa0, 0x9d, 0xc9, 0xd9, 0xb0, 0xe7, 0x87, 0xc3, 0x73, + 0x7f, 0x1c, 0xba, 0xc4, 0xef, 0xc5, 0xd4, 0xa6, 0xb1, 0xf8, 0xdd, 0x9b, 0x44, 0x21, 0x0d, 0xb1, + 0xce, 0x8d, 0x37, 0xb7, 0x87, 0xe1, 0x30, 0xe4, 0x48, 0x8f, 0x8d, 0x84, 0xb3, 0xfb, 0x9f, 0x86, + 0x5a, 0x40, 0xe2, 0xc4, 0xa7, 0xf8, 0x33, 0xd4, 0x8e, 0x93, 0xf1, 0xd8, 0x8e, 0x2e, 0x3b, 0xda, + 0x8e, 0xb6, 0xbb, 0xf2, 0xd1, 0xfa, 0x9e, 0x08, 0xd3, 0x17, 0xa8, 0xb5, 0xf1, 0x3c, 0x35, 0x1b, + 0x59, 0x6a, 0xce, 0x68, 0x30, 0x1b, 0x30, 0xe9, 0x79, 0x42, 0x22, 0x8f, 0x44, 0x9d, 0x3b, 0x8a, + 0xf4, 0x1b, 0x81, 0x16, 0x52, 0x49, 0x83, 0xd9, 0x00, 0x3f, 0x41, 0x4b, 0x5e, 0x30, 0x24, 0x31, + 0x25, 0x51, 0xa7, 0xc9, 0xb5, 0x1b, 0x52, 0x7b, 0x28, 0x61, 0x6b, 0x53, 0x8a, 0x73, 0x22, 0xe4, + 0x23, 0xfc, 0x09, 0x6a, 0x39, 0xb6, 0x33, 0x22, 0x71, 0xe7, 0x2e, 0x17, 0xaf, 0x49, 0xf1, 0x3e, + 0x07, 0xad, 0x35, 0x29, 0xd5, 0x39, 0x09, 0x24, 0xb7, 0xfb, 0xeb, 0x5d, 0xd4, 0x12, 0x0c, 0xfc, + 0x21, 0xd2, 0x9d, 0x51, 0x12, 0x9c, 0xc9, 0x3d, 0xaf, 0x96, 0xf5, 0x25, 0x39, 0xa3, 0x80, 0x78, + 0x30, 0x89, 0x17, 0xb8, 0xe4, 0x99, 0xdc, 0xeb, 0x02, 0x09, 0xa7, 0x80, 0x78, 0xb0, 0x65, 0x46, + 0xfc, 0x94, 0xe5, 0x1e, 0x55, 0xcd, 0xba, 0xd4, 0x48, 0x0e, 0xc8, 0x27, 0xde, 0x47, 0x2b, 0x9c, + 0x26, 0x5e, 0x90, 0xdc, 0xa1, 0x2a, 0xdd, 0x92, 0xd2, 0x32, 0x11, 0xca, 0x06, 0x3e, 0x40, 0xab, + 0x17, 0xa1, 0x9f, 0x8c, 0x89, 0x8c, 0xa2, 0xd7, 0x44, 0xd9, 0x96, 0x51, 0x14, 0x26, 0x28, 0x16, + 0x8b, 0x13, 0xb3, 0x57, 0x36, 0x5b, 0x4d, 0xeb, 0x55, 0x71, 0xca, 0x4c, 0x50, 0x2c, 0xb6, 0x29, + 0xdf, 0x1e, 0x10, 0x5f, 0x86, 0x69, 0xbf, 0x6a, 0x53, 0x25, 0x22, 0x94, 0x0d, 0xfc, 0x03, 0xda, + 0xf2, 0x82, 0x98, 0xda, 0x01, 0x3d, 0x26, 0x34, 0xf2, 0x1c, 0x19, 0x6c, 0xa9, 0x26, 0xd8, 0x23, + 0x19, 0xac, 0x4e, 0x00, 0x75, 0x60, 0xf7, 0xcf, 0x16, 0x6a, 0xcb, 0x9c, 0xc7, 0xdf, 0xa1, 0x07, + 0x83, 0x4b, 0x4a, 0xe2, 0x93, 0x28, 0x74, 0x48, 0x1c, 0x13, 0xf7, 0x84, 0x44, 0x7d, 0xe2, 0x84, + 0x81, 0xcb, 0x13, 0xa6, 0x69, 0x3d, 0xca, 0x52, 0x73, 0x11, 0x05, 0x16, 0x39, 0x58, 0x58, 0xdf, + 0x0b, 0x6a, 0xc3, 0xde, 0x29, 0xc2, 0x2e, 0xa0, 0xc0, 0x22, 0x07, 0x3e, 0x44, 0x5b, 0x34, 0xa4, + 0xb6, 0x6f, 0x29, 0xd3, 0xf2, 0x9c, 0x6b, 0x5a, 0x0f, 0xd8, 0x21, 0xd4, 0xb8, 0xa1, 0x0e, 0xcc, + 0x43, 0x1d, 0x29, 0x53, 0xf1, 0x1c, 0x2c, 0x87, 0x52, 0xdd, 0x50, 0x07, 0xe2, 0x5d, 0xb4, 0x44, + 0x9e, 0x11, 0xe7, 0x5b, 0x6f, 0x4c, 0x78, 0xf6, 0x69, 0xd6, 0x2a, 0xab, 0xe6, 0x19, 0x06, 0xf9, + 0x08, 0xbf, 0x8f, 0x96, 0xcf, 0x13, 0x92, 0x10, 0x4e, 0x6d, 0x71, 0xea, 0x5a, 0x96, 0x9a, 0x05, + 0x08, 0xc5, 0x10, 0xef, 0x21, 0x14, 0x27, 0x03, 0xd1, 0x47, 0x62, 0x9e, 0x47, 0x4d, 0x6b, 0x3d, + 0x4b, 0xcd, 0x12, 0x0a, 0xa5, 0x31, 0x3e, 0x42, 0xdb, 0x7c, 0x75, 0x5f, 0x05, 0x54, 0xa4, 0x23, + 0x4d, 0xa2, 0x80, 0xb8, 0x3c, 0x69, 0x9a, 0x56, 0x27, 0x4b, 0xcd, 0x5a, 0x3f, 0xd4, 0xa2, 0xb8, + 0x8b, 0x5a, 0xf1, 0xc4, 0xf7, 0x68, 0xdc, 0x59, 0xe6, 0x7a, 0xc4, 0xea, 0x57, 0x20, 0x20, 0x9f, + 0x9c, 0x33, 0xb2, 0x23, 0x37, 0xee, 0xa0, 0x12, 0x87, 0x23, 0x20, 0x9f, 0xf9, 0xaa, 0x4e, 0xc2, + 0x98, 0x1e, 0x78, 0x3e, 0x25, 0x11, 0x3f, 0xbd, 0xce, 0x4a, 0x65, 0x55, 0x15, 0x3f, 0xd4, 0xa2, + 0xf8, 0x27, 0xf4, 0x98, 0xe3, 0x7d, 0x1a, 0x25, 0x0e, 0x4d, 0x22, 0xe2, 0x1e, 0x13, 0x6a, 0xbb, + 0x36, 0xb5, 0x2b, 0x29, 0xb1, 0xca, 0xc3, 0xbf, 0x97, 0xa5, 0xe6, 0xed, 0x04, 0x70, 0x3b, 0x5a, + 0xf7, 0x0b, 0xd4, 0x96, 0x3d, 0x9f, 0xb5, 0xc9, 0x98, 0x86, 0x11, 0xa9, 0x74, 0xd6, 0x3e, 0xc3, + 0x8a, 0x36, 0xc9, 0x29, 0x20, 0x1e, 0xdd, 0xdf, 0xef, 0xa0, 0xa5, 0xc3, 0xa2, 0xb5, 0xaf, 0xf2, + 0x39, 0x81, 0xb0, 0x3a, 0x16, 0xf5, 0xa6, 0x5b, 0x9b, 0xac, 0xbd, 0x94, 0x71, 0x50, 0x2c, 0x7c, + 0x80, 0x30, 0xb7, 0xf7, 0x59, 0xab, 0x8e, 0x8f, 0x6d, 0xca, 0xb5, 0xa2, 0xa8, 0xde, 0xc8, 0x52, + 0xb3, 0xc6, 0x0b, 0x35, 0x58, 0x3e, 0xbb, 0xc5, 0xed, 0x58, 0xd6, 0x50, 0x31, 0xbb, 0xc4, 0x41, + 0xb1, 0xf0, 0xe7, 0x68, 0xbd, 0xa8, 0x80, 0x3e, 0x09, 0xa8, 0x2c, 0x18, 0x9c, 0xa5, 0x66, 0xc5, + 0x03, 0x15, 0xbb, 0x38, 0x2f, 0xfd, 0xd6, 0xe7, 0xf5, 0x57, 0x13, 0xe9, 0xdc, 0x9f, 0x4f, 0x2c, + 0x36, 0x01, 0xe4, 0x54, 0xb6, 0xa7, 0x62, 0xe2, 0xdc, 0x03, 0x15, 0x1b, 0x7f, 0x8d, 0xee, 0x97, + 0x90, 0xa7, 0xe1, 0x8f, 0x81, 0x1f, 0xda, 0x6e, 0x7e, 0x6a, 0x0f, 0xb3, 0xd4, 0xac, 0x27, 0x40, + 0x3d, 0xcc, 0xde, 0x81, 0xa3, 0x60, 0xbc, 0x9e, 0x9b, 0xc5, 0x3b, 0x98, 0xf7, 0x42, 0x0d, 0x86, + 0x1d, 0xf4, 0x90, 0x15, 0xef, 0x25, 0x90, 0x53, 0x12, 0x91, 0xc0, 0x21, 0x6e, 0x91, 0x7f, 0x9d, + 0xb5, 0x1d, 0x6d, 0x77, 0xc9, 0x7a, 0x9c, 0xa5, 0xe6, 0x5b, 0x0b, 0x49, 0xb3, 0x24, 0x85, 0xc5, + 0x71, 0x8a, 0x0b, 0x40, 0xe5, 0xf3, 0xca, 0xb0, 0x05, 0x17, 0x80, 0xd9, 0xfe, 0x80, 0x9c, 0xc6, + 0x07, 0x84, 0x3a, 0xa3, 0xbc, 0xb5, 0x95, 0xf7, 0xa7, 0x78, 0xa1, 0x06, 0xeb, 0xfe, 0xa1, 0x23, + 0x9d, 0xcf, 0xc3, 0x5e, 0xdf, 0x88, 0xd8, 0xae, 0x98, 0x94, 0x55, 0x54, 0x39, 0x6f, 0x54, 0x0f, + 0x54, 0x6c, 0x45, 0x2b, 0x7a, 0x87, 0x5e, 0xa3, 0x15, 0x5d, 0xa3, 0x62, 0xe3, 0x7d, 0x74, 0xcf, + 0x25, 0x4e, 0x38, 0x9e, 0x44, 0xbc, 0x7c, 0xc5, 0xd4, 0x2d, 0x2e, 0xbf, 0x9f, 0xa5, 0xe6, 0xbc, + 0x13, 0xe6, 0xa1, 0x6a, 0x10, 0xb1, 0x86, 0x76, 0x7d, 0x10, 0xb1, 0x8c, 0x79, 0x08, 0x3f, 0x41, + 0x1b, 0xd5, 0x75, 0x88, 0xc6, 0xbc, 0x95, 0xa5, 0x66, 0xd5, 0x05, 0x55, 0x80, 0xc9, 0x79, 0x2e, + 0x3e, 0x4d, 0x26, 0xbe, 0xe7, 0xd8, 0x4c, 0xbe, 0x5c, 0xc8, 0x2b, 0x2e, 0xa8, 0x02, 0x4c, 0x3e, + 0xa9, 0x34, 0x60, 0x54, 0xc8, 0x2b, 0x2e, 0xa8, 0x02, 0x78, 0x82, 0x76, 0xf2, 0x83, 0x5d, 0xd0, + 0x22, 0x65, 0x43, 0x7f, 0x27, 0x4b, 0xcd, 0xd7, 0x72, 0xe1, 0xb5, 0x0c, 0x7c, 0x89, 0xde, 0x2e, + 0x9f, 0xe1, 0xa2, 0x49, 0x45, 0x9b, 0x7f, 0x37, 0x4b, 0xcd, 0xdb, 0xd0, 0xe1, 0x36, 0xa4, 0xee, + 0xdf, 0x4d, 0xa4, 0xf3, 0xab, 0x15, 0xeb, 0x91, 0x44, 0x7c, 0x16, 0x0f, 0xc2, 0x24, 0x50, 0x3a, + 0x74, 0x19, 0x07, 0xc5, 0xc2, 0x5f, 0xa2, 0x4d, 0x32, 0xfb, 0x98, 0x9e, 0x27, 0xac, 0xd7, 0x8b, + 0x4e, 0xa3, 0x5b, 0xdb, 0x59, 0x6a, 0xce, 0xf9, 0x60, 0x0e, 0xc1, 0x9f, 0xa2, 0x35, 0x89, 0xf1, + 0xe6, 0x27, 0x2e, 0x38, 0xba, 0x75, 0x2f, 0x4b, 0x4d, 0xd5, 0x01, 0xaa, 0xc9, 0x84, 0xfc, 0x46, + 0x06, 0xc4, 0x21, 0xde, 0x45, 0x7e, 0x9d, 0xe1, 0x42, 0xc5, 0x01, 0xaa, 0xc9, 0x2e, 0x26, 0x1c, + 0xe0, 0x2d, 0x5d, 0x94, 0x17, 0xbf, 0x98, 0xe4, 0x20, 0x14, 0x43, 0x76, 0xdf, 0x89, 0xc4, 0x5a, + 0x45, 0x2d, 0xe9, 0xe2, 0xbe, 0x33, 0xc3, 0x20, 0x1f, 0xb1, 0x03, 0x74, 0xcb, 0x2d, 0xb2, 0x5d, + 0x7c, 0x64, 0xca, 0x38, 0x28, 0x16, 0xab, 0x37, 0xde, 0xce, 0x8e, 0x48, 0x30, 0xa4, 0xa3, 0x3e, + 0x89, 0x2e, 0xf2, 0x5b, 0x0c, 0xaf, 0xb7, 0x39, 0x27, 0xcc, 0x43, 0xd6, 0xe0, 0xea, 0xda, 0x68, + 0xbc, 0xb8, 0x36, 0x1a, 0x2f, 0xaf, 0x0d, 0xed, 0xe7, 0xa9, 0xa1, 0xfd, 0x36, 0x35, 0xb4, 0xe7, + 0x53, 0x43, 0xbb, 0x9a, 0x1a, 0xda, 0x3f, 0x53, 0x43, 0xfb, 0x77, 0x6a, 0x34, 0x5e, 0x4e, 0x0d, + 0xed, 0x97, 0x1b, 0xa3, 0x71, 0x75, 0x63, 0x34, 0x5e, 0xdc, 0x18, 0x8d, 0xef, 0x3f, 0x18, 0x7a, + 0x74, 0x94, 0x0c, 0xf6, 0x9c, 0x70, 0xdc, 0x1b, 0x46, 0xf6, 0xa9, 0x1d, 0xd8, 0x3d, 0x3f, 0x3c, + 0xf3, 0x7a, 0x75, 0xff, 0x42, 0x07, 0x2d, 0xfe, 0x1f, 0xf3, 0xe3, 0xff, 0x03, 0x00, 0x00, 0xff, + 0xff, 0x38, 0x60, 0xd8, 0x7d, 0xa4, 0x0e, 0x00, 0x00, } func (this *Result) Equal(that interface{}) bool { @@ -925,6 +935,9 @@ func (this *Caches) Equal(that interface{}) bool { if !this.LabelResult.Equal(&that1.LabelResult) { return false } + if !this.InstantMetricResult.Equal(&that1.InstantMetricResult) { + return false + } return true } func (this *Summary) Equal(that interface{}) bool { @@ -1193,7 +1206,7 @@ func (this *Caches) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 11) + s := make([]string, 0, 12) s = append(s, "&stats.Caches{") s = append(s, "Chunk: "+strings.Replace(this.Chunk.GoString(), `&`, ``, 1)+",\n") s = append(s, "Index: "+strings.Replace(this.Index.GoString(), `&`, ``, 1)+",\n") @@ -1202,6 +1215,7 @@ func (this *Caches) GoString() string { s = append(s, "VolumeResult: "+strings.Replace(this.VolumeResult.GoString(), `&`, ``, 1)+",\n") s = append(s, "SeriesResult: "+strings.Replace(this.SeriesResult.GoString(), `&`, ``, 1)+",\n") s = append(s, "LabelResult: "+strings.Replace(this.LabelResult.GoString(), `&`, ``, 1)+",\n") + s = append(s, "InstantMetricResult: "+strings.Replace(this.InstantMetricResult.GoString(), `&`, ``, 1)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -1391,6 +1405,16 @@ func (m *Caches) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + { + size, err := m.InstantMetricResult.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintStats(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x42 { size, err := m.LabelResult.MarshalToSizedBuffer(dAtA[:i]) if err != nil { @@ -1877,6 +1901,8 @@ func (m *Caches) Size() (n int) { n += 1 + l + sovStats(uint64(l)) l = m.LabelResult.Size() n += 1 + l + sovStats(uint64(l)) + l = m.InstantMetricResult.Size() + n += 1 + l + sovStats(uint64(l)) return n } @@ -2085,6 +2111,7 @@ func (this *Caches) String() string { `VolumeResult:` + strings.Replace(strings.Replace(this.VolumeResult.String(), "Cache", "Cache", 1), `&`, ``, 1) + `,`, `SeriesResult:` + strings.Replace(strings.Replace(this.SeriesResult.String(), "Cache", "Cache", 1), `&`, ``, 1) + `,`, `LabelResult:` + strings.Replace(strings.Replace(this.LabelResult.String(), "Cache", "Cache", 1), `&`, ``, 1) + `,`, + `InstantMetricResult:` + strings.Replace(strings.Replace(this.InstantMetricResult.String(), "Cache", "Cache", 1), `&`, ``, 1) + `,`, `}`, }, "") return s @@ -2637,6 +2664,39 @@ func (m *Caches) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 8: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field InstantMetricResult", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStats + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStats + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.InstantMetricResult.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipStats(dAtA[iNdEx:]) diff --git a/pkg/logqlmodel/stats/stats.proto b/pkg/logqlmodel/stats/stats.proto index 8db5b474a7906..d36b8e557d984 100644 --- a/pkg/logqlmodel/stats/stats.proto +++ b/pkg/logqlmodel/stats/stats.proto @@ -57,6 +57,10 @@ message Caches { (gogoproto.nullable) = false, (gogoproto.jsontag) = "labelResult" ]; + Cache instantMetricResult = 8 [ + (gogoproto.nullable) = false, + (gogoproto.jsontag) = "instantMetricResult" + ]; } // Summary is the summary of a query statistics. diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index 9817c04afdc5e..1914c8ab3edfc 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -646,6 +646,13 @@ func applyEmbeddedCacheConfig(r *ConfigWrapper) { r.QueryRange.LabelsCacheConfig.CacheConfig = r.QueryRange.ResultsCacheConfig.CacheConfig r.QueryRange.LabelsCacheConfig.CacheConfig.Prefix = prefix } + + instantMetricCacheConfig := r.QueryRange.InstantMetricCacheConfig.CacheConfig + if !cache.IsCacheConfigured(instantMetricCacheConfig) { + prefix := instantMetricCacheConfig.Prefix + r.QueryRange.InstantMetricCacheConfig.CacheConfig = r.QueryRange.ResultsCacheConfig.CacheConfig + r.QueryRange.InstantMetricCacheConfig.CacheConfig.Prefix = prefix + } } func applyIngesterFinalSleep(cfg *ConfigWrapper) { diff --git a/pkg/loki/config_wrapper_test.go b/pkg/loki/config_wrapper_test.go index 866079b71f60f..3b1237dad4d1d 100644 --- a/pkg/loki/config_wrapper_test.go +++ b/pkg/loki/config_wrapper_test.go @@ -1055,6 +1055,49 @@ query_range: }) }) + t.Run("for the instant-metric results cache config", func(t *testing.T) { + t.Run("no embedded cache enabled by default if Redis is set", func(t *testing.T) { + configFileString := `--- +query_range: + instant_metric_results_cache: + cache: + redis: + endpoint: endpoint.redis.org` + + config, _, _ := configWrapperFromYAML(t, configFileString, nil) + assert.EqualValues(t, "endpoint.redis.org", config.QueryRange.InstantMetricCacheConfig.CacheConfig.Redis.Endpoint) + assert.EqualValues(t, "frontend.instant-metric-results-cache.", config.QueryRange.InstantMetricCacheConfig.CacheConfig.Prefix) + assert.False(t, config.QueryRange.InstantMetricCacheConfig.CacheConfig.EmbeddedCache.Enabled) + }) + + t.Run("no embedded cache enabled by default if Memcache is set", func(t *testing.T) { + configFileString := `--- +query_range: + instant_metric_results_cache: + cache: + memcached_client: + host: memcached.host.org` + + config, _, _ := configWrapperFromYAML(t, configFileString, nil) + assert.EqualValues(t, "memcached.host.org", config.QueryRange.InstantMetricCacheConfig.CacheConfig.MemcacheClient.Host) + assert.EqualValues(t, "frontend.instant-metric-results-cache.", config.QueryRange.InstantMetricCacheConfig.CacheConfig.Prefix) + assert.False(t, config.QueryRange.InstantMetricCacheConfig.CacheConfig.EmbeddedCache.Enabled) + }) + + t.Run("embedded cache is enabled by default if no other cache is set", func(t *testing.T) { + config, _, _ := configWrapperFromYAML(t, minimalConfig, nil) + assert.True(t, config.QueryRange.InstantMetricCacheConfig.CacheConfig.EmbeddedCache.Enabled) + assert.EqualValues(t, "frontend.instant-metric-results-cache.", config.QueryRange.InstantMetricCacheConfig.CacheConfig.Prefix) + }) + + t.Run("gets results cache config if not configured directly", func(t *testing.T) { + config, _, _ := configWrapperFromYAML(t, defaultResulsCacheString, nil) + assert.EqualValues(t, "memcached.host.org", config.QueryRange.InstantMetricCacheConfig.CacheConfig.MemcacheClient.Host) + assert.EqualValues(t, "frontend.instant-metric-results-cache.", config.QueryRange.InstantMetricCacheConfig.CacheConfig.Prefix) + assert.False(t, config.QueryRange.InstantMetricCacheConfig.CacheConfig.EmbeddedCache.Enabled) + }) + }) + t.Run("for the labels results cache config", func(t *testing.T) { t.Run("no embedded cache enabled by default if Redis is set", func(t *testing.T) { configFileString := `--- diff --git a/pkg/querier/queryrange/codec_test.go b/pkg/querier/queryrange/codec_test.go index 976665df95b99..52e3cc8551b7f 100644 --- a/pkg/querier/queryrange/codec_test.go +++ b/pkg/querier/queryrange/codec_test.go @@ -427,10 +427,12 @@ func Test_codec_DecodeResponse(t *testing.T) { func Test_codec_DecodeProtobufResponseParity(t *testing.T) { // test fixtures from pkg/util/marshal_test var queryTests = []struct { + name string actual parser.Value expected string }{ { + "basic", logqlmodel.Streams{ logproto.Stream{ Entries: []logproto.Entry{ @@ -462,6 +464,7 @@ func Test_codec_DecodeProtobufResponseParity(t *testing.T) { }, // vector test { + "vector", promql.Vector{ { T: 1568404331324, @@ -524,6 +527,7 @@ func Test_codec_DecodeProtobufResponseParity(t *testing.T) { }, // matrix test { + "matrix", promql.Matrix{ { Floats: []promql.FPoint{ @@ -607,50 +611,53 @@ func Test_codec_DecodeProtobufResponseParity(t *testing.T) { } codec := RequestProtobufCodec{} for i, queryTest := range queryTests { - params := url.Values{ - "query": []string{`{app="foo"}`}, - } - u := &url.URL{ - Path: "/loki/api/v1/query_range", - RawQuery: params.Encode(), - } - httpReq := &http.Request{ - Method: "GET", - RequestURI: u.String(), - URL: u, - } - req, err := codec.DecodeRequest(context.TODO(), httpReq, nil) - require.NoError(t, err) + i := i + t.Run(queryTest.name, func(t *testing.T) { + params := url.Values{ + "query": []string{`{app="foo"}`}, + } + u := &url.URL{ + Path: "/loki/api/v1/query_range", + RawQuery: params.Encode(), + } + httpReq := &http.Request{ + Method: "GET", + RequestURI: u.String(), + URL: u, + } + req, err := codec.DecodeRequest(context.TODO(), httpReq, nil) + require.NoError(t, err) - // parser.Value -> queryrange.QueryResponse - var b bytes.Buffer - result := logqlmodel.Result{ - Data: queryTest.actual, - Statistics: statsResult, - } - err = WriteQueryResponseProtobuf(&logql.LiteralParams{}, result, &b) - require.NoError(t, err) + // parser.Value -> queryrange.QueryResponse + var b bytes.Buffer + result := logqlmodel.Result{ + Data: queryTest.actual, + Statistics: statsResult, + } + err = WriteQueryResponseProtobuf(&logql.LiteralParams{}, result, &b) + require.NoError(t, err) - // queryrange.QueryResponse -> queryrangebase.Response - querierResp := &http.Response{ - StatusCode: 200, - Body: io.NopCloser(&b), - Header: http.Header{ - "Content-Type": []string{ProtobufType}, - }, - } - resp, err := codec.DecodeResponse(context.TODO(), querierResp, req) - require.NoError(t, err) + // queryrange.QueryResponse -> queryrangebase.Response + querierResp := &http.Response{ + StatusCode: 200, + Body: io.NopCloser(&b), + Header: http.Header{ + "Content-Type": []string{ProtobufType}, + }, + } + resp, err := codec.DecodeResponse(context.TODO(), querierResp, req) + require.NoError(t, err) - // queryrange.Response -> JSON - ctx := user.InjectOrgID(context.Background(), "1") - httpResp, err := codec.EncodeResponse(ctx, httpReq, resp) - require.NoError(t, err) + // queryrange.Response -> JSON + ctx := user.InjectOrgID(context.Background(), "1") + httpResp, err := codec.EncodeResponse(ctx, httpReq, resp) + require.NoError(t, err) - body, _ := io.ReadAll(httpResp.Body) - require.JSONEqf(t, queryTest.expected, string(body), "Protobuf Decode Query Test %d failed", i) + body, err := io.ReadAll(httpResp.Body) + require.NoError(t, err) + require.JSONEqf(t, queryTest.expected, string(body), "Protobuf Decode Query Test %d failed", i) + }) } - } func Test_codec_EncodeRequest(t *testing.T) { @@ -1645,6 +1652,16 @@ var ( "downloadTime": 0, "queryLengthServed": 0 }, + "instantMetricResult": { + "entriesFound": 0, + "entriesRequested": 0, + "entriesStored": 0, + "bytesReceived": 0, + "bytesSent": 0, + "requests": 0, + "downloadTime": 0, + "queryLengthServed": 0 + }, "result": { "entriesFound": 0, "entriesRequested": 0, @@ -2027,13 +2044,14 @@ var ( }, Caches: stats.Caches{ - Chunk: stats.Cache{}, - Index: stats.Cache{}, - StatsResult: stats.Cache{}, - VolumeResult: stats.Cache{}, - SeriesResult: stats.Cache{}, - LabelResult: stats.Cache{}, - Result: stats.Cache{}, + Chunk: stats.Cache{}, + Index: stats.Cache{}, + StatsResult: stats.Cache{}, + VolumeResult: stats.Cache{}, + SeriesResult: stats.Cache{}, + LabelResult: stats.Cache{}, + Result: stats.Cache{}, + InstantMetricResult: stats.Cache{}, }, } ) diff --git a/pkg/querier/queryrange/downstreamer.go b/pkg/querier/queryrange/downstreamer.go index 31f8997ed767e..4db8034291f64 100644 --- a/pkg/querier/queryrange/downstreamer.go +++ b/pkg/querier/queryrange/downstreamer.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "reflect" + "time" "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" @@ -14,6 +15,7 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/logqlmodel" "github.com/grafana/loki/pkg/querier/plan" "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" @@ -27,6 +29,8 @@ const ( type DownstreamHandler struct { limits Limits next queryrangebase.Handler + + splitAlign bool } func ParamsToLokiRequest(params logql.Params) queryrangebase.Request { @@ -86,6 +90,7 @@ func (h DownstreamHandler) Downstreamer(ctx context.Context) logql.Downstreamer parallelism: p, locks: locks, handler: h.next, + splitAlign: h.splitAlign, } } @@ -94,16 +99,50 @@ type instance struct { parallelism int locks chan struct{} handler queryrangebase.Handler + + splitAlign bool +} + +// withoutOffset returns the given query string with offsets removed and timestamp adjusted accordingly. If no offset is present in original query, it will be returned as is. +func withoutOffset(query logql.DownstreamQuery) (string, time.Time, time.Time) { + expr := query.Params.GetExpression() + + var ( + newStart = query.Params.Start() + newEnd = query.Params.End() + ) + expr.Walk(func(e syntax.Expr) { + switch rng := e.(type) { + case *syntax.RangeAggregationExpr: + off := rng.Left.Offset + + if off != 0 { + rng.Left.Offset = 0 // remove offset + + // adjust start and end time + newEnd = newEnd.Add(-off) + newStart = newStart.Add(-off) + + } + } + }) + return expr.String(), newStart, newEnd } func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQuery, acc logql.Accumulator) ([]logqlmodel.Result, error) { return in.For(ctx, queries, acc, func(qry logql.DownstreamQuery) (logqlmodel.Result, error) { - req := ParamsToLokiRequest(qry.Params).WithQuery(qry.Params.GetExpression().String()) + var req queryrangebase.Request + if in.splitAlign { + qs, newStart, newEnd := withoutOffset(qry) + req = ParamsToLokiRequest(qry.Params).WithQuery(qs).WithStartEnd(newStart, newEnd) + } else { + req = ParamsToLokiRequest(qry.Params).WithQuery(qry.Params.GetExpression().String()) + } sp, ctx := opentracing.StartSpanFromContext(ctx, "DownstreamHandler.instance") defer sp.Finish() logger := spanlogger.FromContext(ctx) defer logger.Finish() - level.Debug(logger).Log("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler)) + level.Debug(logger).Log("shards", fmt.Sprintf("%+v", qry.Params.Shards()), "query", req.GetQuery(), "step", req.GetStep(), "handler", reflect.TypeOf(in.handler), "engine", "downstream") res, err := in.handler.Do(ctx, req) if err != nil { diff --git a/pkg/querier/queryrange/downstreamer_test.go b/pkg/querier/queryrange/downstreamer_test.go index a23f2a381b007..cadfceeee20e3 100644 --- a/pkg/querier/queryrange/downstreamer_test.go +++ b/pkg/querier/queryrange/downstreamer_test.go @@ -3,6 +3,7 @@ package queryrange import ( "context" "errors" + "fmt" "strconv" "strings" "sync" @@ -12,6 +13,7 @@ import ( "github.com/grafana/dskit/user" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/atomic" @@ -325,71 +327,142 @@ func TestInstanceFor(t *testing.T) { } func TestInstanceDownstream(t *testing.T) { - params, err := logql.NewLiteralParams( - `{foo="bar"}`, - time.Now(), - time.Now(), - 0, - 0, - logproto.BACKWARD, - 1000, - nil, - ) - require.NoError(t, err) - expr, err := syntax.ParseExpr(`{foo="bar"}`) - require.NoError(t, err) - - expectedResp := func() *LokiResponse { - return &LokiResponse{ - Data: LokiData{ - Result: []logproto.Stream{{ - Labels: `{foo="bar"}`, - Entries: []logproto.Entry{ - {Timestamp: time.Unix(0, 0), Line: "foo"}, - }, - }}, + t.Run("Downstream simple query", func(t *testing.T) { + ts := time.Unix(1, 0) + + params, err := logql.NewLiteralParams( + `{foo="bar"}`, + ts, + ts, + 0, + 0, + logproto.BACKWARD, + 1000, + nil, + ) + require.NoError(t, err) + expr, err := syntax.ParseExpr(`{foo="bar"}`) + require.NoError(t, err) + + expectedResp := func() *LokiResponse { + return &LokiResponse{ + Data: LokiData{ + Result: []logproto.Stream{{ + Labels: `{foo="bar"}`, + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 0), Line: "foo"}, + }, + }}, + }, + Statistics: stats.Result{ + Summary: stats.Summary{QueueTime: 1, ExecTime: 2}, + }, + } + } + + queries := []logql.DownstreamQuery{ + { + Params: logql.ParamsWithShardsOverride{ + Params: logql.ParamsWithExpressionOverride{Params: params, ExpressionOverride: expr}, + ShardsOverride: logql.Shards{{Shard: 0, Of: 2}}.Encode(), + }, }, - Statistics: stats.Result{ - Summary: stats.Summary{QueueTime: 1, ExecTime: 2}, + } + + var got queryrangebase.Request + var want queryrangebase.Request + handler := queryrangebase.HandlerFunc( + func(_ context.Context, req queryrangebase.Request) (queryrangebase.Response, error) { + // for some reason these seemingly can't be checked in their own goroutines, + // so we assign them to scoped variables for later comparison. + got = req + want = ParamsToLokiRequest(queries[0].Params).WithQuery(expr.String()) + + return expectedResp(), nil }, + ) + + expected, err := ResponseToResult(expectedResp()) + require.Nil(t, err) + + results, err := DownstreamHandler{ + limits: fakeLimits{}, + next: handler, + }.Downstreamer(context.Background()).Downstream(context.Background(), queries, logql.NewBufferedAccumulator(len(queries))) + + fmt.Println("want", want.GetEnd(), want.GetStart(), "got", got.GetEnd(), got.GetStart()) + require.Equal(t, want, got) + require.Nil(t, err) + require.Equal(t, 1, len(results)) + require.Equal(t, expected.Data, results[0].Data) + }) + + t.Run("Downstream with offset removed", func(t *testing.T) { + ts := time.Unix(1, 0) + + params, err := logql.NewLiteralParams( + `sum(rate({foo="bar"}[2h] offset 1h))`, + ts, + ts, + 0, + 0, + logproto.BACKWARD, + 1000, + nil, + ) + require.NoError(t, err) + + expectedResp := func() *LokiResponse { + return &LokiResponse{ + Data: LokiData{ + Result: []logproto.Stream{{ + Labels: `{foo="bar"}`, + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 0), Line: "foo"}, + }, + }}, + }, + Statistics: stats.Result{ + Summary: stats.Summary{QueueTime: 1, ExecTime: 2}, + }, + } } - } - queries := []logql.DownstreamQuery{ - { - Params: logql.ParamsWithShardsOverride{ - Params: logql.ParamsWithExpressionOverride{Params: params, ExpressionOverride: expr}, - ShardsOverride: logql.Shards{{Shard: 0, Of: 2}}.Encode(), + queries := []logql.DownstreamQuery{ + { + Params: params, }, - }, - } + } - var got queryrangebase.Request - var want queryrangebase.Request - handler := queryrangebase.HandlerFunc( - func(_ context.Context, req queryrangebase.Request) (queryrangebase.Response, error) { - // for some reason these seemingly can't be checked in their own goroutines, - // so we assign them to scoped variables for later comparison. - got = req - want = ParamsToLokiRequest(queries[0].Params).WithQuery(expr.String()) + var got queryrangebase.Request + var want queryrangebase.Request + handler := queryrangebase.HandlerFunc( + func(_ context.Context, req queryrangebase.Request) (queryrangebase.Response, error) { + // for some reason these seemingly can't be checked in their own goroutines, + // so we assign them to scoped variables for later comparison. + got = req + want = ParamsToLokiRequest(params).WithQuery(`sum(rate({foo="bar"}[2h]))`).WithStartEnd(ts.Add(-1*time.Hour), ts.Add(-1*time.Hour)) // without offset and start, end adjusted for instant query - return expectedResp(), nil - }, - ) + return expectedResp(), nil + }, + ) - expected, err := ResponseToResult(expectedResp()) - require.Nil(t, err) + expected, err := ResponseToResult(expectedResp()) + require.NoError(t, err) - results, err := DownstreamHandler{ - limits: fakeLimits{}, - next: handler, - }.Downstreamer(context.Background()).Downstream(context.Background(), queries, logql.NewBufferedAccumulator(len(queries))) + results, err := DownstreamHandler{ + limits: fakeLimits{}, + next: handler, + splitAlign: true, + }.Downstreamer(context.Background()).Downstream(context.Background(), queries, logql.NewBufferedAccumulator(len(queries))) - require.Equal(t, want, got) + assert.Equal(t, want, got) - require.Nil(t, err) - require.Equal(t, 1, len(results)) - require.Equal(t, expected.Data, results[0].Data) + require.Nil(t, err) + require.Equal(t, 1, len(results)) + require.Equal(t, expected.Data, results[0].Data) + + }) } func TestCancelWhileWaitingResponse(t *testing.T) { diff --git a/pkg/querier/queryrange/instant_metric_cache.go b/pkg/querier/queryrange/instant_metric_cache.go new file mode 100644 index 0000000000000..ef1083e6cd229 --- /dev/null +++ b/pkg/querier/queryrange/instant_metric_cache.go @@ -0,0 +1,85 @@ +package queryrange + +import ( + "context" + "flag" + "fmt" + "time" + + "github.com/go-kit/log" + + "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" + "github.com/grafana/loki/pkg/storage/chunk/cache" + "github.com/grafana/loki/pkg/storage/chunk/cache/resultscache" +) + +type InstantMetricSplitter struct { + Limits + transformer UserIDTransformer +} + +// GenerateCacheKey generates a cache key based on the userID, Request and interval. +func (i InstantMetricSplitter) GenerateCacheKey(ctx context.Context, userID string, r resultscache.Request) string { + split := i.InstantMetricQuerySplitDuration(userID) + + var currentInterval int64 + if denominator := int64(split / time.Millisecond); denominator > 0 { + currentInterval = r.GetStart().UnixMilli() / denominator + } + + if i.transformer != nil { + userID = i.transformer(ctx, userID) + } + + // include both the currentInterval and the split duration in key to ensure + // a cache key can't be reused when an interval changes + return fmt.Sprintf("instant-metric:%s:%s:%d:%d", userID, r.GetQuery(), currentInterval, split) +} + +type InstantMetricCacheConfig struct { + queryrangebase.ResultsCacheConfig `yaml:",inline"` +} + +// RegisterFlags registers flags. +func (cfg *InstantMetricCacheConfig) RegisterFlags(f *flag.FlagSet) { + cfg.RegisterFlagsWithPrefix(f, "frontend.instant-metric-results-cache.") +} + +func (cfg *InstantMetricCacheConfig) Validate() error { + return cfg.ResultsCacheConfig.Validate() +} + +type instantMetricExtractor struct{} + +func NewInstantMetricCacheMiddleware( + log log.Logger, + limits Limits, + merger queryrangebase.Merger, + c cache.Cache, + cacheGenNumberLoader queryrangebase.CacheGenNumberLoader, + shouldCache queryrangebase.ShouldCacheFn, + parallelismForReq queryrangebase.ParallelismForReqFn, + retentionEnabled bool, + transformer UserIDTransformer, + metrics *queryrangebase.ResultsCacheMetrics, +) (queryrangebase.Middleware, error) { + return queryrangebase.NewResultsCacheMiddleware( + log, + c, + InstantMetricSplitter{limits, transformer}, + limits, + merger, + PrometheusExtractor{}, + cacheGenNumberLoader, + func(ctx context.Context, r queryrangebase.Request) bool { + if shouldCache != nil && !shouldCache(ctx, r) { + return false + } + return true + }, + parallelismForReq, + retentionEnabled, + false, + metrics, + ) +} diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index 2d14531909695..ab7818460738f 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -68,6 +68,15 @@ func (l limits) QuerySplitDuration(user string) time.Duration { return *l.splitDuration } +func (l limits) InstantMetricQuerySplitDuration(user string) time.Duration { + // NOTE: It returns `splitDuration` for both instant and range queries. + // no need to have separate limits for now. + if l.splitDuration == nil { + return l.Limits.QuerySplitDuration(user) + } + return *l.splitDuration +} + func (l limits) TSDBMaxQueryParallelism(ctx context.Context, user string) int { if l.maxQueryParallelism == nil { return l.Limits.TSDBMaxQueryParallelism(ctx, user) diff --git a/pkg/querier/queryrange/limits/definitions.go b/pkg/querier/queryrange/limits/definitions.go index 3e78b34420760..9e1232b750797 100644 --- a/pkg/querier/queryrange/limits/definitions.go +++ b/pkg/querier/queryrange/limits/definitions.go @@ -14,6 +14,7 @@ type Limits interface { queryrangebase.Limits logql.Limits QuerySplitDuration(string) time.Duration + InstantMetricQuerySplitDuration(string) time.Duration MetadataQuerySplitDuration(string) time.Duration RecentMetadataQuerySplitDuration(string) time.Duration RecentMetadataQueryWindow(string) time.Duration diff --git a/pkg/querier/queryrange/prometheus_test.go b/pkg/querier/queryrange/prometheus_test.go index a8e09b378bb2c..4ec798b534a73 100644 --- a/pkg/querier/queryrange/prometheus_test.go +++ b/pkg/querier/queryrange/prometheus_test.go @@ -118,6 +118,16 @@ var emptyStats = `"stats": { "downloadTime": 0, "queryLengthServed": 0 }, + "instantMetricResult": { + "entriesFound": 0, + "entriesRequested": 0, + "entriesStored": 0, + "bytesReceived": 0, + "bytesSent": 0, + "requests": 0, + "downloadTime": 0, + "queryLengthServed": 0 + }, "result": { "entriesFound": 0, "entriesRequested": 0, diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 10246f4d8277e..5532eab989c1e 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -44,16 +44,19 @@ const ( // Config is the configuration for the queryrange tripperware type Config struct { - base.Config `yaml:",inline"` - Transformer UserIDTransformer `yaml:"-"` - CacheIndexStatsResults bool `yaml:"cache_index_stats_results"` - StatsCacheConfig IndexStatsCacheConfig `yaml:"index_stats_results_cache" doc:"description=If a cache config is not specified and cache_index_stats_results is true, the config for the results cache is used."` - CacheVolumeResults bool `yaml:"cache_volume_results"` - VolumeCacheConfig VolumeCacheConfig `yaml:"volume_results_cache" doc:"description=If a cache config is not specified and cache_volume_results is true, the config for the results cache is used."` - CacheSeriesResults bool `yaml:"cache_series_results"` - SeriesCacheConfig SeriesCacheConfig `yaml:"series_results_cache" doc:"description=If series_results_cache is not configured and cache_series_results is true, the config for the results cache is used."` - CacheLabelResults bool `yaml:"cache_label_results"` - LabelsCacheConfig LabelsCacheConfig `yaml:"label_results_cache" doc:"description=If label_results_cache is not configured and cache_label_results is true, the config for the results cache is used."` + base.Config `yaml:",inline"` + Transformer UserIDTransformer `yaml:"-"` + CacheIndexStatsResults bool `yaml:"cache_index_stats_results"` + StatsCacheConfig IndexStatsCacheConfig `yaml:"index_stats_results_cache" doc:"description=If a cache config is not specified and cache_index_stats_results is true, the config for the results cache is used."` + CacheVolumeResults bool `yaml:"cache_volume_results"` + VolumeCacheConfig VolumeCacheConfig `yaml:"volume_results_cache" doc:"description=If a cache config is not specified and cache_volume_results is true, the config for the results cache is used."` + CacheInstantMetricResults bool `yaml:"cache_instant_metric_results"` + InstantMetricCacheConfig InstantMetricCacheConfig `yaml:"instant_metric_results_cache" doc:"description=If a cache config is not specified and cache_instant_metric_results is true, the config for the results cache is used."` + InstantMetricQuerySplitAlign bool `yaml:"instant_metric_query_split_align" doc:"description=Whether to align the splits of instant metric query with splitByInterval and query's exec time. Useful when instant_metric_cache is enabled"` + CacheSeriesResults bool `yaml:"cache_series_results"` + SeriesCacheConfig SeriesCacheConfig `yaml:"series_results_cache" doc:"description=If series_results_cache is not configured and cache_series_results is true, the config for the results cache is used."` + CacheLabelResults bool `yaml:"cache_label_results"` + LabelsCacheConfig LabelsCacheConfig `yaml:"label_results_cache" doc:"description=If label_results_cache is not configured and cache_label_results is true, the config for the results cache is used."` } // RegisterFlags adds the flags required to configure this flag set. @@ -63,6 +66,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.StatsCacheConfig.RegisterFlags(f) f.BoolVar(&cfg.CacheVolumeResults, "querier.cache-volume-results", false, "Cache volume query results.") cfg.VolumeCacheConfig.RegisterFlags(f) + f.BoolVar(&cfg.CacheInstantMetricResults, "querier.cache-instant-metric-results", false, "Cache instant metric query results.") + cfg.InstantMetricCacheConfig.RegisterFlags(f) + f.BoolVar(&cfg.InstantMetricQuerySplitAlign, "querier.instant-metric-query-split-align", false, "Align the instant metric splits with splityByInterval and query's exec time.") f.BoolVar(&cfg.CacheSeriesResults, "querier.cache-series-results", false, "Cache series query results.") cfg.SeriesCacheConfig.RegisterFlags(f) f.BoolVar(&cfg.CacheLabelResults, "querier.cache-label-results", false, "Cache label query results.") @@ -132,12 +138,13 @@ func NewMiddleware( metrics := NewMetrics(registerer, metricsNamespace) var ( - resultsCache cache.Cache - statsCache cache.Cache - volumeCache cache.Cache - seriesCache cache.Cache - labelsCache cache.Cache - err error + resultsCache cache.Cache + statsCache cache.Cache + volumeCache cache.Cache + instantMetricCache cache.Cache + seriesCache cache.Cache + labelsCache cache.Cache + err error ) if cfg.CacheResults { @@ -161,6 +168,13 @@ func NewMiddleware( } } + if cfg.CacheInstantMetricResults { + instantMetricCache, err = newResultsCacheFromConfig(cfg.InstantMetricCacheConfig.ResultsCacheConfig, registerer, log, stats.InstantMetricResultsCache) + if err != nil { + return nil, nil, err + } + } + if cfg.CacheSeriesResults { seriesCache, err = newResultsCacheFromConfig(cfg.SeriesCacheConfig.ResultsCacheConfig, registerer, log, stats.SeriesResultCache) if err != nil { @@ -211,7 +225,7 @@ func NewMiddleware( return nil, nil, err } - instantMetricTripperware, err := NewInstantMetricTripperware(cfg, engineOpts, log, limits, schema, metrics, indexStatsTripperware, metricsNamespace) + instantMetricTripperware, err := NewInstantMetricTripperware(cfg, engineOpts, log, limits, schema, metrics, codec, instantMetricCache, cacheGenNumLoader, retentionEnabled, indexStatsTripperware, metricsNamespace) if err != nil { return nil, nil, err } @@ -761,7 +775,51 @@ func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logge } // NewInstantMetricTripperware creates a new frontend tripperware responsible for handling metric queries -func NewInstantMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) { +func NewInstantMetricTripperware( + cfg Config, + engineOpts logql.EngineOpts, + log log.Logger, + limits Limits, + schema config.SchemaConfig, + metrics *Metrics, + merger base.Merger, + c cache.Cache, + cacheGenNumLoader base.CacheGenNumberLoader, + retentionEnabled bool, + indexStatsTripperware base.Middleware, + metricsNamespace string, +) (base.Middleware, error) { + var cacheMiddleware base.Middleware + if cfg.CacheInstantMetricResults { + var err error + cacheMiddleware, err = NewInstantMetricCacheMiddleware( + log, + limits, + merger, + c, + cacheGenNumLoader, + func(_ context.Context, r base.Request) bool { + return !r.GetCachingOptions().Disabled + }, + func(ctx context.Context, tenantIDs []string, r base.Request) int { + return MinWeightedParallelism( + ctx, + tenantIDs, + schema.Configs, + limits, + model.Time(r.GetStart().UnixMilli()), + model.Time(r.GetEnd().UnixMilli()), + ) + }, + retentionEnabled, + cfg.Transformer, + metrics.ResultsCacheMetrics, + ) + if err != nil { + return nil, err + } + } + return base.MiddlewareFunc(func(next base.Handler) base.Handler { statsHandler := indexStatsTripperware.Wrap(next) @@ -769,11 +827,19 @@ func NewInstantMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log lo StatsCollectorMiddleware(), NewLimitsMiddleware(limits), NewQuerySizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler), + NewSplitByRangeMiddleware(log, engineOpts, limits, cfg.InstantMetricQuerySplitAlign, metrics.MiddlewareMapperMetrics.rangeMapper), + } + + if cfg.CacheInstantMetricResults { + queryRangeMiddleware = append( + queryRangeMiddleware, + base.InstrumentMiddleware("instant_metric_results_cache", metrics.InstrumentMiddlewareMetrics), + cacheMiddleware, + ) } if cfg.ShardedQueries { queryRangeMiddleware = append(queryRangeMiddleware, - NewSplitByRangeMiddleware(log, engineOpts, limits, metrics.MiddlewareMapperMetrics.rangeMapper), NewQueryShardMiddleware( log, schema.Configs, diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index 7d74b0dd615c8..206822a50f6e8 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -1247,6 +1247,7 @@ type fakeLimits struct { metadataSplitDuration map[string]time.Duration recentMetadataSplitDuration map[string]time.Duration recentMetadataQueryWindow map[string]time.Duration + instantMetricSplitDuration map[string]time.Duration ingesterSplitDuration map[string]time.Duration minShardingLookback time.Duration queryTimeout time.Duration @@ -1266,6 +1267,13 @@ func (f fakeLimits) QuerySplitDuration(key string) time.Duration { return f.splitDuration[key] } +func (f fakeLimits) InstantMetricQuerySplitDuration(key string) time.Duration { + if f.instantMetricSplitDuration == nil { + return 0 + } + return f.instantMetricSplitDuration[key] +} + func (f fakeLimits) MetadataQuerySplitDuration(key string) time.Duration { if f.metadataSplitDuration == nil { return 0 diff --git a/pkg/querier/queryrange/split_by_range.go b/pkg/querier/queryrange/split_by_range.go index 6845846d4deaa..16076cd948596 100644 --- a/pkg/querier/queryrange/split_by_range.go +++ b/pkg/querier/queryrange/split_by_range.go @@ -26,20 +26,25 @@ type splitByRange struct { limits Limits ng *logql.DownstreamEngine metrics *logql.MapperMetrics + + // Whether to align rangeInterval align to splitByInterval in the subqueries. + splitAlign bool } // NewSplitByRangeMiddleware creates a new Middleware that splits log requests by the range interval. -func NewSplitByRangeMiddleware(logger log.Logger, engineOpts logql.EngineOpts, limits Limits, metrics *logql.MapperMetrics) queryrangebase.Middleware { +func NewSplitByRangeMiddleware(logger log.Logger, engineOpts logql.EngineOpts, limits Limits, splitAlign bool, metrics *logql.MapperMetrics) queryrangebase.Middleware { return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler { return &splitByRange{ logger: log.With(logger, "middleware", "InstantQuery.splitByRangeVector"), next: next, limits: limits, ng: logql.NewDownstreamEngine(engineOpts, DownstreamHandler{ - limits: limits, - next: next, + limits: limits, + next: next, + splitAlign: splitAlign, }, limits, logger), - metrics: metrics, + metrics: metrics, + splitAlign: splitAlign, } }) } @@ -57,14 +62,26 @@ func (s *splitByRange) Do(ctx context.Context, request queryrangebase.Request) ( return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } - interval := validation.SmallestPositiveNonZeroDurationPerTenant(tenants, s.limits.QuerySplitDuration) + interval := validation.SmallestPositiveNonZeroDurationPerTenant(tenants, s.limits.InstantMetricQuerySplitDuration) // if no interval configured, continue to the next middleware if interval == 0 { return s.next.Do(ctx, request) } mapperStats := logql.NewMapperStats() - mapper, err := logql.NewRangeMapper(interval, s.metrics, mapperStats) + + ir, ok := request.(*LokiInstantRequest) + if !ok { + return nil, fmt.Errorf("expected *LokiInstantRequest, got %T", request) + } + + var mapper logql.RangeMapper + + if s.splitAlign { + mapper, err = logql.NewRangeMapperWithSplitAlign(interval, ir.TimeTs, s.metrics, mapperStats) + } else { + mapper, err = logql.NewRangeMapper(interval, s.metrics, mapperStats) + } if err != nil { return nil, err } @@ -85,10 +102,6 @@ func (s *splitByRange) Do(ctx context.Context, request queryrangebase.Request) ( queryStatsCtx := stats.FromContext(ctx) queryStatsCtx.AddSplitQueries(int64(mapperStats.GetSplitQueries())) - if _, ok := request.(*LokiInstantRequest); !ok { - return nil, fmt.Errorf("expected *LokiInstantRequest, got %T", request) - } - query := s.ng.Query(ctx, logql.ParamsWithExpressionOverride{Params: params, ExpressionOverride: parsed}) res, err := query.Exec(ctx) diff --git a/pkg/querier/queryrange/split_by_range_test.go b/pkg/querier/queryrange/split_by_range_test.go index b1687611abc1d..af66c10a2f08a 100644 --- a/pkg/querier/queryrange/split_by_range_test.go +++ b/pkg/querier/queryrange/split_by_range_test.go @@ -8,6 +8,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/user" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/loghttp" @@ -17,14 +18,291 @@ import ( "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" ) +func Test_RangeVectorSplitAlign(t *testing.T) { + var ( + twelve34 = time.Date(1970, 1, 1, 12, 34, 0, 0, time.UTC) // 1970 12:34:00 UTC + twelve = time.Date(1970, 1, 1, 12, 00, 0, 0, time.UTC) // 1970 12:00:00 UTC + eleven = twelve.Add(-1 * time.Hour) // 1970 11:00:00 UTC + ten = eleven.Add(-1 * time.Hour) // 1970 10:00:00 UTC + ) + + for _, tc := range []struct { + name string + in queryrangebase.Request + subQueries []queryrangebase.RequestResponse + expected queryrangebase.Response + splitByInterval time.Duration + }{ + { + name: "sum_splitBy_aligned_with_query_time", + splitByInterval: 1 * time.Minute, + in: &LokiInstantRequest{ + Query: `sum(bytes_over_time({app="foo"}[3m]))`, + TimeTs: time.Unix(180, 0), + Path: "/loki/api/v1/query", + Plan: &plan.QueryPlan{ + AST: syntax.MustParseExpr(`sum(bytes_over_time({app="foo"}[3m]))`), + }, + }, + subQueries: []queryrangebase.RequestResponse{ + subQueryRequestResponseWithQueryTime(`sum(bytes_over_time({app="foo"}[1m]))`, 1, time.Unix(60, 0)), + subQueryRequestResponseWithQueryTime(`sum(bytes_over_time({app="foo"}[1m]))`, 2, time.Unix(120, 0)), + subQueryRequestResponseWithQueryTime(`sum(bytes_over_time({app="foo"}[1m]))`, 3, time.Unix(180, 0)), + }, + expected: expectedMergedResponseWithTime(1+2+3, time.Unix(180, 0)), // original `TimeTs` of the query. + }, + { + name: "sum_splitBy_not_aligned_query_time", + splitByInterval: 1 * time.Hour, + in: &LokiInstantRequest{ + Query: `sum(bytes_over_time({app="foo"}[3h]))`, + TimeTs: twelve34, + Path: "/loki/api/v1/query", + Plan: &plan.QueryPlan{ + AST: syntax.MustParseExpr(`sum(bytes_over_time({app="foo"}[3h]))`), + }, + }, + subQueries: []queryrangebase.RequestResponse{ + subQueryRequestResponseWithQueryTime(`sum(bytes_over_time({app="foo"}[34m]))`, 1, twelve34), + subQueryRequestResponseWithQueryTime(`sum(bytes_over_time({app="foo"}[1h]))`, 2, twelve), + subQueryRequestResponseWithQueryTime(`sum(bytes_over_time({app="foo"}[1h]))`, 3, eleven), + subQueryRequestResponseWithQueryTime(`sum(bytes_over_time({app="foo"}[26m]))`, 4, ten), + }, + expected: expectedMergedResponseWithTime(1+2+3+4, twelve34), // original `TimeTs` of the query. + }, + { + name: "sum_aggregation_splitBy_aligned_with_query_time", + splitByInterval: 1 * time.Minute, + in: &LokiInstantRequest{ + Query: `sum by (bar) (bytes_over_time({app="foo"}[3m]))`, + TimeTs: time.Unix(180, 0), + Path: "/loki/api/v1/query", + Plan: &plan.QueryPlan{ + AST: syntax.MustParseExpr(`sum by (bar) (bytes_over_time({app="foo"}[3m]))`), + }, + }, + subQueries: []queryrangebase.RequestResponse{ + subQueryRequestResponseWithQueryTime(`sum by (bar)(bytes_over_time({app="foo"}[1m]))`, 10, time.Unix(60, 0)), + subQueryRequestResponseWithQueryTime(`sum by (bar)(bytes_over_time({app="foo"}[1m]))`, 20, time.Unix(120, 0)), + subQueryRequestResponseWithQueryTime(`sum by (bar)(bytes_over_time({app="foo"}[1m]))`, 30, time.Unix(180, 0)), + }, + expected: expectedMergedResponseWithTime(10+20+30, time.Unix(180, 0)), + }, + { + name: "sum_aggregation_splitBy_not_aligned_with_query_time", + splitByInterval: 1 * time.Hour, + in: &LokiInstantRequest{ + Query: `sum by (bar) (bytes_over_time({app="foo"}[3h]))`, + TimeTs: twelve34, + Path: "/loki/api/v1/query", + Plan: &plan.QueryPlan{ + AST: syntax.MustParseExpr(`sum by (bar) (bytes_over_time({app="foo"}[3h]))`), + }, + }, + subQueries: []queryrangebase.RequestResponse{ + subQueryRequestResponseWithQueryTime(`sum by (bar)(bytes_over_time({app="foo"}[34m]))`, 10, twelve34), // 12:34:00 + subQueryRequestResponseWithQueryTime(`sum by (bar)(bytes_over_time({app="foo"}[1h]))`, 20, twelve), // 12:00:00 aligned + subQueryRequestResponseWithQueryTime(`sum by (bar)(bytes_over_time({app="foo"}[1h]))`, 30, eleven), // 11:00:00 aligned + subQueryRequestResponseWithQueryTime(`sum by (bar)(bytes_over_time({app="foo"}[26m]))`, 40, ten), // 10:00:00 + }, + expected: expectedMergedResponseWithTime(10+20+30+40, twelve34), + }, + { + name: "count_over_time_aligned_with_query_time", + splitByInterval: 1 * time.Minute, + in: &LokiInstantRequest{ + Query: `sum(count_over_time({app="foo"}[3m]))`, + TimeTs: time.Unix(180, 0), + Path: "/loki/api/v1/query", + Plan: &plan.QueryPlan{ + AST: syntax.MustParseExpr(`sum(count_over_time({app="foo"}[3m]))`), + }, + }, + subQueries: []queryrangebase.RequestResponse{ + subQueryRequestResponseWithQueryTime(`sum(count_over_time({app="foo"}[1m]))`, 1, time.Unix(60, 0)), + subQueryRequestResponseWithQueryTime(`sum(count_over_time({app="foo"}[1m]))`, 1, time.Unix(120, 0)), + subQueryRequestResponseWithQueryTime(`sum(count_over_time({app="foo"}[1m]))`, 1, time.Unix(180, 0)), + }, + expected: expectedMergedResponseWithTime(1+1+1, time.Unix(180, 0)), + }, + { + name: "count_over_time_not_aligned_with_query_time", + splitByInterval: 1 * time.Hour, + in: &LokiInstantRequest{ + Query: `sum(count_over_time({app="foo"}[3h]))`, + TimeTs: twelve34, + Path: "/loki/api/v1/query", + Plan: &plan.QueryPlan{ + AST: syntax.MustParseExpr(`sum(count_over_time({app="foo"}[3h]))`), + }, + }, + subQueries: []queryrangebase.RequestResponse{ + subQueryRequestResponseWithQueryTime(`sum(count_over_time({app="foo"}[34m]))`, 1, twelve34), + subQueryRequestResponseWithQueryTime(`sum(count_over_time({app="foo"}[1h]))`, 1, twelve), + subQueryRequestResponseWithQueryTime(`sum(count_over_time({app="foo"}[1h]))`, 1, eleven), + subQueryRequestResponseWithQueryTime(`sum(count_over_time({app="foo"}[26m]))`, 1, ten), + }, + expected: expectedMergedResponseWithTime(1+1+1+1, twelve34), + }, + { + name: "sum_agg_count_over_time_align_with_query_time", + splitByInterval: 1 * time.Minute, + in: &LokiInstantRequest{ + Query: `sum by (bar) (count_over_time({app="foo"}[3m]))`, + TimeTs: time.Unix(180, 0), + Path: "/loki/api/v1/query", + Plan: &plan.QueryPlan{ + AST: syntax.MustParseExpr(`sum by (bar) (count_over_time({app="foo"}[3m]))`), + }, + }, + subQueries: []queryrangebase.RequestResponse{ + subQueryRequestResponseWithQueryTime(`sum by (bar)(count_over_time({app="foo"}[1m]))`, 0, time.Unix(60, 0)), + subQueryRequestResponseWithQueryTime(`sum by (bar)(count_over_time({app="foo"}[1m]))`, 0, time.Unix(120, 0)), + subQueryRequestResponseWithQueryTime(`sum by (bar)(count_over_time({app="foo"}[1m]))`, 0, time.Unix(180, 0)), + }, + expected: expectedMergedResponseWithTime(0+0+0, time.Unix(180, 0)), + }, + { + name: "sum_agg_count_over_time_not_align_with_query_time", + splitByInterval: 1 * time.Hour, + in: &LokiInstantRequest{ + Query: `sum by (bar) (count_over_time({app="foo"}[3h]))`, + TimeTs: twelve34, + Path: "/loki/api/v1/query", + Plan: &plan.QueryPlan{ + AST: syntax.MustParseExpr(`sum by (bar) (count_over_time({app="foo"}[3h]))`), + }, + }, + subQueries: []queryrangebase.RequestResponse{ + subQueryRequestResponseWithQueryTime(`sum by (bar)(count_over_time({app="foo"}[34m]))`, 0, twelve34), + subQueryRequestResponseWithQueryTime(`sum by (bar)(count_over_time({app="foo"}[1h]))`, 0, twelve), + subQueryRequestResponseWithQueryTime(`sum by (bar)(count_over_time({app="foo"}[1h]))`, 0, eleven), + subQueryRequestResponseWithQueryTime(`sum by (bar)(count_over_time({app="foo"}[26m]))`, 0, ten), + }, + expected: expectedMergedResponseWithTime(0+0+0+0, twelve34), + }, + { + name: "sum_over_time_aligned_with_query_time", + splitByInterval: 1 * time.Minute, + in: &LokiInstantRequest{ + Query: `sum(sum_over_time({app="foo"} | unwrap bar [3m]))`, + TimeTs: time.Unix(180, 0), + Path: "/loki/api/v1/query", + Plan: &plan.QueryPlan{ + AST: syntax.MustParseExpr(`sum(sum_over_time({app="foo"} | unwrap bar [3m]))`), + }, + }, + subQueries: []queryrangebase.RequestResponse{ + subQueryRequestResponseWithQueryTime(`sum(sum_over_time({app="foo"} | unwrap bar[1m]))`, 1, time.Unix(60, 0)), + subQueryRequestResponseWithQueryTime(`sum(sum_over_time({app="foo"} | unwrap bar[1m]))`, 2, time.Unix(120, 0)), + subQueryRequestResponseWithQueryTime(`sum(sum_over_time({app="foo"} | unwrap bar[1m]))`, 3, time.Unix(180, 0)), + }, + expected: expectedMergedResponseWithTime(1+2+3, time.Unix(180, 0)), + }, + { + name: "sum_over_time_not_aligned_with_query_time", + splitByInterval: 1 * time.Hour, + in: &LokiInstantRequest{ + Query: `sum(sum_over_time({app="foo"} | unwrap bar [3h]))`, + TimeTs: twelve34, + Path: "/loki/api/v1/query", + Plan: &plan.QueryPlan{ + AST: syntax.MustParseExpr(`sum(sum_over_time({app="foo"} | unwrap bar [3h]))`), + }, + }, + subQueries: []queryrangebase.RequestResponse{ + subQueryRequestResponseWithQueryTime(`sum(sum_over_time({app="foo"} | unwrap bar[34m]))`, 1, twelve34), + subQueryRequestResponseWithQueryTime(`sum(sum_over_time({app="foo"} | unwrap bar[1h]))`, 2, twelve), + subQueryRequestResponseWithQueryTime(`sum(sum_over_time({app="foo"} | unwrap bar[1h]))`, 3, eleven), + subQueryRequestResponseWithQueryTime(`sum(sum_over_time({app="foo"} | unwrap bar[26m]))`, 4, ten), + }, + expected: expectedMergedResponseWithTime(1+2+3+4, twelve34), + }, + { + name: "sum_agg_sum_over_time_aligned_with_query_time", + splitByInterval: 1 * time.Minute, + in: &LokiInstantRequest{ + Query: `sum by (bar) (sum_over_time({app="foo"} | unwrap bar [3m]))`, + TimeTs: time.Unix(180, 0), + Path: "/loki/api/v1/query", + Plan: &plan.QueryPlan{ + AST: syntax.MustParseExpr(`sum by (bar) (sum_over_time({app="foo"} | unwrap bar [3m]))`), + }, + }, + subQueries: []queryrangebase.RequestResponse{ + subQueryRequestResponseWithQueryTime(`sum by (bar)(sum_over_time({app="foo"} | unwrap bar[1m]))`, 1, time.Unix(60, 0)), + subQueryRequestResponseWithQueryTime(`sum by (bar)(sum_over_time({app="foo"} | unwrap bar[1m]))`, 2, time.Unix(120, 0)), + subQueryRequestResponseWithQueryTime(`sum by (bar)(sum_over_time({app="foo"} | unwrap bar[1m]))`, 3, time.Unix(180, 0)), + }, + expected: expectedMergedResponseWithTime(1+2+3, time.Unix(180, 0)), + }, + { + name: "sum_agg_sum_over_time_not_aligned_with_query_time", + splitByInterval: 1 * time.Hour, + in: &LokiInstantRequest{ + Query: `sum by (bar) (sum_over_time({app="foo"} | unwrap bar [3h]))`, + TimeTs: twelve34, + Path: "/loki/api/v1/query", + Plan: &plan.QueryPlan{ + AST: syntax.MustParseExpr(`sum by (bar) (sum_over_time({app="foo"} | unwrap bar [3h]))`), + }, + }, + subQueries: []queryrangebase.RequestResponse{ + subQueryRequestResponseWithQueryTime(`sum by (bar)(sum_over_time({app="foo"} | unwrap bar[34m]))`, 1, twelve34), + subQueryRequestResponseWithQueryTime(`sum by (bar)(sum_over_time({app="foo"} | unwrap bar[1h]))`, 2, twelve), + subQueryRequestResponseWithQueryTime(`sum by (bar)(sum_over_time({app="foo"} | unwrap bar[1h]))`, 3, eleven), + subQueryRequestResponseWithQueryTime(`sum by (bar)(sum_over_time({app="foo"} | unwrap bar[26m]))`, 4, ten), + }, + expected: expectedMergedResponseWithTime(1+2+3+4, twelve34), + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + srm := NewSplitByRangeMiddleware(log.NewNopLogger(), testEngineOpts, fakeLimits{ + maxSeries: 10000, + queryTimeout: time.Second, + instantMetricSplitDuration: map[string]time.Duration{ + "tenant": tc.splitByInterval, + }, + }, true, nilShardingMetrics) // enable splitAlign + + ctx := user.InjectOrgID(context.TODO(), "tenant") + + byTimeTs := make(map[int64]queryrangebase.RequestResponse) + for _, v := range tc.subQueries { + key := v.Request.(*LokiInstantRequest).TimeTs.UnixNano() + byTimeTs[key] = v + } + + resp, err := srm.Wrap(queryrangebase.HandlerFunc( + func(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) { + // req should match with one of the subqueries. + ts := req.(*LokiInstantRequest).TimeTs + subq, ok := byTimeTs[ts.UnixNano()] + if !ok { // every req **should** match with one of the subqueries + return nil, fmt.Errorf("subquery request '%s-%s' not found", req.GetQuery(), ts) + } + + // Assert subquery request + assert.Equal(t, subq.Request.GetQuery(), req.GetQuery()) + assert.Equal(t, subq.Request, req) + return subq.Response, nil + + })).Do(ctx, tc.in) + require.NoError(t, err) + assert.Equal(t, tc.expected, resp.(*LokiPromResponse).Response) + }) + } +} + func Test_RangeVectorSplit(t *testing.T) { srm := NewSplitByRangeMiddleware(log.NewNopLogger(), testEngineOpts, fakeLimits{ maxSeries: 10000, queryTimeout: time.Second, - splitDuration: map[string]time.Duration{ + instantMetricSplitDuration: map[string]time.Duration{ "tenant": time.Minute, }, - }, nilShardingMetrics) + }, false, nilShardingMetrics) ctx := user.InjectOrgID(context.TODO(), "tenant") @@ -151,6 +429,39 @@ func Test_RangeVectorSplit(t *testing.T) { } } +// subQueryRequestResponse returns a RequestResponse containing the expected subQuery instant request +// and a response containing a sample value returned from the following wrapper +func subQueryRequestResponseWithQueryTime(expectedSubQuery string, sampleValue float64, exec time.Time) queryrangebase.RequestResponse { + return queryrangebase.RequestResponse{ + Request: &LokiInstantRequest{ + Query: expectedSubQuery, + TimeTs: exec, + Path: "/loki/api/v1/query", + Plan: &plan.QueryPlan{ + AST: syntax.MustParseExpr(expectedSubQuery), + }, + }, + Response: &LokiPromResponse{ + Response: &queryrangebase.PrometheusResponse{ + Status: loghttp.QueryStatusSuccess, + Data: queryrangebase.PrometheusData{ + ResultType: loghttp.ResultTypeVector, + Result: []queryrangebase.SampleStream{ + { + Labels: []logproto.LabelAdapter{ + {Name: "app", Value: "foo"}, + }, + Samples: []logproto.LegacySample{ + {TimestampMs: 1000, Value: sampleValue}, + }, + }, + }, + }, + }, + }, + } +} + // subQueryRequestResponse returns a RequestResponse containing the expected subQuery instant request // and a response containing a sample value returned from the following wrapper func subQueryRequestResponse(expectedSubQuery string, sampleValue float64) queryrangebase.RequestResponse { @@ -202,3 +513,20 @@ func expectedMergedResponse(expectedSampleValue float64) *queryrangebase.Prometh }, } } + +func expectedMergedResponseWithTime(expectedSampleValue float64, exec time.Time) *queryrangebase.PrometheusResponse { + return &queryrangebase.PrometheusResponse{ + Status: loghttp.QueryStatusSuccess, + Data: queryrangebase.PrometheusData{ + ResultType: loghttp.ResultTypeVector, + Result: []queryrangebase.SampleStream{ + { + Labels: []logproto.LabelAdapter{}, + Samples: []logproto.LegacySample{ + {TimestampMs: exec.UnixMilli(), Value: expectedSampleValue}, + }, + }, + }, + }, + } +} diff --git a/pkg/util/marshal/legacy/marshal_test.go b/pkg/util/marshal/legacy/marshal_test.go index 6e07d84615928..a3dca73ac299f 100644 --- a/pkg/util/marshal/legacy/marshal_test.go +++ b/pkg/util/marshal/legacy/marshal_test.go @@ -161,6 +161,16 @@ var queryTests = []struct { "downloadTime": 0, "queryLengthServed": 0 }, + "instantMetricResult": { + "entriesFound": 0, + "entriesRequested": 0, + "entriesStored": 0, + "bytesReceived": 0, + "bytesSent": 0, + "requests": 0, + "downloadTime": 0, + "queryLengthServed": 0 + }, "result": { "entriesFound": 0, "entriesRequested": 0, @@ -180,7 +190,7 @@ var queryTests = []struct { "shards": 0, "splits": 0, "subqueries": 0, - "totalBytesProcessed": 0, + "totalBytesProcessed": 0, "totalEntriesReturned": 0, "totalLinesProcessed": 0, "totalStructuredMetadataBytesProcessed": 0, diff --git a/pkg/util/marshal/marshal_test.go b/pkg/util/marshal/marshal_test.go index d5336298c37c8..ce7a49f97e76c 100644 --- a/pkg/util/marshal/marshal_test.go +++ b/pkg/util/marshal/marshal_test.go @@ -129,6 +129,16 @@ const emptyStats = `{ "downloadTime": 0, "queryLengthServed": 0 }, + "instantMetricResult": { + "entriesFound": 0, + "entriesRequested": 0, + "entriesStored": 0, + "bytesReceived": 0, + "bytesSent": 0, + "requests": 0, + "downloadTime": 0, + "queryLengthServed": 0 + }, "result": { "entriesFound": 0, "entriesRequested": 0, @@ -208,13 +218,13 @@ var queryTestWithEncodingFlags = []struct { [ "123456789012346", "super line with labels", { "structuredMetadata": { "foo": "a", - "bar": "b" - } + "bar": "b" + } }], [ "123456789012347", "super line with labels msg=text", { "structuredMetadata": { "foo": "a", - "bar": "b" + "bar": "b" }, "parsed": { "msg": "text" @@ -549,13 +559,13 @@ var tailTestWithEncodingFlags = []struct { [ "123456789012346", "super line with labels", { "structuredMetadata": { "foo": "a", - "bar": "b" - } + "bar": "b" + } }], [ "123456789012347", "super line with labels msg=text", { "structuredMetadata": { "foo": "a", - "bar": "b" + "bar": "b" }, "parsed": { "msg": "text" diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 00ee2e152144a..ab845380f9682 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -111,6 +111,7 @@ type Limits struct { MetadataQuerySplitDuration model.Duration `yaml:"split_metadata_queries_by_interval" json:"split_metadata_queries_by_interval"` RecentMetadataQuerySplitDuration model.Duration `yaml:"split_recent_metadata_queries_by_interval" json:"split_recent_metadata_queries_by_interval"` RecentMetadataQueryWindow model.Duration `yaml:"recent_metadata_query_window" json:"recent_metadata_query_window"` + InstantMetricQuerySplitDuration model.Duration `yaml:"split_instant_metric_queries_by_interval" json:"split_instant_metric_queries_by_interval"` IngesterQuerySplitDuration model.Duration `yaml:"split_ingester_queries_by_interval" json:"split_ingester_queries_by_interval"` MinShardingLookback model.Duration `yaml:"min_sharding_lookback" json:"min_sharding_lookback"` MaxQueryBytesRead flagext.ByteSize `yaml:"max_query_bytes_read" json:"max_query_bytes_read"` @@ -307,6 +308,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { _ = l.QuerySplitDuration.Set("1h") f.Var(&l.QuerySplitDuration, "querier.split-queries-by-interval", "Split queries by a time interval and execute in parallel. The value 0 disables splitting by time. This also determines how cache keys are chosen when result caching is enabled.") + _ = l.InstantMetricQuerySplitDuration.Set("1h") + f.Var(&l.InstantMetricQuerySplitDuration, "querier.split-instant-metric-queries-by-interval", "Split instant metric queries by a time interval and execute in parallel. The value 0 disables splitting instant metric queries by time. This also determines how cache keys are chosen when instant metric query result caching is enabled.") _ = l.MetadataQuerySplitDuration.Set("24h") f.Var(&l.MetadataQuerySplitDuration, "querier.split-metadata-queries-by-interval", "Split metadata queries by a time interval and execute in parallel. The value 0 disables splitting metadata queries by time. This also determines how cache keys are chosen when label/series result caching is enabled.") @@ -601,6 +604,11 @@ func (o *Overrides) QuerySplitDuration(userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).QuerySplitDuration) } +// InstantMetricQuerySplitDuration returns the tenant specific instant metric queries splitby interval applied in the query frontend. +func (o *Overrides) InstantMetricQuerySplitDuration(userID string) time.Duration { + return time.Duration(o.getOverridesForUser(userID).InstantMetricQuerySplitDuration) +} + // MetadataQuerySplitDuration returns the tenant specific metadata splitby interval applied in the query frontend. func (o *Overrides) MetadataQuerySplitDuration(userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).MetadataQuerySplitDuration)