chore(backport): Add querier fix into k231 #15252

Merged: 1 commit, Dec 4, 2024

137 changes: 108 additions & 29 deletions pkg/querier/ingester_querier.go
@@ -5,6 +5,7 @@ import (
"net/http"
"slices"
"strings"
"sync"
"time"

"github.com/go-kit/log"
@@ -82,10 +83,91 @@ func newIngesterQuerier(querierConfig Config, clientCfg client.Config, ring ring
return &iq, nil
}

type ctxKeyType string

const (
partitionCtxKey ctxKeyType = "partitionCtx"
)

type PartitionContext struct {
isPartitioned bool
ingestersUsed map[string]PartitionIngesterUsed
mtx sync.Mutex
}

type PartitionIngesterUsed struct {
client logproto.QuerierClient
addr string
}

func (p *PartitionContext) AddClient(client logproto.QuerierClient, addr string) {
p.mtx.Lock()
defer p.mtx.Unlock()
if !p.isPartitioned {
return
}
p.ingestersUsed[addr] = PartitionIngesterUsed{client: client, addr: addr}
}

func (p *PartitionContext) RemoveClient(addr string) {
p.mtx.Lock()
defer p.mtx.Unlock()
if !p.isPartitioned {
return
}
delete(p.ingestersUsed, addr)
}

func (p *PartitionContext) SetIsPartitioned(isPartitioned bool) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.isPartitioned = isPartitioned
}

func (p *PartitionContext) IsPartitioned() bool {
return p.isPartitioned
}

func (p *PartitionContext) forQueriedIngesters(ctx context.Context, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
p.mtx.Lock()
defer p.mtx.Unlock()

ingestersUsed := make([]PartitionIngesterUsed, 0, len(p.ingestersUsed))
for _, ingester := range p.ingestersUsed {
ingestersUsed = append(ingestersUsed, ingester)
}

return concurrency.ForEachJobMergeResults(ctx, ingestersUsed, 0, func(ctx context.Context, job PartitionIngesterUsed) ([]responseFromIngesters, error) {
resp, err := f(ctx, job.client)
if err != nil {
return nil, err
}
return []responseFromIngesters{{addr: job.addr, response: resp}}, nil
})
}

// NewPartitionContext creates a new partition context
// This is used to track which ingesters were used in the query and reuse the same ingesters for consecutive queries
func NewPartitionContext(ctx context.Context) context.Context {
return context.WithValue(ctx, partitionCtxKey, &PartitionContext{
ingestersUsed: make(map[string]PartitionIngesterUsed),
})
}

func ExtractPartitionContext(ctx context.Context) *PartitionContext {
v, ok := ctx.Value(partitionCtxKey).(*PartitionContext)
if !ok {
return &PartitionContext{
ingestersUsed: make(map[string]PartitionIngesterUsed),
}
}
return v
}
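
// Illustration (hypothetical, not part of this change): a minimal sketch of the
// mechanics the helpers above implement, assuming it lives in this package.
// exampleClient and the hard-coded address are placeholders for illustration only;
// in the real flow SetIsPartitioned and AddClient are called by the querier itself.
func examplePartitionContextFlow(ctx context.Context, exampleClient logproto.QuerierClient) error {
	// Wrap the request context once, before the first ingester query.
	ctx = NewPartitionContext(ctx)

	// While the context is marked as partitioned, every client actually used is recorded.
	pc := ExtractPartitionContext(ctx)
	pc.SetIsPartitioned(true)
	pc.AddClient(exampleClient, "ingester-zone-a-0")

	// A follow-up call can then replay its request against exactly the recorded
	// ingesters instead of resolving the ring again.
	_, err := pc.forQueriedIngesters(ctx, func(ctx context.Context, c logproto.QuerierClient) (interface{}, error) {
		return nil, nil // real code would call c.GetChunkIDs(...) or similar
	})
	return err
}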

// forAllIngesters runs f, in parallel, for all ingesters
// waitForAllResponses param can be used to require results from all ingesters in the replication set. If this is set to false, the call will return as soon as we have a quorum by zone. Only valid for partition-ingesters.
func (q *IngesterQuerier) forAllIngesters(ctx context.Context, waitForAllResponses bool, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
if q.querierConfig.QueryPartitionIngesters {
ExtractPartitionContext(ctx).SetIsPartitioned(true)
tenantID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
@@ -99,7 +181,7 @@ func (q *IngesterQuerier) forAllIngesters(ctx context.Context, waitForAllRespons
if err != nil {
return nil, err
}
return q.forGivenIngesterSets(ctx, waitForAllResponses, replicationSets, f)
return q.forGivenIngesterSets(ctx, replicationSets, f)
}

replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read)
@@ -111,19 +193,13 @@ func (q *IngesterQuerier) forAllIngesters(ctx context.Context, waitForAllRespons
}

// forGivenIngesterSets runs f, in parallel, for given ingester sets
// waitForAllResponses param can be used to require results from all ingesters in all replication sets. If this is set to false, the call will return as soon as we have a quorum by zone.
func (q *IngesterQuerier) forGivenIngesterSets(ctx context.Context, waitForAllResponses bool, replicationSet []ring.ReplicationSet, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
func (q *IngesterQuerier) forGivenIngesterSets(ctx context.Context, replicationSet []ring.ReplicationSet, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
// Enable minimize requests if we can, so we initially query a single ingester per replication set, as each replication-set is one partition.
// Ingesters must supply zone information for this to have an effect.
config := ring.DoUntilQuorumConfig{
MinimizeRequests: !waitForAllResponses,
MinimizeRequests: true,
}
return concurrency.ForEachJobMergeResults[ring.ReplicationSet, responseFromIngesters](ctx, replicationSet, 0, func(ctx context.Context, set ring.ReplicationSet) ([]responseFromIngesters, error) {
if waitForAllResponses {
// Tell the ring we need to return all responses from all zones
set.MaxErrors = 0
set.MaxUnavailableZones = 0
}
return q.forGivenIngesters(ctx, set, config, f)
})
}
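
// Illustration (hypothetical, not part of this change): a simplified, dependency-free
// sketch of the "minimize requests" idea referenced above, i.e. try a single ingester
// per replication set (one per partition) first and only fall back to the remaining
// instances on error. The real behaviour is provided by dskit's DoUntilQuorumConfig;
// queryMinimized is an invented helper, not the dskit implementation.
func queryMinimized(addrs []string, query func(addr string) error) error {
	var lastErr error
	for _, addr := range addrs {
		if err := query(addr); err != nil {
			lastErr = err
			continue // fall back to the next instance in the set
		}
		return nil // one successful response per set is enough
	}
	return lastErr
}
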
@@ -135,17 +211,16 @@ func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet
if err != nil {
return responseFromIngesters{addr: ingester.Addr}, err
}

resp, err := f(ctx, client.(logproto.QuerierClient))
if err != nil {
return responseFromIngesters{addr: ingester.Addr}, err
}

ExtractPartitionContext(ctx).AddClient(client.(logproto.QuerierClient), ingester.Addr)
return responseFromIngesters{ingester.Addr, resp}, nil
}, func(responseFromIngesters) {
// Nothing to do
}, func(cleanup responseFromIngesters) {
ExtractPartitionContext(ctx).RemoveClient(cleanup.addr)
})

if err != nil {
return nil, err
}
@@ -157,7 +232,7 @@ func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet
}

func (q *IngesterQuerier) SelectLogs(ctx context.Context, params logql.SelectLogParams) ([]iter.EntryIterator, error) {
resps, err := q.forAllIngesters(ctx, false, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
stats.FromContext(ctx).AddIngesterReached(1)
return client.Query(ctx, params.QueryRequest)
})
@@ -173,7 +248,7 @@ func (q *IngesterQuerier) SelectLogs(ctx context.Context, params logql.SelectLog
}

func (q *IngesterQuerier) SelectSample(ctx context.Context, params logql.SelectSampleParams) ([]iter.SampleIterator, error) {
resps, err := q.forAllIngesters(ctx, false, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
stats.FromContext(ctx).AddIngesterReached(1)
return client.QuerySample(ctx, params.SampleQueryRequest)
})
@@ -189,7 +264,7 @@ func (q *IngesterQuerier) SelectSample(ctx context.Context, params logql.SelectS
}

func (q *IngesterQuerier) Label(ctx context.Context, req *logproto.LabelRequest) ([][]string, error) {
resps, err := q.forAllIngesters(ctx, false, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
resps, err := q.forAllIngesters(ctx, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
return client.Label(ctx, req)
})
if err != nil {
@@ -205,7 +280,7 @@ func (q *IngesterQuerier) Label(ctx context.Context, req *logproto.LabelRequest)
}

func (q *IngesterQuerier) Tail(ctx context.Context, req *logproto.TailRequest) (map[string]logproto.Querier_TailClient, error) {
resps, err := q.forAllIngesters(ctx, false, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
return client.Tail(ctx, req)
})
if err != nil {
@@ -270,7 +345,7 @@ func (q *IngesterQuerier) TailDisconnectedIngesters(ctx context.Context, req *lo
}

func (q *IngesterQuerier) Series(ctx context.Context, req *logproto.SeriesRequest) ([][]logproto.SeriesIdentifier, error) {
resps, err := q.forAllIngesters(ctx, false, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
resps, err := q.forAllIngesters(ctx, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
return client.Series(ctx, req)
})
if err != nil {
@@ -325,15 +400,22 @@ func (q *IngesterQuerier) TailersCount(ctx context.Context) ([]uint32, error) {
}

func (q *IngesterQuerier) GetChunkIDs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) {
// We must wait for all responses when using partition-ingesters to avoid a race between Query and GetChunkIDs calls.
// This can occur if we call Query on an ingester after a recent flush and then call GetChunkIDs on a different, unflushed ingester in the same partition.
resps, err := q.forAllIngesters(ctx, q.querierConfig.QueryPartitionIngesters, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
ingesterQueryFn := q.forAllIngesters

partitionCtx := ExtractPartitionContext(ctx)
if partitionCtx.IsPartitioned() {
// We need to query the same ingesters as the previous query
ingesterQueryFn = partitionCtx.forQueriedIngesters
}

resps, err := ingesterQueryFn(ctx, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
return querierClient.GetChunkIDs(ctx, &logproto.GetChunkIDsRequest{
Matchers: convertMatchersToString(matchers),
Start: from.Time(),
End: through.Time(),
})
})

if err != nil {
return nil, err
}
@@ -347,14 +429,13 @@ func (q *IngesterQuerier) GetChunkIDs(ctx context.Context, from, through model.T
}
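
// Illustration (hypothetical, not part of this change): how a caller might tie a log
// query and GetChunkIDs together so that, with partition ingesters enabled, GetChunkIDs
// is answered by the same ingesters that served the log query, avoiding the flush race
// described above. exampleQueryThenChunkIDs and its arguments are placeholders; the
// real wiring lives in the querier that builds these requests.
func exampleQueryThenChunkIDs(ctx context.Context, q *IngesterQuerier, logParams logql.SelectLogParams, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) {
	// One partition context for the whole request lifecycle.
	ctx = NewPartitionContext(ctx)

	// The log query records which ingesters were used (when partition ingesters are enabled).
	if _, err := q.SelectLogs(ctx, logParams); err != nil {
		return nil, err
	}

	// GetChunkIDs sees IsPartitioned() == true and replays against the recorded ingesters.
	return q.GetChunkIDs(ctx, from, through, matchers...)
}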

func (q *IngesterQuerier) Stats(ctx context.Context, _ string, from, through model.Time, matchers ...*labels.Matcher) (*index_stats.Stats, error) {
resps, err := q.forAllIngesters(ctx, false, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
resps, err := q.forAllIngesters(ctx, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
return querierClient.GetStats(ctx, &logproto.IndexStatsRequest{
From: from,
Through: through,
Matchers: syntax.MatchersString(matchers),
})
})

if err != nil {
if isUnimplementedCallError(err) {
// Handle communication with older ingesters gracefully
@@ -378,7 +459,7 @@ func (q *IngesterQuerier) Volume(ctx context.Context, _ string, from, through mo
matcherString = syntax.MatchersString(matchers)
}

resps, err := q.forAllIngesters(ctx, false, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
resps, err := q.forAllIngesters(ctx, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
return querierClient.GetVolume(ctx, &logproto.VolumeRequest{
From: from,
Through: through,
@@ -388,7 +469,6 @@ func (q *IngesterQuerier) Volume(ctx context.Context, _ string, from, through mo
AggregateBy: aggregateBy,
})
})

if err != nil {
if isUnimplementedCallError(err) {
// Handle communication with older ingesters gracefully
@@ -407,10 +487,9 @@ func (q *IngesterQuerier) Volume(ctx context.Context, _ string, from, through mo
}

func (q *IngesterQuerier) DetectedLabel(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.LabelToValuesResponse, error) {
ingesterResponses, err := q.forAllIngesters(ctx, false, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
ingesterResponses, err := q.forAllIngesters(ctx, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
return client.GetDetectedLabels(ctx, req)
})

if err != nil {
level.Error(q.logger).Log("msg", "error getting detected labels", "err", err)
return nil, err