Merge branch 'airflow_clear_2' into airflow_clear
IlyaFaer committed Aug 19, 2024
2 parents 5bf78ae + 3bad372 commit f27a0fd
Showing 2 changed files with 34 additions and 0 deletions.
2 changes: 2 additions & 0 deletions dlt/helpers/airflow_helper.py
@@ -1,5 +1,6 @@
import functools
import os
import shutil
from tempfile import gettempdir
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple

@@ -337,6 +338,7 @@ def log_after_attempt(retry_state: RetryCallState) -> None:
        if self.wipe_local_data:
            logger.info(f"Removing folder {pipeline.working_dir}")
            task_pipeline._wipe_working_folder()
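            # also remove the global dlt data directory pointed to by DLT_DATA_DIR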
            shutil.rmtree(os.environ["DLT_DATA_DIR"])

@with_telemetry("helper", "airflow_add_run", False, "decompose")
def add_run(
Expand Down
32 changes: 32 additions & 0 deletions
@@ -988,3 +988,35 @@ def dag_regular():
            mock.call(f'on_before_run test: {pendulum.tomorrow().format("YYYY-MM-DD")}'),
        ]
    )


def test_working_dir_clear():
    @dag(
        schedule_interval="@daily",
        start_date=pendulum.datetime(2023, 7, 1),
        catchup=False,
        max_active_runs=1,
    )
    def load_data():
        tasks = PipelineTasksGroup(
            "pipeline_decomposed", use_data_folder=False, wipe_local_data=True
        )

        pipeline = dlt.pipeline(
            pipeline_name="airflow_clear_test",
            dataset_name="airflow_clear_test_ds",
            destination="duckdb",
            full_refresh=True,
        )
        tasks.run(
            pipeline,
            mock_data_source(),
            trigger_rule="all_done",
            retries=0,
            provide_context=True,
        )

    dag_ref = load_data()
    dag_ref.test()

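    # with wipe_local_data=True, the whole DLT_DATA_DIR must be gone after the run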
    assert not os.path.exists(os.environ["DLT_DATA_DIR"])
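
A side note on the helper change above: when wipe_local_data is set, the new line in dlt/helpers/airflow_helper.py deletes the whole directory pointed to by DLT_DATA_DIR, not just the pipeline's working folder. Below is a minimal standalone sketch of that cleanup; the env-var guard and ignore_errors flag are illustrative assumptions, the committed code calls shutil.rmtree(os.environ["DLT_DATA_DIR"]) directly.

import os
import shutil

# Sketch only (not part of this commit): remove the global dlt data directory
# once the pipeline's working folder has been wiped. The existence check and
# ignore_errors=True are defensive additions for illustration.
data_dir = os.environ.get("DLT_DATA_DIR")
if data_dir and os.path.isdir(data_dir):
    shutil.rmtree(data_dir, ignore_errors=True)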
