diff --git a/.github/workflows/airflow-plugin.yml b/.github/workflows/airflow-plugin.yml index 21fd547114872a..a9be56a69560ca 100644 --- a/.github/workflows/airflow-plugin.yml +++ b/.github/workflows/airflow-plugin.yml @@ -51,6 +51,9 @@ jobs: - python-version: "3.10" extra_pip_requirements: 'apache-airflow==2.8.1 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt' extra_pip_extras: plugin-v2 + - python-version: "3.10" + extra_pip_requirements: 'apache-airflow==2.9.0 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.0/constraints-3.10.txt' + extra_pip_extras: plugin-v2 fail-fast: false steps: - name: Set up JDK 17 diff --git a/metadata-ingestion-modules/airflow-plugin/setup.py b/metadata-ingestion-modules/airflow-plugin/setup.py index 90167126bc349c..065e9454c5d9e0 100644 --- a/metadata-ingestion-modules/airflow-plugin/setup.py +++ b/metadata-ingestion-modules/airflow-plugin/setup.py @@ -42,7 +42,7 @@ def get_long_description(): # We remain restrictive on the versions allowed here to prevent # us from being broken by backwards-incompatible changes in the # underlying package. - "openlineage-airflow>=1.2.0,<=1.7.0", + "openlineage-airflow>=1.2.0,<=1.12.0", }, } diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py index cdba268eed56b7..d67754605c71be 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py @@ -420,6 +420,7 @@ def run_datajob( config: Optional[DatahubLineageConfig] = None, ) -> DataProcessInstance: if datajob is None: + assert ti.task is not None datajob = AirflowGenerator.generate_datajob( cluster, ti.task, dag, config=config ) @@ -509,6 +510,7 @@ def complete_datajob( :return: DataProcessInstance """ if datajob is None: + assert ti.task is not None datajob = AirflowGenerator.generate_datajob( cluster, ti.task, dag, config=config ) @@ -530,6 +532,7 @@ def complete_datajob( f"Result should be either success or failure and it was {ti.state}" ) + assert datajob is not None dpi = DataProcessInstance.from_datajob( datajob=datajob, id=f"{dag.dag_id}_{ti.task_id}_{dag_run.run_id}", diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index cf55d5347a3389..15f76a8b1e1d09 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -360,6 +360,7 @@ def on_task_instance_running( # The type ignore is to placate mypy on Airflow 2.1.x. dagrun: "DagRun" = task_instance.dag_run # type: ignore[attr-defined] task = task_instance.task + assert task is not None dag: "DAG" = task.dag # type: ignore[assignment] self._task_holder.set_task(task_instance) @@ -447,6 +448,7 @@ def on_task_instance_finish( ) -> None: dagrun: "DagRun" = task_instance.dag_run # type: ignore[attr-defined] task = self._task_holder.get_task(task_instance) or task_instance.task + assert task is not None dag: "DAG" = task.dag # type: ignore[assignment] datajob = AirflowGenerator.generate_datajob(