Skip to content

Commit

Permalink
Enables setting of raise_on_failed_jobs in airflow_helper
Browse files Browse the repository at this point in the history
  • Loading branch information
willi-mueller committed Sep 9, 2024
1 parent e8c8b22 commit b891bc1
Showing 1 changed file with 5 additions and 6 deletions.
11 changes: 5 additions & 6 deletions dlt/helpers/airflow_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def __init__(
retry_policy: Retrying = DEFAULT_RETRY_NO_RETRY,
retry_pipeline_steps: Sequence[TPipelineStep] = ("load",),
fail_task_if_any_job_failed: bool = True,
abort_task_if_any_job_failed: bool = False,
abort_task_if_any_job_failed: bool = True,
wipe_local_data: bool = True,
save_load_info: bool = False,
save_trace_info: bool = False,
Expand All @@ -86,7 +86,7 @@ def __init__(
This happens **after all dlt loading jobs executed**. See more here: https://dlthub.com/docs/running-in-production/running#failed-jobs
`abort_task_if_any_job_failed` will abort the other dlt loading jobs and fail the Airflow task if any of the jobs failed. This may put your warehouse in
inconsistent state so the option is disabled by default.
inconsistent state. See https://dlthub.com/docs/running-in-production/running#handle-exceptions-failed-jobs-and-retry-the-pipeline.
The load info and trace info can be optionally saved to the destination. See https://dlthub.com/docs/running-in-production/running#inspect-and-save-the-load-info-and-trace
Expand Down Expand Up @@ -270,10 +270,9 @@ def _run(
dlt.config["data_writer.buffer_max_items"] = self.buffer_max_items
logger.info(f"Set data_writer.buffer_max_items to {self.buffer_max_items}")

# enable abort package if job failed
if self.abort_task_if_any_job_failed:
dlt.config["load.raise_on_failed_jobs"] = True
logger.info("Set load.abort_task_if_any_job_failed to True")
if self.abort_task_if_any_job_failed is not None:
dlt.config["load.raise_on_failed_jobs"] = self.abort_task_if_any_job_failed
logger.info("Set load.abort_task_if_any_job_failed to {self.abort_task_if_any_job_failed}")

if self.log_progress_period > 0 and task_pipeline.collector == NULL_COLLECTOR:
task_pipeline.collector = log(log_period=self.log_progress_period, logger=logger.LOGGER)
Expand Down

0 comments on commit b891bc1

Please sign in to comment.