Skip to content

Commit

Permalink
fixing overrides
Browse files Browse the repository at this point in the history
Signed-off-by: Kaviraj <[email protected]>
  • Loading branch information
kavirajk committed Jan 29, 2024
1 parent 5ea7700 commit c39b68d
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 28 deletions.
4 changes: 3 additions & 1 deletion pkg/querier/queryrange/instant_metric_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/chunk/cache/resultscache"
"github.com/grafana/loki/pkg/util"
)

type InstantMetricSplitter struct {
Expand Down Expand Up @@ -53,6 +54,7 @@ func NewInstantMetricCacheMiddleware(
merger queryrangebase.Merger,
c cache.Cache,
cacheGenNumberLoader queryrangebase.CacheGenNumberLoader,
iqo util.IngesterQueryOptions,
shouldCache queryrangebase.ShouldCacheFn,
parallelismForReq queryrangebase.ParallelismForReqFn,
retentionEnabled bool,
Expand All @@ -62,7 +64,7 @@ func NewInstantMetricCacheMiddleware(
return queryrangebase.NewResultsCacheMiddleware(
log,
c,
InstantMetricSplitter{cacheKeyLimits{limits, transformer}},
InstantMetricSplitter{cacheKeyLimits{limits, transformer, iqo}},
limits,
merger,
queryrangebase.PrometheusResponseExtractor{},
Expand Down
8 changes: 7 additions & 1 deletion pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ func NewMiddleware(
}
}

if cfg.CacheInstantMetricResults {
instantMetricCache, err = newResultsCacheFromConfig(cfg.InstantMetricCacheConfig.ResultsCacheConfig, registerer, log, stats.InstantMetricResultsCache)
}

if cfg.CacheSeriesResults {
seriesCache, err = newResultsCacheFromConfig(cfg.SeriesCacheConfig.ResultsCacheConfig, registerer, log, stats.SeriesResultCache)
if err != nil {
Expand Down Expand Up @@ -216,7 +220,7 @@ func NewMiddleware(
return nil, nil, err
}

instantMetricTripperware, err := NewInstantMetricTripperware(cfg, engineOpts, log, limits, schema, metrics, codec, instantMetricCache, cacheGenNumLoader, retentionEnabled, indexStatsTripperware, metricsNamespace)
instantMetricTripperware, err := NewInstantMetricTripperware(cfg, engineOpts, log, limits, schema, metrics, codec, iqo, instantMetricCache, cacheGenNumLoader, retentionEnabled, indexStatsTripperware, metricsNamespace)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -775,6 +779,7 @@ func NewInstantMetricTripperware(
schema config.SchemaConfig,
metrics *Metrics,
merger base.Merger,
iqo util.IngesterQueryOptions,
c cache.Cache,
cacheGenNumLoader base.CacheGenNumberLoader,
retentionEnabled bool,
Expand All @@ -792,6 +797,7 @@ func NewInstantMetricTripperware(
merger,
c,
cacheGenNumLoader,
iqo,
func(_ context.Context, r base.Request) bool {
return !r.GetCachingOptions().Disabled
},
Expand Down
44 changes: 26 additions & 18 deletions pkg/querier/queryrange/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1237,24 +1237,25 @@ func TestMetricsTripperware_SplitShardStats(t *testing.T) {
}

type fakeLimits struct {
maxQueryLength time.Duration
maxQueryParallelism int
tsdbMaxQueryParallelism int
maxQueryLookback time.Duration
maxEntriesLimitPerQuery int
maxSeries int
splitDuration map[string]time.Duration
metadataSplitDuration map[string]time.Duration
ingesterSplitDuration map[string]time.Duration
minShardingLookback time.Duration
queryTimeout time.Duration
requiredLabels []string
requiredNumberLabels int
maxQueryBytesRead int
maxQuerierBytesRead int
maxStatsCacheFreshness time.Duration
maxMetadataCacheFreshness time.Duration
volumeEnabled bool
maxQueryLength time.Duration
maxQueryParallelism int
tsdbMaxQueryParallelism int
maxQueryLookback time.Duration
maxEntriesLimitPerQuery int
maxSeries int
splitDuration map[string]time.Duration
metadataSplitDuration map[string]time.Duration
instantMetricSplitDuration map[string]time.Duration
ingesterSplitDuration map[string]time.Duration
minShardingLookback time.Duration
queryTimeout time.Duration
requiredLabels []string
requiredNumberLabels int
maxQueryBytesRead int
maxQuerierBytesRead int
maxStatsCacheFreshness time.Duration
maxMetadataCacheFreshness time.Duration
volumeEnabled bool
}

func (f fakeLimits) QuerySplitDuration(key string) time.Duration {
Expand All @@ -1264,6 +1265,13 @@ func (f fakeLimits) QuerySplitDuration(key string) time.Duration {
return f.splitDuration[key]
}

func (f fakeLimits) InstantMetricQuerySplitDuration(key string) time.Duration {
if f.instantMetricSplitDuration == nil {
return 0
}
return f.instantMetricSplitDuration[key]
}

func (f fakeLimits) MetadataQuerySplitDuration(key string) time.Duration {
if f.metadataSplitDuration == nil {
return 0
Expand Down
24 changes: 16 additions & 8 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,15 @@ type Limits struct {
QueryTimeout model.Duration `yaml:"query_timeout" json:"query_timeout"`

// Query frontend enforced limits. The default is actually parameterized by the queryrange config.
QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"`
MetadataQuerySplitDuration model.Duration `yaml:"split_metadata_queries_by_interval" json:"split_metadata_queries_by_interval"`
IngesterQuerySplitDuration model.Duration `yaml:"split_ingester_queries_by_interval" json:"split_ingester_queries_by_interval"`
MinShardingLookback model.Duration `yaml:"min_sharding_lookback" json:"min_sharding_lookback"`
MaxQueryBytesRead flagext.ByteSize `yaml:"max_query_bytes_read" json:"max_query_bytes_read"`
MaxQuerierBytesRead flagext.ByteSize `yaml:"max_querier_bytes_read" json:"max_querier_bytes_read"`
VolumeEnabled bool `yaml:"volume_enabled" json:"volume_enabled" doc:"description=Enable log-volume endpoints."`
VolumeMaxSeries int `yaml:"volume_max_series" json:"volume_max_series" doc:"description=The maximum number of aggregated series in a log-volume response"`
QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"`
MetadataQuerySplitDuration model.Duration `yaml:"split_metadata_queries_by_interval" json:"split_metadata_queries_by_interval"`
InstantMetricQuerySplitDuration model.Duration `yaml:"split_instant_metric_queries_by_interval" json:"split_instant_metric_queries_by_interval"`
IngesterQuerySplitDuration model.Duration `yaml:"split_ingester_queries_by_interval" json:"split_ingester_queries_by_interval"`
MinShardingLookback model.Duration `yaml:"min_sharding_lookback" json:"min_sharding_lookback"`
MaxQueryBytesRead flagext.ByteSize `yaml:"max_query_bytes_read" json:"max_query_bytes_read"`
MaxQuerierBytesRead flagext.ByteSize `yaml:"max_querier_bytes_read" json:"max_querier_bytes_read"`
VolumeEnabled bool `yaml:"volume_enabled" json:"volume_enabled" doc:"description=Enable log-volume endpoints."`
VolumeMaxSeries int `yaml:"volume_max_series" json:"volume_max_series" doc:"description=The maximum number of aggregated series in a log-volume response"`

// Ruler defaults and limits.
RulerMaxRulesPerRuleGroup int `yaml:"ruler_max_rules_per_rule_group" json:"ruler_max_rules_per_rule_group"`
Expand Down Expand Up @@ -303,6 +304,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {

_ = l.QuerySplitDuration.Set("1h")
f.Var(&l.QuerySplitDuration, "querier.split-queries-by-interval", "Split queries by a time interval and execute in parallel. The value 0 disables splitting by time. This also determines how cache keys are chosen when result caching is enabled.")
_ = l.InstantMetricQuerySplitDuration.Set("5m")
f.Var(&l.InstantMetricQuerySplitDuration, "querier.split-instant-metric-queries-by-interval", "Split instant metric queries by a time interval and execute in parallel. The value 0 disables splitting instant metric queries by time. This also determines how cache keys are chosen when instant metric query result caching is enabled.")

// with metadata caching, it is not possible to extract a subset of labels/series from a cached extent because unlike samples they are not associated with a timestamp.
// as a result, we could return inaccurate results. example: returning results from an entire 1h extent for a 5m query
Expand Down Expand Up @@ -589,6 +592,11 @@ func (o *Overrides) QuerySplitDuration(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).QuerySplitDuration)
}

// InstantMetricQuerySplitDuration returns the tenant specific instant metric queries splitby interval applied in the query frontend.
func (o *Overrides) InstantMetricQuerySplitDuration(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).InstantMetricQuerySplitDuration)
}

// MetadataQuerySplitDuration returns the tenant specific metadata splitby interval applied in the query frontend.
func (o *Overrides) MetadataQuerySplitDuration(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).MetadataQuerySplitDuration)
Expand Down

0 comments on commit c39b68d

Please sign in to comment.