fix: Mitigate ingester race between Query & GetChunkIDs (#15178)
benclive authored Nov 28, 2024
1 parent f2c2a22 commit bd46e4c
Showing 2 changed files with 42 additions and 30 deletions.
55 changes: 31 additions & 24 deletions pkg/querier/ingester_querier.go
@@ -35,6 +35,10 @@ import (
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

var defaultQuorumConfig = ring.DoUntilQuorumConfig{
// Nothing here
}

type responseFromIngesters struct {
addr string
response interface{}
@@ -79,7 +83,8 @@ func newIngesterQuerier(querierConfig Config, clientCfg client.Config, ring ring
}

// forAllIngesters runs f, in parallel, for all ingesters
func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
// The waitForAllResponses param requires results from all ingesters in the replication set. If it is false, the call returns as soon as we have a quorum by zone. Only valid for partition-ingesters.
func (q *IngesterQuerier) forAllIngesters(ctx context.Context, waitForAllResponses bool, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
if q.querierConfig.QueryPartitionIngesters {
tenantID, err := user.ExtractOrgID(ctx)
if err != nil {
@@ -94,36 +99,36 @@ func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Co
if err != nil {
return nil, err
}
return q.forGivenIngesterSets(ctx, replicationSets, f)
return q.forGivenIngesterSets(ctx, waitForAllResponses, replicationSets, f)
}

replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read)
if err != nil {
return nil, err
}

return q.forGivenIngesters(ctx, replicationSet, defaultQuorumConfig(), f)
return q.forGivenIngesters(ctx, replicationSet, defaultQuorumConfig, f)
}

// forGivenIngesterSets runs f, in parallel, for given ingester sets
func (q *IngesterQuerier) forGivenIngesterSets(ctx context.Context, replicationSet []ring.ReplicationSet, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
// Enable minimize requests so we initially query a single ingester per replication set, as each replication-set is one partition.
// The waitForAllResponses param requires results from all ingesters in all replication sets. If it is false, the call returns as soon as we have a quorum by zone.
func (q *IngesterQuerier) forGivenIngesterSets(ctx context.Context, waitForAllResponses bool, replicationSet []ring.ReplicationSet, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
// Enable request minimization when we can, so we initially query a single ingester per replication set, as each replication set is one partition.
// Ingesters must supply zone information for this to have an effect.
config := ring.DoUntilQuorumConfig{
MinimizeRequests: true,
MinimizeRequests: !waitForAllResponses,
}
return concurrency.ForEachJobMergeResults[ring.ReplicationSet, responseFromIngesters](ctx, replicationSet, 0, func(ctx context.Context, set ring.ReplicationSet) ([]responseFromIngesters, error) {
if waitForAllResponses {
// Tell the ring we need to return all responses from all zones
set.MaxErrors = 0
set.MaxUnavailableZones = 0
}
return q.forGivenIngesters(ctx, set, config, f)
})
}

func defaultQuorumConfig() ring.DoUntilQuorumConfig {
return ring.DoUntilQuorumConfig{
// Nothing here
}
}

// forGivenIngesters runs f, in parallel, for given ingesters
// forGivenIngesters runs f, in parallel, for the given ingesters until a quorum of responses is received
func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet ring.ReplicationSet, quorumConfig ring.DoUntilQuorumConfig, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
results, err := ring.DoUntilQuorum(ctx, replicationSet, quorumConfig, func(ctx context.Context, ingester *ring.InstanceDesc) (responseFromIngesters, error) {
client, err := q.pool.GetClientFor(ingester.Addr)
@@ -152,7 +157,7 @@ func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet
}

func (q *IngesterQuerier) SelectLogs(ctx context.Context, params logql.SelectLogParams) ([]iter.EntryIterator, error) {
resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
resps, err := q.forAllIngesters(ctx, false, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
stats.FromContext(ctx).AddIngesterReached(1)
return client.Query(ctx, params.QueryRequest)
})
@@ -168,7 +173,7 @@ func (q *IngesterQuerier) SelectLogs(ctx context.Context, params logql.SelectLog
}

func (q *IngesterQuerier) SelectSample(ctx context.Context, params logql.SelectSampleParams) ([]iter.SampleIterator, error) {
resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
resps, err := q.forAllIngesters(ctx, false, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
stats.FromContext(ctx).AddIngesterReached(1)
return client.QuerySample(ctx, params.SampleQueryRequest)
})
@@ -184,7 +189,7 @@ func (q *IngesterQuerier) SelectSample(ctx context.Context, params logql.SelectS
}

func (q *IngesterQuerier) Label(ctx context.Context, req *logproto.LabelRequest) ([][]string, error) {
resps, err := q.forAllIngesters(ctx, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
resps, err := q.forAllIngesters(ctx, false, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
return client.Label(ctx, req)
})
if err != nil {
@@ -200,7 +205,7 @@ func (q *IngesterQuerier) Label(ctx context.Context, req *logproto.LabelRequest)
}

func (q *IngesterQuerier) Tail(ctx context.Context, req *logproto.TailRequest) (map[string]logproto.Querier_TailClient, error) {
resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
resps, err := q.forAllIngesters(ctx, false, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
return client.Tail(ctx, req)
})
if err != nil {
@@ -249,7 +254,7 @@ func (q *IngesterQuerier) TailDisconnectedIngesters(ctx context.Context, req *lo
}

// Instantiate a tail client for each ingester to (re)connect
reconnectClients, err := q.forGivenIngesters(ctx, ring.ReplicationSet{Instances: reconnectIngesters}, defaultQuorumConfig(), func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
reconnectClients, err := q.forGivenIngesters(ctx, ring.ReplicationSet{Instances: reconnectIngesters}, defaultQuorumConfig, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
return client.Tail(ctx, req)
})
if err != nil {
@@ -265,7 +270,7 @@ func (q *IngesterQuerier) TailDisconnectedIngesters(ctx context.Context, req *lo
}

func (q *IngesterQuerier) Series(ctx context.Context, req *logproto.SeriesRequest) ([][]logproto.SeriesIdentifier, error) {
resps, err := q.forAllIngesters(ctx, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
resps, err := q.forAllIngesters(ctx, false, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
return client.Series(ctx, req)
})
if err != nil {
@@ -297,7 +302,7 @@ func (q *IngesterQuerier) TailersCount(ctx context.Context) ([]uint32, error) {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found")
}

responses, err := q.forGivenIngesters(ctx, replicationSet, defaultQuorumConfig(), func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
responses, err := q.forGivenIngesters(ctx, replicationSet, defaultQuorumConfig, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
resp, err := querierClient.TailersCount(ctx, &logproto.TailersCountRequest{})
if err != nil {
return nil, err
@@ -320,7 +325,9 @@ func (q *IngesterQuerier) TailersCount(ctx context.Context) ([]uint32, error) {
}

func (q *IngesterQuerier) GetChunkIDs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) {
resps, err := q.forAllIngesters(ctx, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
// We must wait for all responses when using partition-ingesters to avoid a race between Query and GetChunkIDs calls.
// The race occurs if we call Query on an ingester after a recent flush and then call GetChunkIDs on a different, unflushed ingester in the same partition.
resps, err := q.forAllIngesters(ctx, q.querierConfig.QueryPartitionIngesters, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
return querierClient.GetChunkIDs(ctx, &logproto.GetChunkIDsRequest{
Matchers: convertMatchersToString(matchers),
Start: from.Time(),
@@ -340,7 +347,7 @@ func (q *IngesterQuerier) GetChunkIDs(ctx context.Context, from, through model.T
}

func (q *IngesterQuerier) Stats(ctx context.Context, _ string, from, through model.Time, matchers ...*labels.Matcher) (*index_stats.Stats, error) {
resps, err := q.forAllIngesters(ctx, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
resps, err := q.forAllIngesters(ctx, false, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
return querierClient.GetStats(ctx, &logproto.IndexStatsRequest{
From: from,
Through: through,
@@ -371,7 +378,7 @@ func (q *IngesterQuerier) Volume(ctx context.Context, _ string, from, through mo
matcherString = syntax.MatchersString(matchers)
}

resps, err := q.forAllIngesters(ctx, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
resps, err := q.forAllIngesters(ctx, false, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
return querierClient.GetVolume(ctx, &logproto.VolumeRequest{
From: from,
Through: through,
@@ -400,7 +407,7 @@ func (q *IngesterQuerier) Volume(ctx context.Context, _ string, from, through mo
}

func (q *IngesterQuerier) DetectedLabel(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.LabelToValuesResponse, error) {
ingesterResponses, err := q.forAllIngesters(ctx, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
ingesterResponses, err := q.forAllIngesters(ctx, false, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
return client.GetDetectedLabels(ctx, req)
})

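To make the behavioral change easier to follow, here is a minimal, self-contained Go sketch of the decision the patched forAllIngesters/forGivenIngesterSets path makes. The types (replicationSet, quorumConfig) and the planRequests helper are simplified stand-ins invented for this sketch, not the dskit ring API; only the idea mirrors the diff: request minimization is used when a zone quorum is enough, and the error/zone tolerances are zeroed when every ingester in the set must answer, as GetChunkIDs now requires with partition-ingesters.

```go
package main

import "fmt"

// replicationSet is a simplified stand-in for the ring's replication set:
// it tracks how many instance errors and unavailable zones a quorum-style
// call may tolerate. (Hypothetical type for this sketch only.)
type replicationSet struct {
	Instances           []string
	MaxErrors           int
	MaxUnavailableZones int
}

// quorumConfig mirrors the idea behind ring.DoUntilQuorumConfig in the diff:
// with MinimizeRequests set, the caller starts with the minimal number of
// requests needed for a zone-aware quorum and only fans out on failure.
type quorumConfig struct {
	MinimizeRequests bool
}

// planRequests shows the choice the patched code makes: accept a zone quorum
// (the default), or demand an answer from every ingester in the set.
func planRequests(set replicationSet, waitForAllResponses bool) (replicationSet, quorumConfig) {
	cfg := quorumConfig{MinimizeRequests: !waitForAllResponses}
	if waitForAllResponses {
		// Zero tolerated errors and zero tolerated unavailable zones means
		// the call only succeeds once every instance has responded.
		set.MaxErrors = 0
		set.MaxUnavailableZones = 0
	}
	return set, cfg
}

func main() {
	set := replicationSet{
		Instances:           []string{"ingester-a", "ingester-b", "ingester-c"},
		MaxErrors:           1,
		MaxUnavailableZones: 1,
	}

	quorumSet, quorumCfg := planRequests(set, false)
	allSet, allCfg := planRequests(set, true)

	fmt.Printf("quorum mode: tolerate %d errors, %d unavailable zones, minimize=%v\n",
		quorumSet.MaxErrors, quorumSet.MaxUnavailableZones, quorumCfg.MinimizeRequests)
	fmt.Printf("wait-for-all mode: tolerate %d errors, %d unavailable zones, minimize=%v\n",
		allSet.MaxErrors, allSet.MaxUnavailableZones, allCfg.MinimizeRequests)
}
```

Printing the two tolerance settings side by side shows the whole difference between the default quorum path and the wait-for-all path: everything else about the fan-out stays the same.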
17 changes: 11 additions & 6 deletions pkg/querier/ingester_querier_test.go
@@ -241,10 +241,11 @@ func TestIngesterQuerierFetchesResponsesFromPartitionIngesters(t *testing.T) {
}

tests := map[string]struct {
method string
testFn func(*IngesterQuerier) error
retVal interface{}
shards int
method string
testFn func(*IngesterQuerier) error
retVal interface{}
shards int
expectAllResponses bool
}{
"label": {
method: "Label",
@@ -268,7 +269,8 @@ func TestIngesterQuerierFetchesResponsesFromPartitionIngesters(t *testing.T) {
_, err := ingesterQuerier.GetChunkIDs(ctx, model.Time(0), model.Time(0))
return err
},
retVal: new(logproto.GetChunkIDsResponse),
retVal: new(logproto.GetChunkIDsResponse),
expectAllResponses: true,
},
"select_logs": {
method: "Query",
@@ -314,7 +316,7 @@ func TestIngesterQuerierFetchesResponsesFromPartitionIngesters(t *testing.T) {
select {
case <-ctx.Done():
// should not be cancelled by the tracker
require.NoError(t, ctx.Err())
require.NoErrorf(t, ctx.Err(), "tracker should not cancel ctx: %v", context.Cause(ctx))
default:
cnt.Add(1)
}
@@ -340,6 +342,9 @@ func TestIngesterQuerierFetchesResponsesFromPartitionIngesters(t *testing.T) {
testData.shards = partitions
}
expectedCalls := min(testData.shards, partitions)
if testData.expectAllResponses {
expectedCalls = expectedCalls * ingestersPerPartition
}
// Wait for responses: We expect one request per queried partition because we have request minimization enabled & ingesters are in multiple zones.
// If shuffle sharding is enabled, we expect one query per shard as we write to a subset of partitions.
require.Eventually(t, func() bool { return cnt.Load() >= int32(expectedCalls) }, time.Millisecond*100, time.Millisecond*1, "expected all ingesters to respond")
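The test's expected-call arithmetic can also be seen in isolation. The expectedCalls function below is a hypothetical helper that mirrors the counting in the updated test (min of shards and partitions, multiplied by the ingesters per partition when all responses are required); the concrete partition and zone counts are made up for illustration.

```go
package main

import "fmt"

// expectedCalls reproduces the test's counting logic in one place:
// with request minimization, one ingester per queried partition answers;
// when all responses are required, every ingester in every queried
// partition must answer.
func expectedCalls(shards, partitions, ingestersPerPartition int, expectAllResponses bool) int {
	if shards == 0 {
		// No shuffle sharding: every partition is queried.
		shards = partitions
	}
	calls := min(shards, partitions)
	if expectAllResponses {
		calls *= ingestersPerPartition
	}
	return calls
}

func main() {
	// Hypothetical setup: 3 partitions, 2 ingesters per partition (one per zone).
	fmt.Println(expectedCalls(0, 3, 2, false)) // quorum: 3 calls, one per partition
	fmt.Println(expectedCalls(0, 3, 2, true))  // wait-for-all (GetChunkIDs): 6 calls, every ingester
	fmt.Println(expectedCalls(2, 3, 2, false)) // shuffle sharding: 2 calls
}
```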
