feat: Support split align and caching for instant metric query results #11814

Merged Feb 20, 2024 · 33 commits

Commits
5ba4fa4
feat(caching): Support caching on instant metric queries results
kavirajk Jan 2, 2024
4d64df4
Merge branch 'main' into kavirajk/cache-instant-queries2
kavirajk Jan 29, 2024
5ea7700
integrate the basic middleware
kavirajk Jan 29, 2024
c39b68d
fixing overrides
kavirajk Jan 29, 2024
e4fbe8f
idk
kavirajk Jan 30, 2024
27dcfe6
Tweak sub queries without `offset` before caching
kavirajk Jan 30, 2024
57d77a9
test to assert offset removal
kavirajk Jan 30, 2024
08c5b4b
Fix timestamp adjustments
kavirajk Jan 31, 2024
bd558a1
missed error handling
kavirajk Feb 1, 2024
566bc4f
fix failing `TestMetricsTripperware_SplitShardStats` test
kavirajk Feb 1, 2024
e2e91b0
tweak downstreamer test
kavirajk Feb 2, 2024
d8ff56f
Fix split_by_range test cases for sub queries
kavirajk Feb 7, 2024
84bb4d4
fix `Downstream with offset removed` test case
kavirajk Feb 7, 2024
634d7f8
update stats
kavirajk Feb 7, 2024
d538f67
support split and align of instant subquery for cache reuse
kavirajk Feb 11, 2024
62cd346
Fix some bugs on split align and add more tests
kavirajk Feb 13, 2024
ae2e565
Merge branch 'main' into kavirajk/cache-instant-queries2
kavirajk Feb 13, 2024
597d40f
fix some build failures from merge with `main`
kavirajk Feb 13, 2024
fce06dc
PR remarks
kavirajk Feb 16, 2024
a6fe289
Merge branch 'main' into kavirajk/cache-instant-queries2
kavirajk Feb 16, 2024
998051a
fix additional arguments to results cache related to extent
kavirajk Feb 16, 2024
4db5398
`make doc`
kavirajk Feb 16, 2024
c307592
`make format`
kavirajk Feb 16, 2024
292f13a
PR remarks
kavirajk Feb 16, 2024
5cb63d4
add changelog entry
kavirajk Feb 16, 2024
e373341
remove unused ingester query options
kavirajk Feb 16, 2024
655844f
PR remarks
kavirajk Feb 19, 2024
ef9afeb
PR remarks and TODO to handle edge case
kavirajk Feb 19, 2024
a5ad611
PR remarks
kavirajk Feb 19, 2024
bbe5605
Merge branch 'main' into kavirajk/cache-instant-queries2
kavirajk Feb 19, 2024
38e71d6
Add cache hit log lines for instant metric query
kavirajk Feb 20, 2024
35f2c53
Merge branch 'main' into kavirajk/cache-instant-queries2
kavirajk Feb 20, 2024
3ff5150
fix breaking test cases.
kavirajk Feb 20, 2024
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -6,6 +6,7 @@

##### Enhancements

* [11814](https://github.com/grafana/loki/pull/11814) **kavirajk**: feat: Support split align and caching for instant metric query results
* [11851](https://github.com/grafana/loki/pull/11851) **elcomtik**: Helm: Allow the definition of resources for GrafanaAgent pods.
* [11819](https://github.com/grafana/loki/pull/11819) **jburnham**: Ruler: Add the ability to disable the `X-Scope-OrgId` tenant identification header in remote write requests.
* [11633](https://github.com/grafana/loki/pull/11633) **cyriltovena**: Add profiling integrations to tracing instrumentation.
@@ -69,7 +70,7 @@
* [11657](https://github.com/grafana/loki/pull/11657) **ashwanthgoli** Log results cache: compose empty response based on the request being served to avoid returning incorrect limit or direction.
* [11587](https://github.com/grafana/loki/pull/11587) **trevorwhitney** Fix semantics of label parsing logic of metrics and logs queries. Both only parse the first label if multiple extractions into the same label are requested.
* [11776](https://github.com/grafana/loki/pull/11776) **ashwanthgoli** Background Cache: Fixes a bug that is causing the background queue size to be incremented twice for each enqueued item.
* [11921](https://github.com/grafana/loki/pull/11921) **paul1r**: Parsing: String array elements were not being parsed correctly in JSON processing

##### Changes

30 changes: 30 additions & 0 deletions docs/sources/configure/_index.md
@@ -886,6 +886,28 @@ volume_results_cache:
# CLI flag: -frontend.volume-results-cache.compression
[compression: <string> | default = ""]

# Cache instant metric query results.
# CLI flag: -querier.cache-instant-metric-results
[cache_instant_metric_results: <boolean> | default = false]

# If a cache config is not specified and cache_instant_metric_results is true,
# the config for the results cache is used.
instant_metric_results_cache:
# The cache block configures the cache backend.
# The CLI flags prefix for this block configuration is:
# frontend.instant-metric-results-cache
[cache: <cache_config>]

# Use compression in cache. The default is an empty value '', which disables
# compression. Supported values are: 'snappy' and ''.
# CLI flag: -frontend.instant-metric-results-cache.compression
[compression: <string> | default = ""]

# Whether to align the splits of an instant metric query with splitByInterval
# and the query's execution time. Useful when cache_instant_metric_results is
# enabled.
# CLI flag: -querier.instant-metric-query-split-align
[instant_metric_query_split_align: <boolean> | default = false]

# Cache series query results.
# CLI flag: -querier.cache-series-results
[cache_series_results: <boolean> | default = false]
@@ -2935,6 +2957,13 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -experimental.querier.recent-metadata-query-window
[recent_metadata_query_window: <duration> | default = 0s]

# Split instant metric queries by a time interval and execute in parallel. The
# value 0 disables splitting instant metric queries by time. This also
# determines how cache keys are chosen when instant metric query result caching
# is enabled.
# CLI flag: -querier.split-instant-metric-queries-by-interval
[split_instant_metric_queries_by_interval: <duration> | default = 1h]

# Interval to use for time-based splitting when a request is within the
# `query_ingesters_within` window; defaults to `split-queries-by-interval` by
# setting to 0.
@@ -4403,6 +4432,7 @@ The cache block configures the cache backend. The supported CLI flags `<prefix>`
- `bloom.metas-cache`
- `frontend`
- `frontend.index-stats-results-cache`
- `frontend.instant-metric-results-cache`
- `frontend.label-results-cache`
- `frontend.series-results-cache`
- `frontend.volume-results-cache`
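The limits section above notes that the split interval also determines how cache keys are chosen. For intuition, here is a small Go sketch; it is hypothetical (the key format and names are illustrative, not Loki's actual implementation) and only shows that all executions whose timestamps fall into the same split bucket can share a key:

package main

import (
	"fmt"
	"time"
)

// cacheKey buckets the query timestamp by the split interval, so two
// executions inside the same bucket produce the same key. Illustrative
// only; Loki's real key derivation may differ.
func cacheKey(tenant, query string, ts time.Time, splitInterval time.Duration) string {
	bucket := ts.UnixNano() / splitInterval.Nanoseconds()
	return fmt.Sprintf("%s:%s:%d", tenant, query, bucket)
}

func main() {
	q := `sum(rate({foo="bar"}[3h]))`
	a := time.Date(2024, 2, 20, 12, 34, 0, 0, time.UTC)
	b := time.Date(2024, 2, 20, 12, 59, 0, 0, time.UTC)
	// Same 1h bucket => same key => the cached result is reusable.
	fmt.Println(cacheKey("tenant-a", q, a, time.Hour))
	fmt.Println(cacheKey("tenant-a", q, b, time.Hour))
}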
4 changes: 4 additions & 0 deletions pkg/logql/downstream.go
@@ -636,6 +636,10 @@ func NewResultStepEvaluator(res logqlmodel.Result, params Params) (StepEvaluator
step = params.Step()
)

if res.Data == nil {
return nil, fmt.Errorf("data in the passed result is nil (res.Data), cannot be processed by stepevaluator")
}

switch data := res.Data.(type) {
case promql.Vector:
return NewVectorStepEvaluator(start, data), nil
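The guard above matters because `res.Data` is an interface: a nil value would otherwise fall through the type switch into the default branch with a less helpful error. A minimal standalone sketch of the pattern, using local stand-in types rather than the real `logqlmodel`/`promql` ones:

package main

import "fmt"

// result mimics logqlmodel.Result for illustration: Data is an interface,
// so a zero-value or failed-decode result can carry nil.
type result struct {
	Data interface{}
}

type vector []float64
type matrix [][]float64

func newStepEvaluator(res result) (string, error) {
	// Without this guard, a nil interface matches no case below and lands
	// in default with the vaguer "unsupported result type <nil>".
	if res.Data == nil {
		return "", fmt.Errorf("data in the passed result is nil, cannot be processed by step evaluator")
	}
	switch res.Data.(type) {
	case vector:
		return "vector step evaluator", nil
	case matrix:
		return "matrix step evaluator", nil
	default:
		return "", fmt.Errorf("unsupported result type %T", res.Data)
	}
}

func main() {
	_, err := newStepEvaluator(result{}) // Data left nil
	fmt.Println(err)
}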
83 changes: 83 additions & 0 deletions pkg/logql/rangemapper.go
@@ -57,6 +57,19 @@ type RangeMapper struct {
splitByInterval time.Duration
metrics *MapperMetrics
stats *MapperStats

// splitAlignTs, when non-zero, is the timestamp (typically the query's
// execution time) with which range splits are aligned.
splitAlignTs time.Time
}

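// NewRangeMapperWithSplitAlign is like NewRangeMapper but additionally
// aligns the range splits with the given splitAlign timestamp (typically
// the query's execution time), so that aligned subquery results can be
// cached and reused across executions.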
func NewRangeMapperWithSplitAlign(interval time.Duration, splitAlign time.Time, metrics *MapperMetrics, stats *MapperStats) (RangeMapper, error) {
rm, err := NewRangeMapper(interval, metrics, stats)
if err != nil {
return RangeMapper{}, err
}
rm.splitAlignTs = splitAlign

return rm, nil
}

// NewRangeMapper creates a new RangeMapper instance with the given duration as
@@ -327,6 +340,76 @@ func (m RangeMapper) getOriginalOffset(expr syntax.SampleExpr) (offset time.Duration
// rangeInterval should be greater than m.splitByInterval, otherwise the resultant expression
// will have an unnecessary aggregation operation
func (m RangeMapper) mapConcatSampleExpr(expr syntax.SampleExpr, rangeInterval time.Duration, recorder *downstreamRecorder) syntax.SampleExpr {
if m.splitAlignTs.IsZero() {
return m.rangeSplit(expr, rangeInterval, recorder)
}
return m.rangeSplitAlign(expr, rangeInterval, recorder)
}

// rangeSplitAlign tries to split the given `rangeInterval` into units of `m.splitByInterval`,
// aligning as many of the units as possible with `m.splitByInterval` boundaries.
// Consider the following example with a real use case.
// Instant query: `sum(rate({foo="bar"}[3h]))`
// execTs: 12:34:00
// splitBy: 1h
// Given the above parameters, the query is split into the following subqueries:
// 1. sum(rate({foo="bar"}[34m]))
// 2. sum(rate({foo="bar"}[1h] offset 34m))
// 3. sum(rate({foo="bar"}[1h] offset 1h34m))
// 4. sum(rate({foo="bar"}[26m] offset 2h34m))
// All with execTs: 12:34:00. Here the range values (1h) of (2) and (3) are aligned with splitByInterval (1h).
func (m RangeMapper) rangeSplitAlign(
expr syntax.SampleExpr, rangeInterval time.Duration, recorder *downstreamRecorder,
) syntax.SampleExpr {
if rangeInterval <= m.splitByInterval {
return expr
}

originalOffset, err := m.getOriginalOffset(expr)
if err != nil {
return expr
}

align := m.splitAlignTs.Sub(m.splitAlignTs.Truncate(m.splitByInterval)) // say, 12:34:00 - 12:00:00(truncated) = 34m

if align == 0 {
return m.rangeSplit(expr, rangeInterval, recorder) // Don't have to align
}

var (
newRng = align
newOffset = originalOffset
downstreams *ConcatSampleExpr
pendingRangeInterval = rangeInterval
splits = 0
)

// first subquery
downstreams = appendDownstream(downstreams, expr, newRng, newOffset)
splits++

newOffset += align // e.g: offset 34m
pendingRangeInterval -= align
newRng = m.splitByInterval // [1h]

// Rest of the subqueries.
for pendingRangeInterval > 0 {
if pendingRangeInterval < m.splitByInterval {
newRng = pendingRangeInterval // last subquery
}
downstreams = appendDownstream(downstreams, expr, newRng, newOffset)
newOffset += m.splitByInterval
pendingRangeInterval -= m.splitByInterval
splits++
}

// update stats and metrics
m.stats.AddSplitQueries(splits)
recorder.Add(splits, MetricsKey)

return downstreams
}

func (m RangeMapper) rangeSplit(expr syntax.SampleExpr, rangeInterval time.Duration, recorder *downstreamRecorder) syntax.SampleExpr {
splitCount := int(math.Ceil(float64(rangeInterval) / float64(m.splitByInterval)))
if splitCount <= 1 {
return expr
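The arithmetic of `rangeSplitAlign` can be reproduced standalone. The sketch below uses the example from the doc comment (execTs 12:34:00, range 3h, splitBy 1h) and prints the (range, offset) pair of each subquery. It omits the query's original offset and assumes execTs is not already aligned (the real code falls back to plain splitting in that case), and the real mapper emits downstream AST expressions rather than plain pairs:

package main

import (
	"fmt"
	"time"
)

// splitAlign reproduces the arithmetic of rangeSplitAlign for one instant
// query, returning a (range, offset) pair per subquery.
func splitAlign(execTs time.Time, rangeInterval, splitBy time.Duration) [][2]time.Duration {
	if rangeInterval <= splitBy {
		return [][2]time.Duration{{rangeInterval, 0}}
	}
	align := execTs.Sub(execTs.Truncate(splitBy)) // 12:34:00 -> 34m
	out := [][2]time.Duration{{align, 0}}         // unaligned head: [34m]
	offset, pending := align, rangeInterval-align
	for pending > 0 {
		rng := splitBy
		if pending < splitBy {
			rng = pending // last, possibly partial, subquery
		}
		out = append(out, [2]time.Duration{rng, offset})
		offset += splitBy
		pending -= splitBy
	}
	return out
}

func main() {
	execTs := time.Date(2024, 2, 20, 12, 34, 0, 0, time.UTC)
	for i, s := range splitAlign(execTs, 3*time.Hour, time.Hour) {
		fmt.Printf("%d. range=%v offset=%v\n", i+1, s[0], s[1])
	}
	// 1. range=34m0s offset=0s
	// 2. range=1h0m0s offset=34m0s
	// 3. range=1h0m0s offset=1h34m0s
	// 4. range=26m0s offset=2h34m0s
}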
84 changes: 81 additions & 3 deletions pkg/logql/rangemapper_test.go
@@ -93,6 +93,84 @@ func Test_SplitRangeInterval(t *testing.T) {
}
}

func Test_RangeMapperSplitAlign(t *testing.T) {
cases := []struct {
name string
expr string
queryTime time.Time
splitByInterval time.Duration
expected string
expectedSplits int
}{
{
name: "query_time_aligned_with_split_by",
expr: `bytes_over_time({app="foo"}[3m])`,
expected: `sum without() (
downstream<bytes_over_time({app="foo"}[1m] offset 2m0s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[1m] offset 1m0s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[1m]), shard=<nil>>
)`,
queryTime: time.Unix(60, 0), // 1970 00:01:00
splitByInterval: 1 * time.Minute,
expectedSplits: 3,
},
{
name: "query_time_aligned_with_split_by_with_original_offset",
expr: `bytes_over_time({app="foo"}[3m] offset 20m10s)`, // NOTE: the original query has an offset, which must be carried into every split subquery
expected: `sum without() (
downstream<bytes_over_time({app="foo"}[1m] offset 22m10s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[1m] offset 21m10s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[1m] offset 20m10s), shard=<nil>>
)`,
queryTime: time.Unix(60, 0), // 1970 00:01:00
splitByInterval: 1 * time.Minute,
expectedSplits: 3,
},
{
name: "query_time_not_aligned_with_split_by",
expr: `bytes_over_time({app="foo"}[3h])`,
expected: `sum without() (
downstream<bytes_over_time({app="foo"}[6m] offset 2h54m0s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[1h] offset 1h54m0s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[1h] offset 54m0s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[54m]), shard=<nil>>
)`,
queryTime: time.Date(0, 0, 0, 12, 54, 0, 0, time.UTC), // 12:54:00
splitByInterval: 1 * time.Hour,
expectedSplits: 4,
},
{
name: "query_time_not_aligned_with_split_by_with_original_offset",
expr: `bytes_over_time({app="foo"}[3h] offset 1h2m20s)`, // NOTE: the original query has an offset, which must be carried into every split subquery
expected: `sum without() (
downstream<bytes_over_time({app="foo"}[6m] offset 3h56m20s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[1h] offset 2h56m20s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[1h] offset 1h56m20s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[54m] offset 1h2m20s), shard=<nil>>
)`,
queryTime: time.Date(0, 0, 0, 12, 54, 0, 0, time.UTC), // 12:54:00
splitByInterval: 1 * time.Hour,
expectedSplits: 4,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
mapperStats := NewMapperStats()
rvm, err := NewRangeMapperWithSplitAlign(tc.splitByInterval, tc.queryTime, nilShardMetrics, mapperStats)
require.NoError(t, err)

noop, mappedExpr, err := rvm.Parse(syntax.MustParseExpr(tc.expr))
require.NoError(t, err)

require.Equal(t, removeWhiteSpace(tc.expected), removeWhiteSpace(mappedExpr.String()))
require.Equal(t, tc.expectedSplits, mapperStats.GetSplitQueries())
require.False(t, noop)

})
}
}

func Test_SplitRangeVectorMapping(t *testing.T) {
for _, tc := range []struct {
expr string
@@ -1675,7 +1753,7 @@ func Test_SplitRangeVectorMapping(t *testing.T) {
// Non-splittable vector aggregators - should go deeper in the AST
{
`topk(2, count_over_time({app="foo"}[3m]))`,
`topk(2,
sum without () (
downstream<count_over_time({app="foo"}[1m] offset 2m0s), shard=<nil>>
++ downstream<count_over_time({app="foo"}[1m] offset 1m0s), shard=<nil>>
@@ -1713,7 +1791,7 @@
++ downstream<sum by (baz) (count_over_time({app="foo"} [1m] offset 1m0s)), shard=<nil>>
++ downstream<sum by (baz) (count_over_time({app="foo"} [1m])), shard=<nil>>
)
),
"x", "$1", "a", "(.*)"
)`,
3,
@@ -1727,7 +1805,7 @@
++ downstream<count_over_time({job="api-server",service="a:c"} |= "err" [1m] offset 1m0s), shard=<nil>>
++ downstream<count_over_time({job="api-server",service="a:c"} |= "err" [1m]), shard=<nil>>
)
/ 180),
"foo", "$1", "service", "(.*):.*"
)`,
3,
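To see why the aligned splits in these test expectations are cache-friendly, note that a subquery's absolute window is [execTs-offset-range, execTs-offset). A short sketch (illustrative names, same arithmetic as above) shows that the first full-size aligned subquery covers the identical hour-aligned window for two executions in the same hour:

package main

import (
	"fmt"
	"time"
)

// window maps an instant subquery's (range, offset), evaluated at execTs,
// to the absolute interval it reads: [execTs-offset-rng, execTs-offset).
func window(execTs time.Time, rng, offset time.Duration) (time.Time, time.Time) {
	end := execTs.Add(-offset)
	return end.Add(-rng), end
}

func main() {
	// Two executions of the same 3h instant query in the same hour.
	for _, execTs := range []time.Time{
		time.Date(2024, 2, 20, 12, 34, 0, 0, time.UTC),
		time.Date(2024, 2, 20, 12, 51, 0, 0, time.UTC),
	} {
		align := execTs.Sub(execTs.Truncate(time.Hour))
		// First full-size aligned subquery: [1h] offset <align>.
		start, end := window(execTs, time.Hour, align)
		fmt.Printf("exec %s -> window [%s, %s)\n",
			execTs.Format("15:04"), start.Format("15:04"), end.Format("15:04"))
	}
	// Both executions print [11:00, 12:00): the aligned subquery hits the
	// same hour boundary, so its cached result is shared within the hour.
}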
49 changes: 30 additions & 19 deletions pkg/logqlmodel/stats/context.go
@@ -55,17 +55,18 @@ type Context struct {
type CacheType string

const (
ChunkCache CacheType = "chunk" //nolint:staticcheck
IndexCache CacheType = "index" //nolint:staticcheck
ResultCache CacheType = "result" //nolint:staticcheck
StatsResultCache CacheType = "stats-result" //nolint:staticcheck
VolumeResultCache CacheType = "volume-result" //nolint:staticcheck
InstantMetricResultsCache CacheType = "instant-metric-result" // nolint:staticcheck
WriteDedupeCache CacheType = "write-dedupe" //nolint:staticcheck
SeriesResultCache CacheType = "series-result" //nolint:staticcheck
LabelResultCache CacheType = "label-result" //nolint:staticcheck
BloomFilterCache CacheType = "bloom-filter" //nolint:staticcheck
BloomBlocksCache CacheType = "bloom-blocks" //nolint:staticcheck
BloomMetasCache CacheType = "bloom-metas" //nolint:staticcheck
)

// NewContext creates a new statistics context
@@ -98,13 +99,14 @@ func (c *Context) Ingester() Ingester {
// Caches returns the cache statistics accumulated so far.
func (c *Context) Caches() Caches {
return Caches{
Chunk: c.caches.Chunk,
Index: c.caches.Index,
Result: c.caches.Result,
StatsResult: c.caches.StatsResult,
VolumeResult: c.caches.VolumeResult,
SeriesResult: c.caches.SeriesResult,
LabelResult: c.caches.LabelResult,
InstantMetricResult: c.caches.InstantMetricResult,
}
}

@@ -222,6 +224,7 @@ func (c *Caches) Merge(m Caches) {
c.VolumeResult.Merge(m.VolumeResult)
c.SeriesResult.Merge(m.SeriesResult)
c.LabelResult.Merge(m.LabelResult)
c.InstantMetricResult.Merge(m.InstantMetricResult)
}

func (c *Cache) Merge(m Cache) {
@@ -470,6 +473,8 @@ func (c *Context) getCacheStatsByType(t CacheType) *Cache {
stats = &c.caches.SeriesResult
case LabelResultCache:
stats = &c.caches.LabelResult
case InstantMetricResultsCache:
stats = &c.caches.InstantMetricResult
default:
return nil
}
@@ -571,6 +576,12 @@ func (c Caches) Log(log log.Logger) {
"Cache.Result.EntriesStored", c.Result.EntriesStored,
"Cache.Result.BytesSent", humanize.Bytes(uint64(c.Result.BytesSent)),
"Cache.Result.BytesReceived", humanize.Bytes(uint64(c.Result.BytesReceived)),
"Cache.Result.DownloadTime", c.Result.CacheDownloadTime(),
"Cache.InstantMetricResult.Requests", c.InstantMetricResult.Requests,
"Cache.InstantMetricResult.EntriesRequested", c.InstantMetricResult.EntriesRequested,
"Cache.InstantMetricResult.EntriesFound", c.InstantMetricResult.EntriesFound,
"Cache.InstantMetricResult.EntriesStored", c.InstantMetricResult.EntriesStored,
"Cache.InstantMetricResult.BytesSent", humanize.Bytes(uint64(c.InstantMetricResult.BytesSent)),
"Cache.InstantMetricResult.BytesReceived", humanize.Bytes(uint64(c.InstantMetricResult.BytesReceived)),
"Cache.InstantMetricResult.DownloadTime", c.InstantMetricResult.CacheDownloadTime(),
)
}
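The new `InstantMetricResult` field has to be threaded through `Caches()`, `Merge`, `getCacheStatsByType`, and `Log` because each split subquery records its own cache statistics, which are merged into one per-query summary. A minimal sketch of that merge pattern, using a simplified stand-in for `stats.Cache`:

package main

import "fmt"

// cacheStats is a simplified stand-in for stats.Cache: per-cache counters
// that get merged when subquery statistics are combined.
type cacheStats struct {
	Requests, EntriesRequested, EntriesFound int64
}

func (c *cacheStats) Merge(m cacheStats) {
	c.Requests += m.Requests
	c.EntriesRequested += m.EntriesRequested
	c.EntriesFound += m.EntriesFound
}

func main() {
	// Each split subquery reports its own instant-metric-result cache stats;
	// the frontend merges them into one per-query summary.
	total := cacheStats{}
	for _, sub := range []cacheStats{
		{Requests: 1, EntriesRequested: 1, EntriesFound: 1}, // hit
		{Requests: 1, EntriesRequested: 1, EntriesFound: 0}, // miss
	} {
		total.Merge(sub)
	}
	fmt.Printf("instant metric result cache: %d/%d entries found\n",
		total.EntriesFound, total.EntriesRequested)
}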