From b891bc16662572fe815069b17e5b4be729d7a610 Mon Sep 17 00:00:00 2001
From: Willi
Date: Mon, 9 Sep 2024 13:44:49 +0530
Subject: [PATCH] Enables setting of raise_on_failed_jobs in airflow_helper

---
 dlt/helpers/airflow_helper.py | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/dlt/helpers/airflow_helper.py b/dlt/helpers/airflow_helper.py
index 8494d3bba3..fca36941f0 100644
--- a/dlt/helpers/airflow_helper.py
+++ b/dlt/helpers/airflow_helper.py
@@ -67,7 +67,7 @@ def __init__(
         retry_policy: Retrying = DEFAULT_RETRY_NO_RETRY,
         retry_pipeline_steps: Sequence[TPipelineStep] = ("load",),
         fail_task_if_any_job_failed: bool = True,
-        abort_task_if_any_job_failed: bool = False,
+        abort_task_if_any_job_failed: bool = True,
         wipe_local_data: bool = True,
         save_load_info: bool = False,
         save_trace_info: bool = False,
@@ -86,7 +86,7 @@ def __init__(
         This happens **after all dlt loading jobs executed**. See more here: https://dlthub.com/docs/running-in-production/running#failed-jobs
         `abort_task_if_any_job_failed` will abort the other dlt loading jobs and fail the Airflow task in any of the jobs failed. This may put your warehouse in
-        inconsistent state so the option is disabled by default.
+        an inconsistent state. See https://dlthub.com/docs/running-in-production/running#handle-exceptions-failed-jobs-and-retry-the-pipeline.

         The load info and trace info can be optionally saved to the destination. See https://dlthub.com/docs/running-in-production/running#inspect-and-save-the-load-info-and-trace
@@ -270,10 +270,9 @@ def _run(
             dlt.config["data_writer.buffer_max_items"] = self.buffer_max_items
             logger.info(f"Set data_writer.buffer_max_items to {self.buffer_max_items}")

-            # enable abort package if job failed
-            if self.abort_task_if_any_job_failed:
-                dlt.config["load.raise_on_failed_jobs"] = True
-                logger.info("Set load.abort_task_if_any_job_failed to True")
+            if self.abort_task_if_any_job_failed is not None:
+                dlt.config["load.raise_on_failed_jobs"] = self.abort_task_if_any_job_failed
+                logger.info(f"Set load.raise_on_failed_jobs to {self.abort_task_if_any_job_failed}")

             if self.log_progress_period > 0 and task_pipeline.collector == NULL_COLLECTOR:
                 task_pipeline.collector = log(log_period=self.log_progress_period, logger=logger.LOGGER)
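
Usage note (not part of the patch): a minimal sketch of how a DAG would opt out of
the new `abort_task_if_any_job_failed=True` default once this lands. The flag is now
forwarded verbatim to `load.raise_on_failed_jobs` instead of only being applied when
truthy. The pipeline name, dataset, duckdb destination, and `dummy_source` resource
below are illustrative placeholders, not part of the change.

    # Sketch only, assuming dlt with the Airflow helper and Airflow 2.x installed.
    import dlt
    from airflow.decorators import dag
    from dlt.common import pendulum
    from dlt.helpers.airflow_helper import PipelineTasksGroup


    @dag(schedule=None, start_date=pendulum.datetime(2024, 9, 1), catchup=False)
    def load_example_data():
        # After this patch the flag is forwarded as-is to
        # dlt.config["load.raise_on_failed_jobs"]; previously False was ignored.
        tasks = PipelineTasksGroup(
            "example_pipeline",
            wipe_local_data=True,
            abort_task_if_any_job_failed=False,  # opt out of the new default (True)
        )

        pipeline = dlt.pipeline(
            pipeline_name="example_pipeline",
            dataset_name="example_dataset",
            destination="duckdb",
        )

        # placeholder resource standing in for a real source
        @dlt.resource
        def dummy_source():
            yield [{"id": 1}, {"id": 2}]

        # extra kwargs such as trigger_rule/retries go to the underlying operator
        tasks.add_run(pipeline, dummy_source(), trigger_rule="all_done", retries=0)


    load_example_data()

Note that `fail_task_if_any_job_failed` is unchanged: the task can still be failed
after all load jobs complete; the flag above only controls the early abort behavior.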