feat: Use context propagation to call the same ingester in GetChunksID as Query (#15186)

Co-authored-by: Ben Clive <[email protected]>
cyriltovena and benclive authored Dec 4, 2024
1 parent 532bdbc commit 70d9587
Showing 4 changed files with 267 additions and 42 deletions.
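When partition-ingesters are enabled, forAllIngesters now records every ingester it queries in a PartitionContext carried on the request context, and GetChunkIDs replays the request against that recorded set instead of resolving the ring a second time. This replaces the previous waitForAllResponses workaround for the race where Query and GetChunkIDs could land on different ingesters of the same partition around a flush. Below is a minimal sketch of the intended flow, using a hypothetical helper; the real call site lives in one of the other changed files:

```go
// Hypothetical wiring, built only from functions touched by this commit:
// NewPartitionContext, SelectLogs, and GetChunkIDs.
func queryThenFetchChunkIDs(ctx context.Context, q *IngesterQuerier, params logql.SelectLogParams,
	from, through model.Time, matchers ...*labels.Matcher) ([]string, error) {
	// One context for both calls: SelectLogs records the ingesters it used
	// via PartitionContext.AddClient, and GetChunkIDs replays against them.
	ctx = NewPartitionContext(ctx)

	if _, err := q.SelectLogs(ctx, params); err != nil {
		return nil, err
	}
	return q.GetChunkIDs(ctx, from, through, matchers...)
}
```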
137 changes: 108 additions & 29 deletions pkg/querier/ingester_querier.go
@@ -5,6 +5,7 @@ import (
"net/http"
"slices"
"strings"
"sync"
"time"

"github.com/go-kit/log"
@@ -82,10 +83,91 @@ func newIngesterQuerier(querierConfig Config, clientCfg client.Config, ring ring
return &iq, nil
}

+ type ctxKeyType string
+
+ const (
+ partitionCtxKey ctxKeyType = "partitionCtx"
+ )
+
+ type PartitionContext struct {
+ isPartitioned bool
+ ingestersUsed map[string]PartitionIngesterUsed
+ mtx sync.Mutex
+ }
+
+ type PartitionIngesterUsed struct {
+ client logproto.QuerierClient
+ addr string
+ }
+
+ func (p *PartitionContext) AddClient(client logproto.QuerierClient, addr string) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+ if !p.isPartitioned {
+ return
+ }
+ p.ingestersUsed[addr] = PartitionIngesterUsed{client: client, addr: addr}
+ }
+
+ func (p *PartitionContext) RemoveClient(addr string) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+ if !p.isPartitioned {
+ return
+ }
+ delete(p.ingestersUsed, addr)
+ }
+
+ func (p *PartitionContext) SetIsPartitioned(isPartitioned bool) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+ p.isPartitioned = isPartitioned
+ }
+
+ func (p *PartitionContext) IsPartitioned() bool {
+ return p.isPartitioned
+ }
+
+ func (p *PartitionContext) forQueriedIngesters(ctx context.Context, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+
+ ingestersUsed := make([]PartitionIngesterUsed, 0, len(p.ingestersUsed))
+ for _, ingester := range p.ingestersUsed {
+ ingestersUsed = append(ingestersUsed, ingester)
+ }
+
+ return concurrency.ForEachJobMergeResults(ctx, ingestersUsed, 0, func(ctx context.Context, job PartitionIngesterUsed) ([]responseFromIngesters, error) {
+ resp, err := f(ctx, job.client)
+ if err != nil {
+ return nil, err
+ }
+ return []responseFromIngesters{{addr: job.addr, response: resp}}, nil
+ })
+ }
+
+ // NewPartitionContext creates a new partition context
+ // This is used to track which ingesters were used in the query and reuse the same ingesters for consecutive queries
+ func NewPartitionContext(ctx context.Context) context.Context {
+ return context.WithValue(ctx, partitionCtxKey, &PartitionContext{
+ ingestersUsed: make(map[string]PartitionIngesterUsed),
+ })
+ }
+
+ func ExtractPartitionContext(ctx context.Context) *PartitionContext {
+ v, ok := ctx.Value(partitionCtxKey).(*PartitionContext)
+ if !ok {
+ return &PartitionContext{
+ ingestersUsed: make(map[string]PartitionIngesterUsed),
+ }
+ }
+ return v
+ }
+
// forAllIngesters runs f, in parallel, for all ingesters
- // waitForAllResponses param can be used to require results from all ingesters in the replication set. If this is set to false, the call will return as soon as we have a quorum by zone. Only valid for partition-ingesters.
- func (q *IngesterQuerier) forAllIngesters(ctx context.Context, waitForAllResponses bool, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
+ func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
if q.querierConfig.QueryPartitionIngesters {
+ ExtractPartitionContext(ctx).SetIsPartitioned(true)
tenantID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
@@ -99,7 +181,7 @@ func (q *IngesterQuerier) forAllIngesters(ctx context.Context, waitForAllRespons
if err != nil {
return nil, err
}
- return q.forGivenIngesterSets(ctx, waitForAllResponses, replicationSets, f)
+ return q.forGivenIngesterSets(ctx, replicationSets, f)
}

replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read)
@@ -111,19 +193,13 @@ func (q *IngesterQuerier) forAllIngesters(ctx context.Context, waitForAllRespons
}

// forGivenIngesterSets runs f, in parallel, for given ingester sets
- // waitForAllResponses param can be used to require results from all ingesters in all replication sets. If this is set to false, the call will return as soon as we have a quorum by zone.
- func (q *IngesterQuerier) forGivenIngesterSets(ctx context.Context, waitForAllResponses bool, replicationSet []ring.ReplicationSet, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
+ func (q *IngesterQuerier) forGivenIngesterSets(ctx context.Context, replicationSet []ring.ReplicationSet, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
// Enable minimize requests if we can, so we initially query a single ingester per replication set, as each replication-set is one partition.
// Ingesters must supply zone information for this to have an effect.
config := ring.DoUntilQuorumConfig{
- MinimizeRequests: !waitForAllResponses,
+ MinimizeRequests: true,
}
return concurrency.ForEachJobMergeResults[ring.ReplicationSet, responseFromIngesters](ctx, replicationSet, 0, func(ctx context.Context, set ring.ReplicationSet) ([]responseFromIngesters, error) {
- if waitForAllResponses {
- // Tell the ring we need to return all responses from all zones
- set.MaxErrors = 0
- set.MaxUnavailableZones = 0
- }
return q.forGivenIngesters(ctx, set, config, f)
})
}
@@ -135,17 +211,16 @@ func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet
if err != nil {
return responseFromIngesters{addr: ingester.Addr}, err
}

resp, err := f(ctx, client.(logproto.QuerierClient))
if err != nil {
return responseFromIngesters{addr: ingester.Addr}, err
}
-
+ ExtractPartitionContext(ctx).AddClient(client.(logproto.QuerierClient), ingester.Addr)
return responseFromIngesters{ingester.Addr, resp}, nil
- }, func(responseFromIngesters) {
- // Nothing to do
+ }, func(cleanup responseFromIngesters) {
+ ExtractPartitionContext(ctx).RemoveClient(cleanup.addr)
})
-
if err != nil {
return nil, err
}
@@ -157,7 +232,7 @@ func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet
}

func (q *IngesterQuerier) SelectLogs(ctx context.Context, params logql.SelectLogParams) ([]iter.EntryIterator, error) {
- resps, err := q.forAllIngesters(ctx, false, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
+ resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
stats.FromContext(ctx).AddIngesterReached(1)
return client.Query(ctx, params.QueryRequest)
})
@@ -173,7 +248,7 @@ func (q *IngesterQuerier) SelectLogs(ctx context.Context, params logql.SelectLog
}

func (q *IngesterQuerier) SelectSample(ctx context.Context, params logql.SelectSampleParams) ([]iter.SampleIterator, error) {
- resps, err := q.forAllIngesters(ctx, false, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
+ resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
stats.FromContext(ctx).AddIngesterReached(1)
return client.QuerySample(ctx, params.SampleQueryRequest)
})
@@ -189,7 +264,7 @@ func (q *IngesterQuerier) SelectSample(ctx context.Context, params logql.SelectS
}

func (q *IngesterQuerier) Label(ctx context.Context, req *logproto.LabelRequest) ([][]string, error) {
- resps, err := q.forAllIngesters(ctx, false, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
+ resps, err := q.forAllIngesters(ctx, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
return client.Label(ctx, req)
})
if err != nil {
@@ -205,7 +280,7 @@ func (q *IngesterQuerier) Label(ctx context.Context, req *logproto.LabelRequest)
}

func (q *IngesterQuerier) Tail(ctx context.Context, req *logproto.TailRequest) (map[string]logproto.Querier_TailClient, error) {
- resps, err := q.forAllIngesters(ctx, false, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
+ resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
return client.Tail(ctx, req)
})
if err != nil {
@@ -270,7 +345,7 @@ func (q *IngesterQuerier) TailDisconnectedIngesters(ctx context.Context, req *lo
}

func (q *IngesterQuerier) Series(ctx context.Context, req *logproto.SeriesRequest) ([][]logproto.SeriesIdentifier, error) {
- resps, err := q.forAllIngesters(ctx, false, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
+ resps, err := q.forAllIngesters(ctx, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
return client.Series(ctx, req)
})
if err != nil {
@@ -325,15 +400,22 @@ func (q *IngesterQuerier) TailersCount(ctx context.Context) ([]uint32, error) {
}

func (q *IngesterQuerier) GetChunkIDs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) {
- // We must wait for all responses when using partition-ingesters to avoid a race between Query and GetChunkIDs calls.
- // This occurs if call Query on an ingester after a recent flush then call GetChunkIDs on a different, unflushed ingester in the same partition.
- resps, err := q.forAllIngesters(ctx, q.querierConfig.QueryPartitionIngesters, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
+ ingesterQueryFn := q.forAllIngesters
+
+ partitionCtx := ExtractPartitionContext(ctx)
+ if partitionCtx.IsPartitioned() {
+ // We need to query the same ingesters as the previous query
+ ingesterQueryFn = partitionCtx.forQueriedIngesters
+ }
+
+ resps, err := ingesterQueryFn(ctx, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
return querierClient.GetChunkIDs(ctx, &logproto.GetChunkIDsRequest{
Matchers: convertMatchersToString(matchers),
Start: from.Time(),
End: through.Time(),
})
})

if err != nil {
return nil, err
}
@@ -347,14 +429,13 @@ func (q *IngesterQuerier) GetChunkIDs(ctx context.Context, from, through model.T
}

func (q *IngesterQuerier) Stats(ctx context.Context, _ string, from, through model.Time, matchers ...*labels.Matcher) (*index_stats.Stats, error) {
- resps, err := q.forAllIngesters(ctx, false, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
+ resps, err := q.forAllIngesters(ctx, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
return querierClient.GetStats(ctx, &logproto.IndexStatsRequest{
From: from,
Through: through,
Matchers: syntax.MatchersString(matchers),
})
})
-
if err != nil {
if isUnimplementedCallError(err) {
// Handle communication with older ingesters gracefully
@@ -378,7 +459,7 @@ func (q *IngesterQuerier) Volume(ctx context.Context, _ string, from, through mo
matcherString = syntax.MatchersString(matchers)
}

- resps, err := q.forAllIngesters(ctx, false, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
+ resps, err := q.forAllIngesters(ctx, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
return querierClient.GetVolume(ctx, &logproto.VolumeRequest{
From: from,
Through: through,
@@ -388,7 +469,6 @@ func (q *IngesterQuerier) Volume(ctx context.Context, _ string, from, through mo
AggregateBy: aggregateBy,
})
})
-
if err != nil {
if isUnimplementedCallError(err) {
// Handle communication with older ingesters gracefully
@@ -407,10 +487,9 @@ func (q *IngesterQuerier) Volume(ctx context.Context, _ string, from, through mo
}

func (q *IngesterQuerier) DetectedLabel(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.LabelToValuesResponse, error) {
- ingesterResponses, err := q.forAllIngesters(ctx, false, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
+ ingesterResponses, err := q.forAllIngesters(ctx, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
return client.GetDetectedLabels(ctx, req)
})
-
if err != nil {
level.Error(q.logger).Log("msg", "error getting detected labels", "err", err)
return nil, err
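For reference, the context-propagation pattern that PartitionContext relies on can be shown in isolation. Below is a minimal, runnable sketch with illustrative names (not Loki's): a typed context key, a mutable mutex-guarded value carried through the context, and a safe fallback when no value was injected. Note the sketch also takes the lock in the reader, since an unguarded read of shared state could race with a concurrent writer.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Unexported key type, so no other package can collide with this context key.
type ctxKey string

const trackerKey ctxKey = "ingesterTracker"

// tracker plays the role of PartitionContext: a mutable, mutex-guarded value
// carried through an otherwise immutable context.Context.
type tracker struct {
	mtx  sync.Mutex
	used map[string]struct{}
}

func withTracker(ctx context.Context) context.Context {
	return context.WithValue(ctx, trackerKey, &tracker{used: map[string]struct{}{}})
}

// fromContext falls back to a fresh, usable tracker when none was injected,
// mirroring ExtractPartitionContext's behavior for non-partitioned queries.
func fromContext(ctx context.Context) *tracker {
	if t, ok := ctx.Value(trackerKey).(*tracker); ok {
		return t
	}
	return &tracker{used: map[string]struct{}{}}
}

func (t *tracker) add(addr string) {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	t.used[addr] = struct{}{}
}

// addrs reads under the same lock that guards writes.
func (t *tracker) addrs() []string {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	out := make([]string, 0, len(t.used))
	for a := range t.used {
		out = append(out, a)
	}
	return out
}

func main() {
	ctx := withTracker(context.Background())
	fromContext(ctx).add("ingester-zone-a-0") // first query records its ingesters
	fromContext(ctx).add("ingester-zone-b-0")
	fmt.Println(fromContext(ctx).addrs()) // a follow-up call sees the same set
}
```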

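Inside forQueriedIngesters, the fan-out and merge is delegated to dskit's concurrency.ForEachJobMergeResults: one call per previously used ingester, with the per-ingester result slices concatenated. A rough sketch of that shape using golang.org/x/sync/errgroup (an assumed equivalent, not dskit's actual implementation):

```go
package fanout

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// forEachMerge runs fn for every job concurrently and concatenates the
// per-job result slices, failing fast on the first error.
func forEachMerge[J, R any](ctx context.Context, jobs []J, fn func(context.Context, J) ([]R, error)) ([]R, error) {
	g, ctx := errgroup.WithContext(ctx)
	results := make([][]R, len(jobs))
	for i, job := range jobs {
		i, job := i, job // capture loop variables (pre-Go 1.22 semantics)
		g.Go(func() error {
			rs, err := fn(ctx, job)
			if err != nil {
				return err
			}
			results[i] = rs // each slot is written by exactly one goroutine
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	var merged []R
	for _, rs := range results {
		merged = append(merged, rs...)
	}
	return merged, nil
}
```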