Skip to content

Commit

Permalink
fix: Shut down delete client on local rule-evaluator (grafana#15345)
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts authored Dec 11, 2024
1 parent 2e07c40 commit 42469cc
Showing 1 changed file with 17 additions and 9 deletions.
26 changes: 17 additions & 9 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1339,16 +1339,29 @@ func (t *Loki) initRuleEvaluator() (services.Service, error) {
mode := t.Cfg.Ruler.Evaluation.Mode
logger := log.With(util_log.Logger, "component", "ruler", "evaluation_mode", mode)

var svc services.Service
switch mode {
case ruler.EvalModeLocal:
var engine *logql.Engine
var deleteStore deletion.DeleteRequestsClient
deleteStore, err = t.deleteRequestsClient("rule-evaluator", t.Overrides)
if err != nil {
break
}

engine, err = t.createRulerQueryEngine(logger)
var engine *logql.Engine
engine, err = t.createRulerQueryEngine(logger, deleteStore)
if err != nil {
break
}

evaluator, err = ruler.NewLocalEvaluator(engine, logger)

// The delete client needs to be stopped when the evaluator is stopped.
// We wrap the client on a IDLE service and call Stop on shutdown.
svc = services.NewIdleService(nil, func(_ error) error {
deleteStore.Stop()
return nil
})
case ruler.EvalModeRemote:
qfClient, e := ruler.DialQueryFrontend(&t.Cfg.Ruler.Evaluation.QueryFrontend)
if e != nil {
Expand All @@ -1366,7 +1379,7 @@ func (t *Loki) initRuleEvaluator() (services.Service, error) {

t.ruleEvaluator = ruler.NewEvaluatorWithJitter(evaluator, t.Cfg.Ruler.Evaluation.MaxJitter, fnv.New32a(), logger)

return nil, nil
return svc, nil
}

func (t *Loki) initMemberlistKV() (services.Service, error) {
Expand Down Expand Up @@ -1912,12 +1925,7 @@ func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLi
return deletion.NewPerTenantDeleteRequestsClient(client, limits), nil
}

func (t *Loki) createRulerQueryEngine(logger log.Logger) (eng *logql.Engine, err error) {
deleteStore, err := t.deleteRequestsClient("rule-evaluator", t.Overrides)
if err != nil {
return nil, fmt.Errorf("could not create delete requests store: %w", err)
}

func (t *Loki) createRulerQueryEngine(logger log.Logger, deleteStore deletion.DeleteRequestsClient) (eng *logql.Engine, err error) {
q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, nil, logger)
if err != nil {
return nil, fmt.Errorf("could not create querier: %w", err)
Expand Down

0 comments on commit 42469cc

Please sign in to comment.