Skip to content

Commit

Permalink
Pushdown collection of results from generators in the querier (grafan…
Browse files Browse the repository at this point in the history
…a#4119)

* Pushdown collection of results from generators in the querier

* Update CHANGELOG.md
  • Loading branch information
electron0zero authored Oct 11, 2024
1 parent 5aef523 commit ce317a8
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 53 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## main / unreleased

* [ENHANCEMENT] Pushdown collection of results from generators in the querier [#4119](https://github.com/grafana/tempo/pull/4119) (@electron0zero)
* [ENHANCEMENT] Send semver version in api/stattus/buildinfo for cloud deployments [#4110](https://github.com/grafana/tempo/pull/4110) [@Aki0x137]
* [ENHANCEMENT] Speedup tempo-query trace search by allowing parallel queries [#4159](https://github.com/grafana/tempo/pull/4159) (@pavolloffay)
* [ENHANCEMENT] Speedup DistinctString and ScopedDistinctString collectors [#4109](https://github.com/grafana/tempo/pull/4109) (@electron0zero)
Expand Down
73 changes: 34 additions & 39 deletions modules/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ var (
})
)

type (
forEachFn func(ctx context.Context, client tempopb.QuerierClient) error
forEachGeneratorFn func(ctx context.Context, client tempopb.MetricsGeneratorClient) error
replicationSetFn func(r ring.ReadRing) (ring.ReplicationSet, error)
)

// Querier handlers queries.
type Querier struct {
services.Service
Expand All @@ -83,11 +89,6 @@ type Querier struct {
subservicesWatcher *services.FailureWatcher
}

type responseFromGenerators struct {
addr string
response interface{}
}

// New makes a new Querier.
func New(
cfg Config,
Expand Down Expand Up @@ -322,11 +323,6 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
return resp, nil
}

type (
forEachFn func(ctx context.Context, client tempopb.QuerierClient) error
replicationSetFn func(r ring.ReadRing) (ring.ReplicationSet, error)
)

// forIngesterRings runs f, in parallel, for given ingesters
func (q *Querier) forIngesterRings(ctx context.Context, userID string, getReplicationSet replicationSetFn, f forEachFn) error {
if ctx.Err() != nil {
Expand Down Expand Up @@ -424,14 +420,10 @@ func forOneIngesterRing(ctx context.Context, replicationSet ring.ReplicationSet,
}

// forGivenGenerators runs f, in parallel, for given generators
func (q *Querier) forGivenGenerators(
ctx context.Context,
replicationSet ring.ReplicationSet,
f func(ctx context.Context, client tempopb.MetricsGeneratorClient) (interface{}, error),
) ([]responseFromGenerators, error) {
func (q *Querier) forGivenGenerators(ctx context.Context, replicationSet ring.ReplicationSet, f forEachGeneratorFn) error {
if ctx.Err() != nil {
_ = level.Debug(log.Logger).Log("foreGivenGenerators context error", "ctx.Err()", ctx.Err().Error())
return nil, ctx.Err()
return ctx.Err()
}

ctx, span := tracer.Start(ctx, "Querier.forGivenGenerators")
Expand All @@ -448,25 +440,25 @@ func (q *Querier) forGivenGenerators(
return nil, fmt.Errorf("failed to get client for %s: %w", generator.Addr, err)
}

resp, err := f(funcCtx, client.(tempopb.MetricsGeneratorClient))
err = f(funcCtx, client.(tempopb.MetricsGeneratorClient))
if err != nil {
return nil, fmt.Errorf("failed to execute f() for %s: %w", generator.Addr, err)
}

return responseFromGenerators{generator.Addr, resp}, nil
// we are returning the empty response here because response is collected by
// the collector inside forEachGeneratorFn
return nil, nil
}

results, err := replicationSet.Do(ctx, q.cfg.ExtraQueryDelay, doFunc)
// ignore response because it's nil, and we are using a collector inside forEachGeneratorFn to
// collect the actual response. we need to return nil here and ignore it
// because doFunc expects us to return a response
_, err := replicationSet.Do(ctx, q.cfg.ExtraQueryDelay, doFunc)
if err != nil {
return nil, fmt.Errorf("failed to get response from generators: %w", err)
}

responses := make([]responseFromGenerators, 0, len(results))
for _, result := range results {
responses = append(responses, result.(responseFromGenerators))
return fmt.Errorf("failed to get response from generators: %w", err)
}

return responses, nil
return nil
}

func (q *Querier) SearchRecent(ctx context.Context, req *tempopb.SearchRequest) (*tempopb.SearchResponse, error) {
Expand Down Expand Up @@ -739,23 +731,26 @@ func (q *Querier) SpanMetricsSummary(
if err != nil {
return nil, fmt.Errorf("error finding generators in Querier.SpanMetricsSummary: %w", err)
}
lookupResults, err := q.forGivenGenerators(
ctx,
replicationSet,
func(ctx context.Context, client tempopb.MetricsGeneratorClient) (interface{}, error) {
return client.GetMetrics(ctx, genReq)
},
)

var results []*tempopb.SpanMetricsResponse
mtx := sync.Mutex{}

forEach := func(ctx context.Context, client tempopb.MetricsGeneratorClient) error {
resp, err := client.GetMetrics(ctx, genReq)
if err != nil {
return err
}
// collect the results from the generators in the pool
mtx.Lock()
defer mtx.Unlock()
results = append(results, resp)
return nil
}
err = q.forGivenGenerators(ctx, replicationSet, forEach)
if err != nil {
return nil, fmt.Errorf("error querying generators in Querier.SpanMetricsSummary: %w", err)
}

// Assemble the results from the generators in the pool
results := make([]*tempopb.SpanMetricsResponse, 0, len(lookupResults))
for _, result := range lookupResults {
results = append(results, result.response.(*tempopb.SpanMetricsResponse))
}

// Combine the results
yyy := make(map[traceqlmetrics.MetricKeys]*traceqlmetrics.LatencyHistogram)
xxx := make(map[traceqlmetrics.MetricKeys]*tempopb.SpanMetricsSummary)
Expand Down
30 changes: 16 additions & 14 deletions modules/querier/querier_query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package querier
import (
"context"
"fmt"
"sync"
"time"

"github.com/go-kit/log/level"
Expand Down Expand Up @@ -30,26 +31,27 @@ func (q *Querier) queryRangeRecent(ctx context.Context, req *tempopb.QueryRangeR
if err != nil {
return nil, fmt.Errorf("error finding generators in Querier.queryRangeRecent: %w", err)
}
lookupResults, err := q.forGivenGenerators(
ctx,
replicationSet,
func(ctx context.Context, client tempopb.MetricsGeneratorClient) (interface{}, error) {
return client.QueryRange(ctx, req)
},
)
if err != nil {
_ = level.Error(log.Logger).Log("error querying generators in Querier.queryRangeRecent", "err", err)

return nil, fmt.Errorf("error querying generators in Querier.queryRangeRecent: %w", err)
}

c, err := traceql.QueryRangeCombinerFor(req, traceql.AggregateModeSum, false)
if err != nil {
return nil, err
}

for _, result := range lookupResults {
c.Combine(result.response.(*tempopb.QueryRangeResponse))
mtx := sync.Mutex{} // combiner doesn't lock, so take lock before calling Combine to make is safe
forEach := func(ctx context.Context, client tempopb.MetricsGeneratorClient) error {
resp, err := client.QueryRange(ctx, req)
if err != nil {
return err
}
mtx.Lock()
defer mtx.Unlock()
c.Combine(resp)
return nil
}
err = q.forGivenGenerators(ctx, replicationSet, forEach)
if err != nil {
_ = level.Error(log.Logger).Log("error querying generators in Querier.queryRangeRecent", "err", err)
return nil, fmt.Errorf("error querying generators in Querier.queryRangeRecent: %w", err)
}

return c.Response(), nil
Expand Down

0 comments on commit ce317a8

Please sign in to comment.