diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index e30136c2b8637..31393d874a2f4 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1339,16 +1339,29 @@ func (t *Loki) initRuleEvaluator() (services.Service, error) { mode := t.Cfg.Ruler.Evaluation.Mode logger := log.With(util_log.Logger, "component", "ruler", "evaluation_mode", mode) + var svc services.Service switch mode { case ruler.EvalModeLocal: - var engine *logql.Engine + var deleteStore deletion.DeleteRequestsClient + deleteStore, err = t.deleteRequestsClient("rule-evaluator", t.Overrides) + if err != nil { + break + } - engine, err = t.createRulerQueryEngine(logger) + var engine *logql.Engine + engine, err = t.createRulerQueryEngine(logger, deleteStore) if err != nil { break } evaluator, err = ruler.NewLocalEvaluator(engine, logger) + + // The delete client needs to be stopped when the evaluator is stopped. + // We wrap the client on a IDLE service and call Stop on shutdown. + svc = services.NewIdleService(nil, func(_ error) error { + deleteStore.Stop() + return nil + }) case ruler.EvalModeRemote: qfClient, e := ruler.DialQueryFrontend(&t.Cfg.Ruler.Evaluation.QueryFrontend) if e != nil { @@ -1366,7 +1379,7 @@ func (t *Loki) initRuleEvaluator() (services.Service, error) { t.ruleEvaluator = ruler.NewEvaluatorWithJitter(evaluator, t.Cfg.Ruler.Evaluation.MaxJitter, fnv.New32a(), logger) - return nil, nil + return svc, nil } func (t *Loki) initMemberlistKV() (services.Service, error) { @@ -1912,12 +1925,7 @@ func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLi return deletion.NewPerTenantDeleteRequestsClient(client, limits), nil } -func (t *Loki) createRulerQueryEngine(logger log.Logger) (eng *logql.Engine, err error) { - deleteStore, err := t.deleteRequestsClient("rule-evaluator", t.Overrides) - if err != nil { - return nil, fmt.Errorf("could not create delete requests store: %w", err) - } - +func (t *Loki) createRulerQueryEngine(logger log.Logger, deleteStore deletion.DeleteRequestsClient) (eng *logql.Engine, err error) { q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, nil, logger) if err != nil { return nil, fmt.Errorf("could not create querier: %w", err)