diff --git a/clients/pkg/promtail/discovery/consulagent/consul.go b/clients/pkg/promtail/discovery/consulagent/consul.go index 2a08498efea69..89e69dfe59eb6 100644 --- a/clients/pkg/promtail/discovery/consulagent/consul.go +++ b/clients/pkg/promtail/discovery/consulagent/consul.go @@ -316,7 +316,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { ticker := time.NewTicker(d.refreshInterval) // Watched services and their cancellation functions. - services := make(map[string]func()) + services := make(map[string]func(error)) for { select { @@ -340,7 +340,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) { // Watch the catalog for new services we would like to watch. This is called only // when we don't know yet the names of the services and need to ask Consul the // entire list of services. -func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup.Group, services map[string]func()) { +func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup.Group, services map[string]func(error)) { agent := d.client.Agent() level.Debug(d.logger).Log("msg", "Watching services", "tags", strings.Join(d.watchedTags, ",")) @@ -378,7 +378,7 @@ func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup. continue // We are already watching the service. } - wctx, cancel := context.WithCancel(ctx) + wctx, cancel := context.WithCancelCause(ctx) d.watchService(wctx, ch, name) services[name] = cancel } @@ -390,7 +390,7 @@ func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup. "msg", "removing service since consul no longer has a record of it", "name", name) // Call the watch cancellation function. - cancel() + cancel(errors.New("canceling service since consul no longer has a record of it")) delete(services, name) // Send clearing target group. diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index b44b134c8bdde..cdf05829c200e 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -823,7 +823,7 @@ func newBinOpStepEvaluator( var lse, rse StepEvaluator - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancelCause(ctx) g := errgroup.Group{} // We have two non-literal legs, @@ -832,7 +832,7 @@ func newBinOpStepEvaluator( var err error lse, err = evFactory.NewStepEvaluator(ctx, evFactory, expr.SampleExpr, q) if err != nil { - cancel() + cancel(fmt.Errorf("new step evaluator for left leg errored: %w", err)) } return err }) @@ -840,7 +840,7 @@ func newBinOpStepEvaluator( var err error rse, err = evFactory.NewStepEvaluator(ctx, evFactory, expr.RHS, q) if err != nil { - cancel() + cancel(fmt.Errorf("new step evaluator for right leg errored: %w", err)) } return err }) diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 302c1c42814b3..6996c29c65a6b 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/middleware" "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/promql/parser" @@ -474,7 +475,7 @@ func WrapQuerySpanAndTimeout(call string, limits Limits) middleware.Interface { timeoutCapture := func(id string) time.Duration { return limits.QueryTimeout(ctx, id) } timeout := util_validation.SmallestPositiveNonZeroDurationPerTenant(tenants, timeoutCapture) - newCtx, cancel := context.WithTimeout(ctx, timeout) + newCtx, cancel := context.WithTimeoutCause(ctx, timeout, errors.New("query timeout reached")) defer cancel() newReq := req.WithContext(newCtx) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 0cd101cd6eebf..9f31cb9b82a34 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -549,7 +549,7 @@ func (q *SingleTenantQuerier) Series(ctx context.Context, req *logproto.SeriesRe // Enforce the query timeout while querying backends queryTimeout := q.limits.QueryTimeout(ctx, userID) - ctx, cancel := context.WithDeadline(ctx, time.Now().Add(queryTimeout)) + ctx, cancel := context.WithDeadlineCause(ctx, time.Now().Add(queryTimeout), errors.New("query timeout reached")) defer cancel() return q.awaitSeries(ctx, req) diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index 7dfeb729e149a..040befd26de93 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -8,6 +8,7 @@ import ( "github.com/grafana/dskit/httpgrpc" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" @@ -102,8 +103,8 @@ func (h *splitByInterval) Process( maxSeries int, ) ([]queryrangebase.Response, error) { var responses []queryrangebase.Response - ctx, cancel := context.WithCancel(ctx) - defer cancel() + ctx, cancel := context.WithCancelCause(ctx) + defer cancel(errors.New("split by interval process canceled")) ch := h.Feed(ctx, input) diff --git a/pkg/querier/worker/frontend_processor.go b/pkg/querier/worker/frontend_processor.go index 1327a30ae3190..5f1b57a62df02 100644 --- a/pkg/querier/worker/frontend_processor.go +++ b/pkg/querier/worker/frontend_processor.go @@ -11,6 +11,7 @@ import ( "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/httpgrpc" "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" "google.golang.org/grpc" "github.com/grafana/loki/v3/pkg/lokifrontend/frontend/v1/frontendv1pb" @@ -83,8 +84,8 @@ func (fp *frontendProcessor) processQueriesOnSingleStream(ctx context.Context, c // process loops processing requests on an established stream. func (fp *frontendProcessor) process(c frontendv1pb.Frontend_ProcessClient) error { // Build a child context so we can cancel a query when the stream is closed. - ctx, cancel := context.WithCancel(c.Context()) - defer cancel() + ctx, cancel := context.WithCancelCause(c.Context()) + defer cancel(errors.New("frontend processor process finished")) for { request, err := c.Recv() diff --git a/pkg/querier/worker/processor_manager.go b/pkg/querier/worker/processor_manager.go index 74c9517f86c24..4179cacfc8a09 100644 --- a/pkg/querier/worker/processor_manager.go +++ b/pkg/querier/worker/processor_manager.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/pkg/errors" "go.uber.org/atomic" "google.golang.org/grpc" ) @@ -44,7 +45,7 @@ func newProcessorManager(ctx context.Context, p processor, conn *grpc.ClientConn func (pm *processorManager) stop() { // Notify the remote query-frontend or query-scheduler we're shutting down. // We use a new context to make sure it's not cancelled. - notifyCtx, cancel := context.WithTimeout(context.Background(), notifyShutdownTimeout) + notifyCtx, cancel := context.WithTimeoutCause(context.Background(), notifyShutdownTimeout, errors.New("notify shutdown timeout reached")) defer cancel() pm.p.notifyShutdown(notifyCtx, pm.conn, pm.address) diff --git a/pkg/querier/worker/scheduler_processor.go b/pkg/querier/worker/scheduler_processor.go index 97f6d8f4d1df9..56a235d813399 100644 --- a/pkg/querier/worker/scheduler_processor.go +++ b/pkg/querier/worker/scheduler_processor.go @@ -21,6 +21,7 @@ import ( "github.com/grafana/dskit/user" otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" "go.uber.org/atomic" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" @@ -89,7 +90,7 @@ func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Con // Run the querier loop (and so all the queries) in a dedicated context that we call the "execution context". // The execution context is cancelled once the workerCtx is cancelled AND there's no inflight query executing. execCtx, execCancel, inflightQuery := newExecutionContext(workerCtx, sp.log) - defer execCancel() + defer execCancel(errors.New("scheduler processor execution context canceled")) backoff := backoff.New(execCtx, processorBackoffConfig) for backoff.Ongoing() { @@ -121,8 +122,8 @@ func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Con // process loops processing requests on an established stream. func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_QuerierLoopClient, address string, inflightQuery *atomic.Bool, workerID string) error { // Build a child context so we can cancel a query when the stream is closed. - ctx, cancel := context.WithCancel(c.Context()) - defer cancel() + ctx, cancel := context.WithCancelCause(c.Context()) + defer cancel(errors.New("querier loop canceled")) for { start := time.Now() diff --git a/pkg/querier/worker/util.go b/pkg/querier/worker/util.go index 3ebb0029bfe76..8f9659e50f5ca 100644 --- a/pkg/querier/worker/util.go +++ b/pkg/querier/worker/util.go @@ -11,6 +11,7 @@ import ( "github.com/go-kit/log/level" "github.com/gogo/status" "github.com/grafana/dskit/httpgrpc" + "github.com/pkg/errors" "go.uber.org/atomic" "google.golang.org/grpc/codes" @@ -32,8 +33,8 @@ import ( // - The execution context is canceled when the worker context gets cancelled (ie. querier is shutting down) // and there's no inflight query execution. In case there's an inflight query, the execution context is canceled // once the inflight query terminates and the response has been sent. -func newExecutionContext(workerCtx context.Context, logger log.Logger) (execCtx context.Context, execCancel context.CancelFunc, inflightQuery *atomic.Bool) { - execCtx, execCancel = context.WithCancel(context.Background()) +func newExecutionContext(workerCtx context.Context, logger log.Logger) (execCtx context.Context, execCancel context.CancelCauseFunc, inflightQuery *atomic.Bool) { + execCtx, execCancel = context.WithCancelCause(context.Background()) inflightQuery = atomic.NewBool(false) go func() { @@ -76,7 +77,7 @@ func newExecutionContext(workerCtx context.Context, logger log.Logger) (execCtx } level.Debug(logger).Log("msg", "querier worker context has been canceled and there's no inflight query, canceling the execution context too") - execCancel() + execCancel(errors.New("querier worker context has been canceled and there's no inflight query")) case <-execCtx.Done(): // Nothing to do. The execution context has been explicitly canceled. }