thanos-query-frontend: Enable Thanos Query Stats Propagation & cache response headers #10

Open · wants to merge 1 commit into base: monzo-master-v0.35.0-rc-0.65
4 changes: 1 addition & 3 deletions cmd/thanos/query_frontend.go
@@ -145,8 +145,6 @@ func registerQueryFrontend(app *extkingpin.App) {

cmd.Flag("query-frontend.log-queries-longer-than", "Log queries that are slower than the specified duration. "+
"Set to 0 to disable. Set to < 0 to enable on all queries.").Default("0").DurationVar(&cfg.CortexHandlerConfig.LogQueriesLongerThan)
cmd.Flag("query-frontend.query-stats-enabled", "True to enable query statistics tracking. "+
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed this now that we return query stats directly in the response

"When enabled, a message with some statistics is logged for every query.").Default("false").BoolVar(&cfg.CortexHandlerConfig.QueryStatsEnabled)

cmd.Flag("query-frontend.org-id-header", "Deprecation Warning - This flag will be soon deprecated in favor of query-frontend.tenant-header"+
" and both flags cannot be used at the same time. "+
@@ -313,7 +311,7 @@ func runQueryFrontend(
return err
}

-roundTripper, err := cortexfrontend.NewDownstreamRoundTripper(cfg.DownstreamURL, downstreamTripper, cfg.CortexHandlerConfig.QueryStatsEnabled)
+roundTripper, err := cortexfrontend.NewDownstreamRoundTripper(cfg.DownstreamURL, downstreamTripper)
if err != nil {
return errors.Wrap(err, "setup downstream roundtripper")
}
16 changes: 4 additions & 12 deletions internal/cortex/frontend/downstream_roundtripper.go
@@ -13,18 +12,17 @@ import (

// RoundTripper that forwards requests to downstream URL.
type downstreamRoundTripper struct {
-downstreamURL *url.URL
-transport http.RoundTripper
-queryStatsEnabled bool
+downstreamURL *url.URL
+transport http.RoundTripper
}

-func NewDownstreamRoundTripper(downstreamURL string, transport http.RoundTripper, queryStatsEnabled bool) (http.RoundTripper, error) {
+func NewDownstreamRoundTripper(downstreamURL string, transport http.RoundTripper) (http.RoundTripper, error) {
u, err := url.Parse(downstreamURL)
if err != nil {
return nil, err
}

-return &downstreamRoundTripper{downstreamURL: u, transport: transport, queryStatsEnabled: queryStatsEnabled}, nil
+return &downstreamRoundTripper{downstreamURL: u, transport: transport}, nil
}

func (d downstreamRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
@@ -37,13 +36,6 @@ func (d downstreamRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
}
}

-if d.queryStatsEnabled {
-// add &stats query param to get thanos-query to add query statistics to log
-q := r.URL.Query()
-q.Set("stats", "true")
-r.URL.RawQuery = q.Encode()
-}

r.URL.Scheme = d.downstreamURL.Scheme
r.URL.Host = d.downstreamURL.Host
r.URL.Path = path.Join(d.downstreamURL.Path, r.URL.Path)
90 changes: 1 addition & 89 deletions internal/cortex/frontend/transport/handler.go
@@ -6,10 +6,8 @@ package transport
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/util/stats"
"io"
"net/http"
@@ -44,7 +42,6 @@
type HandlerConfig struct {
LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"`
MaxBodySize int64 `yaml:"max_body_size"`
-QueryStatsEnabled bool `yaml:"query_stats_enabled"`
}

// Handler accepts queries and forwards them to RoundTripper. It can log slow queries,
@@ -68,27 +65,6 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) *Handler {
roundTripper: roundTripper,
}

-if cfg.QueryStatsEnabled {
-h.querySeconds = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
-Name: "thanos_query_frontend_query_seconds",
-Help: "Total amount of wall clock time spend processing queries.",
-Buckets: []float64{0.01, 0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 360},
-}, []string{"user"})
-
-h.querySamplesTotal = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
-Name: "thanos_query_frontend_query_total_fetched_samples",
-Help: "Number of samples touched to execute a query.",
-Buckets: []float64{1, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000},
-}, []string{"user"})
-
-h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
-h.querySeconds.DeleteLabelValues(user)
-h.querySamplesTotal.DeleteLabelValues(user)
-})
-// If cleaner stops or fail, we will simply not clean the metrics for inactive users.
-_ = h.activeUsers.StartAsync(context.Background())
-}

return h
}

@@ -129,38 +105,15 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

w.WriteHeader(resp.StatusCode)

-var respBuf bytes.Buffer
-if f.cfg.QueryStatsEnabled {
-// Buffer the response body for query stat tracking later
-resp.Body = io.NopCloser(io.TeeReader(resp.Body, &respBuf))
-}

// log copy response body error so that we will know even though success response code returned
bytesCopied, err := io.Copy(w, resp.Body)
if err != nil && !errors.Is(err, syscall.EPIPE) {
level.Error(util_log.WithContext(r.Context(), f.log)).Log("msg", "write response body error", "bytesCopied", bytesCopied, "err", err)
}

-if f.cfg.QueryStatsEnabled {
-// Parse the stats field out of the response body
-var statsResponse ResponseWithStats
-if err := json.Unmarshal(respBuf.Bytes(), &statsResponse); err == nil {
-if statsResponse.Data.Stats != nil {
-queryString = f.parseRequestQueryString(r, buf)
-f.reportQueryStats(r, queryString, queryResponseTime, statsResponse.Data.Stats)
-} else {
-// Don't fail the request if the stats are nil, just log a warning
-level.Warn(util_log.WithContext(r.Context(), f.log)).Log("msg", "error parsing query stats", "err", errors.New("stats are nil"))
-}
-} else {
-// Don't fail the request if the stats are nil, just log a warning
-level.Warn(util_log.WithContext(r.Context(), f.log)).Log("msg", "error parsing query stats", "err", err)
-}
-}

// Check whether we should parse the query string.
shouldReportSlowQuery := f.cfg.LogQueriesLongerThan != 0 && queryResponseTime > f.cfg.LogQueriesLongerThan
-if shouldReportSlowQuery || f.cfg.QueryStatsEnabled {
+if shouldReportSlowQuery {
queryString = f.parseRequestQueryString(r, buf)
}

@@ -203,47 +156,6 @@ func (f *Handler) reportSlowQuery(r *http.Request, responseHeaders http.Header,
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}

-func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *stats.BuiltinStats) {
-remoteUser, _, _ := r.BasicAuth()
-
-// Log stats.
-logMessage := []interface{}{
-"msg", "query stats",
-"component", "query-frontend",
-"method", r.Method,
-"path", r.URL.Path,
-"remote_user", remoteUser,
-"remote_addr", r.RemoteAddr,
-"response_time", queryResponseTime,
-"query_timings_preparation_time", stats.Timings.QueryPreparationTime,
-"query_timings_eval_total_time", stats.Timings.EvalTotalTime,
-"query_timings_exec_total_time", stats.Timings.ExecTotalTime,
-"query_timings_exec_queue_time", stats.Timings.ExecQueueTime,
-"query_timings_inner_eval_time", stats.Timings.InnerEvalTime,
-"query_timings_result_sort_time", stats.Timings.ResultSortTime,
-}
-if stats.Samples != nil {
-samples := stats.Samples
-
-logMessage = append(logMessage, []interface{}{
-"total_queryable_samples", samples.TotalQueryableSamples,
-"peak_samples", samples.PeakSamples,
-}...)
-}
-
-logMessage = append(logMessage, formatQueryString(queryString)...)
-
-level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
-
-// Record metrics.
-if f.querySeconds != nil {
-f.querySeconds.WithLabelValues(remoteUser).Observe(queryResponseTime.Seconds())
-}
-if f.querySamplesTotal != nil && stats.Samples != nil {
-f.querySamplesTotal.WithLabelValues(remoteUser).Observe(float64(stats.Samples.TotalQueryableSamples))
-}
-}

func (f *Handler) parseRequestQueryString(r *http.Request, bodyBuf bytes.Buffer) url.Values {
// Use previously buffered body.
r.Body = io.NopCloser(&bodyBuf)
1 change: 0 additions & 1 deletion internal/cortex/querier/querier.go
@@ -21,7 +21,6 @@ type Config struct {
QueryIngestersWithin time.Duration `yaml:"query_ingesters_within"`
QueryStoreForLabels bool `yaml:"query_store_for_labels_enabled"`
AtModifierEnabled bool `yaml:"at_modifier_enabled"`
-EnablePerStepStats bool `yaml:"per_step_stats_enabled"`

// QueryStoreAfter the time after which queries should also be sent to the store and not just ingesters.
QueryStoreAfter time.Duration `yaml:"query_store_after"`
76 changes: 72 additions & 4 deletions internal/cortex/querier/queryrange/query_range.go
@@ -107,6 +107,8 @@ type Response interface {
GetHeaders() []*PrometheusResponseHeader
// GetStats returns the Prometheus query stats in the response.
GetStats() *PrometheusResponseStats
+// AddHeader adds a HTTP header to the response
+AddHeader(key, value string)
}

type prometheusCodec struct{}
@@ -184,6 +186,14 @@ func (resp *PrometheusInstantQueryResponse) GetStats() *PrometheusResponseStats
return resp.Data.Stats
}

+func (resp *PrometheusResponse) AddHeader(key, value string) {
+resp.Headers = append(resp.Headers, &PrometheusResponseHeader{Name: key, Values: []string{value}})
+}
+
+func (resp *PrometheusInstantQueryResponse) AddHeader(key, value string) {
+resp.Headers = append(resp.Headers, &PrometheusResponseHeader{Name: key, Values: []string{value}})
+}

Review comment: I think we'll end up with duplicate headers if the same header key is added twice - should this check for the header already existing and either replace it or append to its values?
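For illustration only (not part of this PR): a minimal sketch of a de-duplicating AddHeader that appends to an existing entry's Values instead of adding a second entry with the same name. It uses simplified stand-ins for the generated response types and a hypothetical X-Thanos-Example header.

```go
package main

import "fmt"

// Simplified stand-ins for the generated PrometheusResponse types; only the
// fields relevant to header handling are modelled here.
type PrometheusResponseHeader struct {
	Name   string
	Values []string
}

type PrometheusResponse struct {
	Headers []*PrometheusResponseHeader
}

// AddHeader appends the value to an existing header with the same name, and
// only creates a new entry when the name is not present yet.
func (resp *PrometheusResponse) AddHeader(key, value string) {
	for _, h := range resp.Headers {
		if h.Name == key {
			h.Values = append(h.Values, value)
			return
		}
	}
	resp.Headers = append(resp.Headers, &PrometheusResponseHeader{Name: key, Values: []string{value}})
}

func main() {
	resp := &PrometheusResponse{}
	// "X-Thanos-Example" is a hypothetical header name used for the demo.
	resp.AddHeader("X-Thanos-Example", "a")
	resp.AddHeader("X-Thanos-Example", "b")
	fmt.Println(len(resp.Headers), resp.Headers[0].Values) // 1 [a b]
}
```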

// NewEmptyPrometheusResponse returns an empty successful Prometheus query range response.
func NewEmptyPrometheusResponse() *PrometheusResponse {
return &PrometheusResponse{
@@ -217,9 +227,25 @@ func (prometheusCodec) MergeResponse(_ Request, responses ...Response) (Response, error) {
// we need to pass on all the headers for results cache gen numbers.
var resultsCacheGenNumberHeaderValues []string

+var headers []*PrometheusResponseHeader
+// merge headers
for _, res := range responses {
promResponses = append(promResponses, res.(*PrometheusResponse))
resultsCacheGenNumberHeaderValues = append(resultsCacheGenNumberHeaderValues, getHeaderValuesWithName(res, ResultsCacheGenNumberHeaderName)...)
Review comment: I doubt this matters given what was happening before, but this could be out of sync with the actual headers we return in the response due to the merging below.

+for _, newHeader := range res.GetHeaders() {
+found := false
+for _, existing := range headers {
+if existing.Name == newHeader.Name {
+// if headers match, overwrite with the new header
+existing = newHeader
Review comment: Should this be *existing = *newHeader? Also, below we sort the responses by response time and then the first non-null explanation is taken rather than the last one. Is consistency between these important?
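For illustration only (not part of this PR): in the hunk above, `existing = newHeader` only rebinds the range variable, so the overwrite never reaches the headers slice; indexing into the slice (or writing `*existing = *newHeader`) does mutate it. A self-contained sketch of the intended replace-or-append merge, with a hypothetical mergeHeaders helper and a simplified header type:

```go
package main

import "fmt"

// Simplified stand-in for the generated PrometheusResponseHeader type.
type PrometheusResponseHeader struct {
	Name   string
	Values []string
}

// mergeHeaders is a hypothetical helper: headers seen later replace earlier
// entries with the same name, and unseen names are appended. Indexing into
// the slice mutates the stored entry, unlike reassigning a range variable.
func mergeHeaders(existing, incoming []*PrometheusResponseHeader) []*PrometheusResponseHeader {
	for _, newHeader := range incoming {
		found := false
		for i := range existing {
			if existing[i].Name == newHeader.Name {
				existing[i] = newHeader // actually replaces the stored header
				found = true
				break
			}
		}
		if !found {
			existing = append(existing, newHeader)
		}
	}
	return existing
}

func main() {
	a := []*PrometheusResponseHeader{{Name: "X-Example", Values: []string{"old"}}}
	b := []*PrometheusResponseHeader{{Name: "X-Example", Values: []string{"new"}}}
	merged := mergeHeaders(a, b)
	fmt.Println(len(merged), merged[0].Values) // 1 [new]
}
```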

+found = true
+}
+}
+if !found {
+// new header doesn't exist in existing list, so add it
+headers = append(headers, newHeader)
+}
+}
}

// Merge the responses.
@@ -244,10 +270,14 @@ func (prometheusCodec) MergeResponse(_ Request, responses ...Response) (Response, error) {
}

if len(resultsCacheGenNumberHeaderValues) != 0 {
-response.Headers = []*PrometheusResponseHeader{{
+response.Headers = append(response.Headers, &PrometheusResponseHeader{
Name: ResultsCacheGenNumberHeaderName,
Values: resultsCacheGenNumberHeaderValues,
-}}
+})
}

+for _, h := range headers {
+response.Headers = append(response.Headers, h)
+}

return &response, nil
@@ -419,6 +449,16 @@ func (prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*http.Response, error) {
StatusCode: http.StatusOK,
ContentLength: int64(len(b)),
}

+// Copy Prometheus headers into http response
+for _, h := range a.Headers {
+for _, v := range h.Values {
+if strings.HasPrefix(h.Name, "X-") {
Review comment: Are the headers always upper case?
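For illustration only (not part of this PR): if mixed-case header names are a possibility, one standard-library option is to canonicalise the name before the prefix check. A small sketch with a hypothetical isExtensionHeader helper:

```go
package main

import (
	"fmt"
	"net/textproto"
	"strings"
)

// isExtensionHeader reports whether a header name starts with "X-", ignoring
// case. textproto.CanonicalMIMEHeaderKey normalises e.g. "x-thanos-foo" to
// "X-Thanos-Foo" before the prefix test.
func isExtensionHeader(name string) bool {
	return strings.HasPrefix(textproto.CanonicalMIMEHeaderKey(name), "X-")
}

func main() {
	fmt.Println(isExtensionHeader("x-thanos-example")) // true (hypothetical header)
	fmt.Println(isExtensionHeader("Content-Length"))   // false
}
```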

+// Only copy X- headers, otherwise we might corrupt HTTP protocol headers (like Content-Length)
+resp.Header.Add(h.Name, v)
+}
+}
+}
return &resp, nil
}

@@ -665,6 +705,23 @@ func (s *PrometheusInstantQueryData) MarshalJSON() ([]byte, error) {
func StatsMerge(resps []Response) *PrometheusResponseStats {
output := map[int64]*PrometheusResponseQueryableSamplesStatsPerStep{}
hasStats := false

+result := &PrometheusResponseStats{
+Timings: &PrometheusResponseStats_Timings{
+EvalTotalTime: 0,
+ResultSortTime: 0,
+QueryPreparationTime: 0,
+InnerEvalTime: 0,
+ExecQueueTime: 0,
+ExecTotalTime: 0,
+},
+Samples: &PrometheusResponseStats_Samples{
+TotalQueryableSamples: 0,
+PeakSamples: 0,
+TotalQueryableSamplesPerStep: []*PrometheusResponseQueryableSamplesStatsPerStep{},
+},
+}

for _, resp := range resps {
stats := resp.GetStats()
if stats == nil {
@@ -679,6 +736,19 @@ func StatsMerge(resps []Response) *PrometheusResponseStats {
for _, s := range stats.Samples.TotalQueryableSamplesPerStep {
output[s.GetTimestampMs()] = s
}

+// add all the stats
+result.Timings.EvalTotalTime += stats.Timings.EvalTotalTime
+result.Timings.ResultSortTime += stats.Timings.ResultSortTime
+result.Timings.QueryPreparationTime += stats.Timings.QueryPreparationTime
+result.Timings.InnerEvalTime += stats.Timings.InnerEvalTime
+result.Timings.ExecQueueTime += stats.Timings.ExecQueueTime
+result.Timings.ExecTotalTime += stats.Timings.ExecTotalTime
+
+result.Samples.TotalQueryableSamples += stats.Samples.TotalQueryableSamples
+if stats.Samples.PeakSamples > result.Samples.PeakSamples {
+result.Samples.PeakSamples = stats.Samples.PeakSamples
+}
}

if !hasStats {
@@ -692,10 +762,8 @@ func StatsMerge(resps []Response) *PrometheusResponseStats {

sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })

-result := &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{}}
for _, key := range keys {
result.Samples.TotalQueryableSamplesPerStep = append(result.Samples.TotalQueryableSamplesPerStep, output[key])
-result.Samples.TotalQueryableSamples += output[key].Value
}

return result
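As an editorial aside (not part of this PR): the merge semantics added above are that timings and TotalQueryableSamples are summed across partial responses, PeakSamples takes the maximum, and per-step samples are keyed by timestamp. A self-contained sketch of that aggregation, using simplified stand-in types rather than the generated protobuf structs:

```go
package main

import "fmt"

// Simplified stand-ins for the generated stats types.
type Timings struct{ ExecTotalTime float64 }
type Samples struct {
	TotalQueryableSamples int64
	PeakSamples           int64
}
type Stats struct {
	Timings Timings
	Samples Samples
}

// mergeStats mirrors the aggregation in StatsMerge above: sum timings and
// total samples, take the maximum peak.
func mergeStats(parts ...Stats) Stats {
	var out Stats
	for _, s := range parts {
		out.Timings.ExecTotalTime += s.Timings.ExecTotalTime
		out.Samples.TotalQueryableSamples += s.Samples.TotalQueryableSamples
		if s.Samples.PeakSamples > out.Samples.PeakSamples {
			out.Samples.PeakSamples = s.Samples.PeakSamples
		}
	}
	return out
}

func main() {
	merged := mergeStats(
		Stats{Timings{0.25}, Samples{100, 40}},
		Stats{Timings{0.25}, Samples{50, 90}},
	)
	fmt.Println(merged.Timings.ExecTotalTime, merged.Samples.TotalQueryableSamples, merged.Samples.PeakSamples)
	// Output: 0.5 150 90
}
```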