Skip to content

Commit

Permalink
feat: Mutate SQL query executed by alerts (apache#31840)
Browse files Browse the repository at this point in the history
  • Loading branch information
Vitor-Avila authored Jan 15, 2025
1 parent d4bd20f commit 754ccd0
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 8 deletions.
19 changes: 18 additions & 1 deletion superset/commands/report/alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from operator import eq, ge, gt, le, lt, ne
from timeit import default_timer
from typing import Any
from uuid import UUID

import numpy as np
import pandas as pd
Expand All @@ -40,6 +41,7 @@
from superset.tasks.utils import get_executor
from superset.utils import json
from superset.utils.core import override_user
from superset.utils.decorators import logs_context
from superset.utils.retries import retry_call

logger = logging.getLogger(__name__)
Expand All @@ -52,8 +54,9 @@


class AlertCommand(BaseCommand):
def __init__(self, report_schedule: ReportSchedule):
def __init__(self, report_schedule: ReportSchedule, execution_id: UUID):
self._report_schedule = report_schedule
self._execution_id = execution_id
self._result: float | None = None

def run(self) -> bool:
Expand Down Expand Up @@ -135,6 +138,13 @@ def _is_validator_operator(self) -> bool:
self._report_schedule.validator_type == ReportScheduleValidatorType.OPERATOR
)

def _get_alert_metadata_from_object(self) -> dict[str, Any]:
return {
"report_schedule_id": self._report_schedule.id,
"execution_id": self._execution_id,
}

