Skip to content

Commit

Permalink
Remove Provider Deprecations in DBT (#44638)
Browse files Browse the repository at this point in the history
* Remove Provider Deprecations in DBT

* Fix Changelog typo

Co-authored-by: Wei Lee <[email protected]>

---------

Co-authored-by: Wei Lee <[email protected]>
  • Loading branch information
jason810496 and Lee-W authored Dec 6, 2024
1 parent 7717985 commit 666d80b
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 102 deletions.
11 changes: 11 additions & 0 deletions providers/src/airflow/providers/dbt/cloud/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@
Changelog
---------

main
.....

.. warning::
All deprecated classes, parameters and features have been removed from the DBT provider package.
The following breaking changes were introduced:

* Sensors
* Remove ``airflow.providers.dbt.cloud.sensors.dbt.DbtCloudJobRunAsyncSensor``. Use ``airflow.providers.dbt.cloud.sensors.dbt.DbtCloudJobRunSensor`` with ``deferrable`` set to ``True`` instead.
* Removed ``polling_interval`` parameter from ``DbtCloudJobRunSensor``. Use ``poke_interval`` instead.

3.11.2
......

Expand Down
36 changes: 2 additions & 34 deletions providers/src/airflow/providers/dbt/cloud/sensors/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@
from __future__ import annotations

import time
import warnings
from functools import cached_property
from typing import TYPE_CHECKING, Any

from deprecated import deprecated

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.exceptions import AirflowException
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunException, DbtCloudJobRunStatus
from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
from airflow.providers.dbt.cloud.utils.openlineage import generate_openlineage_events_from_dbt_cloud_run
Expand Down Expand Up @@ -62,17 +59,7 @@ def __init__(
) -> None:
if deferrable:
if "poke_interval" not in kwargs:
# TODO: Remove once deprecated
if "polling_interval" in kwargs:
kwargs["poke_interval"] = kwargs["polling_interval"]
warnings.warn(
"Argument `poll_interval` is deprecated and will be removed "
"in a future release. Please use `poke_interval` instead.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
else:
kwargs["poke_interval"] = 5
kwargs["poke_interval"] = 5

if "timeout" not in kwargs:
kwargs["timeout"] = 60 * 60 * 24 * 7
Expand Down Expand Up @@ -142,22 +129,3 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage:
"""Implement _on_complete because job_run needs to be triggered first in execute method."""
return generate_openlineage_events_from_dbt_cloud_run(operator=self, task_instance=task_instance)


@deprecated(
reason=(
"Class `DbtCloudJobRunAsyncSensor` is deprecated and will be removed in a future release. "
"Please use `DbtCloudJobRunSensor` and set `deferrable` attribute to `True` instead"
),
category=AirflowProviderDeprecationWarning,
)
class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
"""
This class is deprecated.
Please use :class:`airflow.providers.dbt.cloud.sensor.dbt.DbtCloudJobRunSensor`
with ``deferrable=True``.
"""

def __init__(self, **kwargs: Any) -> None:
super().__init__(deferrable=True, **kwargs)
69 changes: 1 addition & 68 deletions providers/tests/dbt/cloud/sensors/test_dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@

from airflow.exceptions import (
AirflowException,
AirflowProviderDeprecationWarning,
TaskDeferred,
)
from airflow.models.connection import Connection
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunException, DbtCloudJobRunStatus
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunAsyncSensor, DbtCloudJobRunSensor
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor
from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
from airflow.utils import db

Expand Down Expand Up @@ -168,69 +167,3 @@ def test_execute_complete_failure(self, mock_status, mock_message):
task.execute_complete(
context={}, event={"status": mock_status, "message": mock_message, "run_id": self.DBT_RUN_ID}
)


class TestDbtCloudJobRunAsyncSensor:
TASK_ID = "dbt_cloud_run_job"
CONN_ID = "dbt_cloud_default"
DBT_RUN_ID = 1234
TIMEOUT = 300

depcrecation_message = (
"Class `DbtCloudJobRunAsyncSensor` is deprecated and will be removed in a future release. "
"Please use `DbtCloudJobRunSensor` and set `deferrable` attribute to `True` instead"
)

@mock.patch("airflow.providers.dbt.cloud.sensors.dbt.DbtCloudHook")
def test_dbt_job_run_async_sensor(self, mock_hook):
"""Assert execute method defer for Dbt cloud job run status sensors"""

with pytest.warns(AirflowProviderDeprecationWarning, match=self.depcrecation_message):
task = DbtCloudJobRunAsyncSensor(
dbt_cloud_conn_id=self.CONN_ID,
task_id=self.TASK_ID,
run_id=self.DBT_RUN_ID,
timeout=self.TIMEOUT,
)
mock_hook.return_value.get_job_run_status.return_value = DbtCloudJobRunStatus.STARTING.value
with pytest.raises(TaskDeferred) as exc:
task.execute({})
assert isinstance(exc.value.trigger, DbtCloudRunJobTrigger), "Trigger is not a DbtCloudRunJobTrigger"

def test_dbt_job_run_async_sensor_execute_complete_success(self):
"""Assert execute_complete log success message when trigger fire with target status"""
with pytest.warns(AirflowProviderDeprecationWarning, match=self.depcrecation_message):
task = DbtCloudJobRunAsyncSensor(
dbt_cloud_conn_id=self.CONN_ID,
task_id=self.TASK_ID,
run_id=self.DBT_RUN_ID,
timeout=self.TIMEOUT,
)

msg = f"Job run {self.DBT_RUN_ID} has completed successfully."
with mock.patch.object(task.log, "info") as mock_log_info:
task.execute_complete(
context={}, event={"status": "success", "message": msg, "run_id": self.DBT_RUN_ID}
)
mock_log_info.assert_called_with(msg)

@pytest.mark.parametrize(
"mock_status, mock_message",
[
("cancelled", "Job run 1234 has been cancelled."),
("error", "Job run 1234 has failed."),
],
)
def test_dbt_job_run_async_sensor_execute_complete_failure(self, mock_status, mock_message):
"""Assert execute_complete method to raise exception on the cancelled and error status"""
with pytest.warns(AirflowProviderDeprecationWarning, match=self.depcrecation_message):
task = DbtCloudJobRunAsyncSensor(
dbt_cloud_conn_id=self.CONN_ID,
task_id=self.TASK_ID,
run_id=self.DBT_RUN_ID,
timeout=self.TIMEOUT,
)
with pytest.raises(AirflowException):
task.execute_complete(
context={}, event={"status": mock_status, "message": mock_message, "run_id": self.DBT_RUN_ID}
)

0 comments on commit 666d80b

Please sign in to comment.