diff --git a/superset/commands/report/alert.py b/superset/commands/report/alert.py index ea45853b29a5f..d713c45811021 100644 --- a/superset/commands/report/alert.py +++ b/superset/commands/report/alert.py @@ -20,6 +20,7 @@ from operator import eq, ge, gt, le, lt, ne from timeit import default_timer from typing import Any +from uuid import UUID import numpy as np import pandas as pd @@ -40,6 +41,7 @@ from superset.tasks.utils import get_executor from superset.utils import json from superset.utils.core import override_user +from superset.utils.decorators import logs_context from superset.utils.retries import retry_call logger = logging.getLogger(__name__) @@ -52,8 +54,9 @@ class AlertCommand(BaseCommand): - def __init__(self, report_schedule: ReportSchedule): + def __init__(self, report_schedule: ReportSchedule, execution_id: UUID): self._report_schedule = report_schedule + self._execution_id = execution_id self._result: float | None = None def run(self) -> bool: @@ -135,6 +138,13 @@ def _is_validator_operator(self) -> bool: self._report_schedule.validator_type == ReportScheduleValidatorType.OPERATOR ) + def _get_alert_metadata_from_object(self) -> dict[str, Any]: + return { + "report_schedule_id": self._report_schedule.id, + "execution_id": self._execution_id, + } + + @logs_context(context_func=_get_alert_metadata_from_object) def _execute_query(self) -> pd.DataFrame: """ Executes the actual alert SQL query template @@ -152,6 +162,13 @@ def _execute_query(self) -> pd.DataFrame: rendered_sql, ALERT_SQL_LIMIT ) + if app.config["MUTATE_ALERT_QUERY"]: + limited_rendered_sql = ( + self._report_schedule.database.mutate_sql_based_on_config( + limited_rendered_sql + ) + ) + executor, username = get_executor( # pylint: disable=unused-variable executor_types=app.config["ALERT_REPORTS_EXECUTE_AS"], model=self._report_schedule, diff --git a/superset/commands/report/execute.py b/superset/commands/report/execute.py index 9293e967aa504..54a2890a96f91 100644 --- a/superset/commands/report/execute.py +++ b/superset/commands/report/execute.py @@ -698,7 +698,7 @@ def next(self) -> None: try: # If it's an alert check if the alert is triggered if self._report_schedule.type == ReportScheduleType.ALERT: - if not AlertCommand(self._report_schedule).run(): + if not AlertCommand(self._report_schedule, self._execution_id).run(): self.update_report_schedule_and_log(ReportState.NOOP) return self.send() @@ -782,7 +782,7 @@ def next(self) -> None: return self.update_report_schedule_and_log(ReportState.WORKING) try: - if not AlertCommand(self._report_schedule).run(): + if not AlertCommand(self._report_schedule, self._execution_id).run(): self.update_report_schedule_and_log(ReportState.NOOP) return except Exception as ex: diff --git a/superset/config.py b/superset/config.py index a77ab15f2048e..fa55318f69538 100644 --- a/superset/config.py +++ b/superset/config.py @@ -1380,6 +1380,10 @@ def SQL_QUERY_MUTATOR( # pylint: disable=invalid-name,unused-argument # noqa: MUTATE_AFTER_SPLIT = False +# Boolean config that determines if alert SQL queries should also be mutated or not. +MUTATE_ALERT_QUERY = False + + # This allows for a user to add header data to any outgoing emails. For example, # if you need to include metadata in the header or you want to change the specifications # of the email title, header, or sender. diff --git a/tests/integration_tests/reports/alert_tests.py b/tests/integration_tests/reports/alert_tests.py index b4dfdabd1a4ca..16ce8f3fed34b 100644 --- a/tests/integration_tests/reports/alert_tests.py +++ b/tests/integration_tests/reports/alert_tests.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. # pylint: disable=invalid-name, unused-argument, import-outside-toplevel +import uuid from contextlib import nullcontext, suppress from typing import Optional, Union @@ -84,7 +85,7 @@ def test_execute_query_as_report_executor( database=get_example_database(), validator_config_json='{"op": "==", "threshold": 1}', ) - command = AlertCommand(report_schedule=report_schedule) + command = AlertCommand(report_schedule=report_schedule, execution_id=uuid.uuid4()) override_user_mock = mocker.patch("superset.commands.report.alert.override_user") cm = ( pytest.raises(type(expected_result)) @@ -98,6 +99,83 @@ def test_execute_query_as_report_executor( app.config["ALERT_REPORTS_EXECUTE_AS"] = original_config +def test_execute_query_mutate_query_enabled( + mocker: MockerFixture, + app_context: AppContext, + get_user, +) -> None: + from superset.commands.report.alert import AlertCommand + from superset.reports.models import ReportSchedule + + default_alert_mutate_ff = app.config["MUTATE_ALERT_QUERY"] + + app.config["MUTATE_ALERT_QUERY"] = True + mocker.patch("superset.commands.report.alert.override_user") + mock_df = mocker.MagicMock(spec=pd.DataFrame) + mock_df.empty = True + mock_database = get_example_database() + mock_get_df = mocker.patch.object(mock_database, "get_df", return_value=mock_df) + mock_limited_sql = mocker.patch.object(mock_database, "apply_limit_to_sql") + mock_mutate_call = mocker.patch.object(mock_database, "mutate_sql_based_on_config") + + report_schedule = ReportSchedule( + created_by=get_user("admin"), + owners=[get_user("admin")], + type=ReportScheduleType.ALERT, + description="description", + crontab="0 9 * * *", + creation_method=ReportCreationMethod.ALERTS_REPORTS, + sql="SELECT 1", + grace_period=14400, + working_timeout=3600, + database=mock_database, + validator_config_json='{"op": "==", "threshold": 1}', + ) + AlertCommand(report_schedule=report_schedule, execution_id=uuid.uuid4()).run() + + mock_mutate_call.assert_called_once_with(mock_limited_sql.return_value) + mock_get_df.assert_called_once_with(sql=mock_mutate_call.return_value) + + app.config["MUTATE_ALERT_QUERY"] = default_alert_mutate_ff + + +def test_execute_query_mutate_query_disabled( + mocker: MockerFixture, + app_context: AppContext, + get_user, +) -> None: + from superset.commands.report.alert import AlertCommand + from superset.reports.models import ReportSchedule + + default_alert_mutate_ff = app.config["MUTATE_ALERT_QUERY"] + + app.config["MUTATE_ALERT_QUERY"] = False + mocker.patch("superset.commands.report.alert.override_user") + mock_database = mocker.MagicMock() + + report_schedule = ReportSchedule( + created_by=get_user("admin"), + owners=[get_user("admin")], + type=ReportScheduleType.ALERT, + description="description", + crontab="0 9 * * *", + creation_method=ReportCreationMethod.ALERTS_REPORTS, + sql="SELECT 1", + grace_period=14400, + working_timeout=3600, + database=mock_database, + validator_config_json='{"op": "==", "threshold": 1}', + ) + AlertCommand(report_schedule=report_schedule, execution_id=uuid.uuid4()).run() + + mock_database.mutate_sql_based_on_config.assert_not_called() + mock_database.get_df.assert_called_once_with( + sql=mock_database.apply_limit_to_sql.return_value + ) + + app.config["MUTATE_ALERT_QUERY"] = default_alert_mutate_ff + + def test_execute_query_succeeded_no_retry( mocker: MockerFixture, app_context: None ) -> None: @@ -108,7 +186,7 @@ def test_execute_query_succeeded_no_retry( side_effect=lambda: pd.DataFrame([{"sample_col": 0}]), ) - command = AlertCommand(report_schedule=mocker.Mock()) + command = AlertCommand(report_schedule=mocker.Mock(), execution_id=uuid.uuid4()) command.validate() @@ -140,7 +218,7 @@ def _mocked_execute_query() -> pd.DataFrame: execute_query_mock.side_effect = _mocked_execute_query execute_query_mock.__name__ = "mocked_execute_query" - command = AlertCommand(report_schedule=mocker.Mock()) + command = AlertCommand(report_schedule=mocker.Mock(), execution_id=uuid.uuid4()) command.validate() @@ -162,7 +240,7 @@ def _mocked_execute_query() -> None: execute_query_mock.side_effect = _mocked_execute_query execute_query_mock.__name__ = "mocked_execute_query" - command = AlertCommand(report_schedule=mocker.Mock()) + command = AlertCommand(report_schedule=mocker.Mock(), execution_id=uuid.uuid4()) with suppress(AlertQueryTimeout): command.validate() @@ -184,9 +262,42 @@ def _mocked_execute_query() -> None: execute_query_mock.side_effect = _mocked_execute_query execute_query_mock.__name__ = "mocked_execute_query" - command = AlertCommand(report_schedule=mocker.Mock()) + command = AlertCommand(report_schedule=mocker.Mock(), execution_id=uuid.uuid4()) with suppress(AlertQueryError): command.validate() # Should match the value defined in superset_test_config.py assert execute_query_mock.call_count == 3 + + +def test_get_alert_metadata_from_object( + mocker: MockerFixture, + app_context: AppContext, + get_user, +) -> None: + from superset.commands.report.alert import AlertCommand + from superset.reports.models import ReportSchedule + + app.config["ALERT_REPORTS_EXECUTE_AS"] = [ExecutorType.OWNER] + + mock_database = mocker.MagicMock() + mock_exec_id = uuid.uuid4() + report_schedule = ReportSchedule( + created_by=get_user("admin"), + owners=[get_user("admin")], + type=ReportScheduleType.ALERT, + description="description", + crontab="0 9 * * *", + creation_method=ReportCreationMethod.ALERTS_REPORTS, + sql="SELECT 1", + grace_period=14400, + working_timeout=3600, + database=mock_database, + validator_config_json='{"op": "==", "threshold": 1}', + ) + + cm = AlertCommand(report_schedule=report_schedule, execution_id=mock_exec_id) + assert cm._get_alert_metadata_from_object() == { + "report_schedule_id": report_schedule.id, + "execution_id": mock_exec_id, + }