@logs_context(context_func=_get_alert_metadata_from_object)
def _execute_query(self) -> pd.DataFrame:
"""
Executes the actual alert SQL query template
Expand All @@ -152,6 +162,13 @@ def _execute_query(self) -> pd.DataFrame:
rendered_sql, ALERT_SQL_LIMIT
)

if app.config["MUTATE_ALERT_QUERY"]:
limited_rendered_sql = (
self._report_schedule.database.mutate_sql_based_on_config(
limited_rendered_sql
)
)

executor, username = get_executor( # pylint: disable=unused-variable
executor_types=app.config["ALERT_REPORTS_EXECUTE_AS"],
model=self._report_schedule,
Expand Down
4 changes: 2 additions & 2 deletions superset/commands/report/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ def next(self) -> None:
try:
# If it's an alert check if the alert is triggered
if self._report_schedule.type == ReportScheduleType.ALERT:
if not AlertCommand(self._report_schedule).run():
if not AlertCommand(self._report_schedule, self._execution_id).run():
self.update_report_schedule_and_log(ReportState.NOOP)
return
self.send()
Expand Down Expand Up @@ -782,7 +782,7 @@ def next(self) -> None:
return
self.update_report_schedule_and_log(ReportState.WORKING)
try:
if not AlertCommand(self._report_schedule).run():
if not AlertCommand(self._report_schedule, self._execution_id).run():
self.update_report_schedule_and_log(ReportState.NOOP)
return
except Exception as ex:
Expand Down
4 changes: 4 additions & 0 deletions superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1380,6 +1380,10 @@ def SQL_QUERY_MUTATOR( # pylint: disable=invalid-name,unused-argument # noqa:
MUTATE_AFTER_SPLIT = False


# Boolean config that determines if alert SQL queries should also be mutated or not.
MUTATE_ALERT_QUERY = False


# This allows for a user to add header data to any outgoing emails. For example,
# if you need to include metadata in the header or you want to change the specifications
# of the email title, header, or sender.
Expand Down
121 changes: 116 additions & 5 deletions tests/integration_tests/reports/alert_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
# pylint: disable=invalid-name, unused-argument, import-outside-toplevel
import uuid
from contextlib import nullcontext, suppress
from typing import Optional, Union

Expand Down Expand Up @@ -84,7 +85,7 @@ def test_execute_query_as_report_executor(
database=get_example_database(),
validator_config_json='{"op": "==", "threshold": 1}',
)
command = AlertCommand(report_schedule=report_schedule)
command = AlertCommand(report_schedule=report_schedule, execution_id=uuid.uuid4())
override_user_mock = mocker.patch("superset.commands.report.alert.override_user")
cm = (
pytest.raises(type(expected_result))
Expand All @@ -98,6 +99,83 @@ def test_execute_query_as_report_executor(
app.config["ALERT_REPORTS_EXECUTE_AS"] = original_config


def test_execute_query_mutate_query_enabled(
mocker: MockerFixture,
app_context: AppContext,
get_user,
) -> None:
from superset.commands.report.alert import AlertCommand
from superset.reports.models import ReportSchedule

default_alert_mutate_ff = app.config["MUTATE_ALERT_QUERY"]

app.config["MUTATE_ALERT_QUERY"] = True
mocker.patch("superset.commands.report.alert.override_user")
mock_df = mocker.MagicMock(spec=pd.DataFrame)
mock_df.empty = True
mock_database = get_example_database()
mock_get_df = mocker.patch.object(mock_database, "get_df", return_value=mock_df)
mock_limited_sql = mocker.patch.object(mock_database, "apply_limit_to_sql")
mock_mutate_call = mocker.patch.object(mock_database, "mutate_sql_based_on_config")

report_schedule = ReportSchedule(
created_by=get_user("admin"),
owners=[get_user("admin")],
type=ReportScheduleType.ALERT,
description="description",
crontab="0 9 * * *",
creation_method=ReportCreationMethod.ALERTS_REPORTS,
sql="SELECT 1",
grace_period=14400,
working_timeout=3600,
database=mock_database,
validator_config_json='{"op": "==", "threshold": 1}',
)
AlertCommand(report_schedule=report_schedule, execution_id=uuid.uuid4()).run()

mock_mutate_call.assert_called_once_with(mock_limited_sql.return_value)
mock_get_df.assert_called_once_with(sql=mock_mutate_call.return_value)

app.config["MUTATE_ALERT_QUERY"] = default_alert_mutate_ff


def test_execute_query_mutate_query_disabled(
mocker: MockerFixture,
app_context: AppContext,
get_user,
) -> None:
from superset.commands.report.alert import AlertCommand
from superset.reports.models import ReportSchedule

default_alert_mutate_ff = app.config["MUTATE_ALERT_QUERY"]

app.config["MUTATE_ALERT_QUERY"] = False
mocker.patch("superset.commands.report.alert.override_user")
mock_database = mocker.MagicMock()

report_schedule = ReportSchedule(
created_by=get_user("admin"),
owners=[get_user("admin")],
type=ReportScheduleType.ALERT,
description="description",
crontab="0 9 * * *",
creation_method=ReportCreationMethod.ALERTS_REPORTS,
sql="SELECT 1",
grace_period=14400,
working_timeout=3600,
database=mock_database,
validator_config_json='{"op": "==", "threshold": 1}',
)
AlertCommand(report_schedule=report_schedule, execution_id=uuid.uuid4()).run()

mock_database.mutate_sql_based_on_config.assert_not_called()
mock_database.get_df.assert_called_once_with(
sql=mock_database.apply_limit_to_sql.return_value
)

app.config["MUTATE_ALERT_QUERY"] = default_alert_mutate_ff


def test_execute_query_succeeded_no_retry(
mocker: MockerFixture, app_context: None
) -> None:
Expand All @@ -108,7 +186,7 @@ def test_execute_query_succeeded_no_retry(
side_effect=lambda: pd.DataFrame([{"sample_col": 0}]),
)

command = AlertCommand(report_schedule=mocker.Mock())
command = AlertCommand(report_schedule=mocker.Mock(), execution_id=uuid.uuid4())

command.validate()

Expand Down Expand Up @@ -140,7 +218,7 @@ def _mocked_execute_query() -> pd.DataFrame:
execute_query_mock.side_effect = _mocked_execute_query
execute_query_mock.__name__ = "mocked_execute_query"

command = AlertCommand(report_schedule=mocker.Mock())
command = AlertCommand(report_schedule=mocker.Mock(), execution_id=uuid.uuid4())

command.validate()

Expand All @@ -162,7 +240,7 @@ def _mocked_execute_query() -> None:
execute_query_mock.side_effect = _mocked_execute_query
execute_query_mock.__name__ = "mocked_execute_query"

command = AlertCommand(report_schedule=mocker.Mock())
command = AlertCommand(report_schedule=mocker.Mock(), execution_id=uuid.uuid4())

with suppress(AlertQueryTimeout):
command.validate()
Expand All @@ -184,9 +262,42 @@ def _mocked_execute_query() -> None:
execute_query_mock.side_effect = _mocked_execute_query
execute_query_mock.__name__ = "mocked_execute_query"

command = AlertCommand(report_schedule=mocker.Mock())
command = AlertCommand(report_schedule=mocker.Mock(), execution_id=uuid.uuid4())

with suppress(AlertQueryError):
command.validate()
# Should match the value defined in superset_test_config.py
assert execute_query_mock.call_count == 3


def test_get_alert_metadata_from_object(
mocker: MockerFixture,
app_context: AppContext,
get_user,
) -> None:
from superset.commands.report.alert import AlertCommand
from superset.reports.models import ReportSchedule

app.config["ALERT_REPORTS_EXECUTE_AS"] = [ExecutorType.OWNER]

mock_database = mocker.MagicMock()
mock_exec_id = uuid.uuid4()
report_schedule = ReportSchedule(
created_by=get_user("admin"),
owners=[get_user("admin")],
type=ReportScheduleType.ALERT,
description="description",
crontab="0 9 * * *",
creation_method=ReportCreationMethod.ALERTS_REPORTS,
sql="SELECT 1",
grace_period=14400,
working_timeout=3600,
database=mock_database,
validator_config_json='{"op": "==", "threshold": 1}',
)

cm = AlertCommand(report_schedule=report_schedule, execution_id=mock_exec_id)
assert cm._get_alert_metadata_from_object() == {
"report_schedule_id": report_schedule.id,
"execution_id": mock_exec_id,
}

0 comments on commit 754ccd0

Please sign in to comment.