From 98ca505fd06b8146a4355c6355174abe8b45ef66 Mon Sep 17 00:00:00 2001 From: VioletM Date: Wed, 28 Aug 2024 06:28:50 -0400 Subject: [PATCH] Expose staging tables truncation to config (#1717) * Expose staging tables truncation to config * Fix comments, add tests * Fix tests * Move implementation from mixing, add tests * Fix docs grammar --- dlt/common/destination/reference.py | 8 ++- dlt/destinations/impl/athena/athena.py | 2 +- dlt/destinations/impl/bigquery/bigquery.py | 3 + .../impl/clickhouse/clickhouse.py | 3 + .../impl/databricks/databricks.py | 3 + dlt/destinations/impl/dremio/dremio.py | 3 + dlt/destinations/impl/dummy/configuration.py | 2 + dlt/destinations/impl/dummy/dummy.py | 3 + dlt/destinations/impl/redshift/redshift.py | 3 + dlt/destinations/impl/snowflake/snowflake.py | 3 + dlt/destinations/impl/synapse/synapse.py | 3 + dlt/load/utils.py | 7 +- docs/website/docs/dlt-ecosystem/staging.md | 72 ++++++++++++------- tests/load/pipeline/test_stage_loading.py | 57 ++++++++++++++- tests/load/test_dummy_client.py | 17 +++++ 15 files changed, 152 insertions(+), 37 deletions(-) diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index 744cbbd1f5..0944b03bea 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -269,6 +269,8 @@ class DestinationClientDwhWithStagingConfiguration(DestinationClientDwhConfigura staging_config: Optional[DestinationClientStagingConfiguration] = None """configuration of the staging, if present, injected at runtime""" + truncate_tables_on_staging_destination_before_load: bool = True + """If dlt should truncate the tables on staging destination before loading data.""" TLoadJobState = Literal["ready", "running", "failed", "retry", "completed"] @@ -578,7 +580,7 @@ def with_staging_dataset(self) -> ContextManager["JobClientBase"]: return self # type: ignore -class SupportsStagingDestination: +class SupportsStagingDestination(ABC): """Adds capability to support a staging destination for the load""" def should_load_data_to_staging_dataset_on_staging_destination( @@ -586,9 +588,9 @@ def should_load_data_to_staging_dataset_on_staging_destination( ) -> bool: return False + @abstractmethod def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: - # the default is to truncate the tables on the staging destination... - return True + pass # TODO: type Destination properly diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py index 0c90d171a3..b28309b930 100644 --- a/dlt/destinations/impl/athena/athena.py +++ b/dlt/destinations/impl/athena/athena.py @@ -531,7 +531,7 @@ def should_truncate_table_before_load_on_staging_destination(self, table: TTable if table["write_disposition"] == "replace" and not self._is_iceberg_table( self.prepare_load_table(table["name"]) ): - return True + return self.config.truncate_tables_on_staging_destination_before_load return False def should_load_data_to_staging_dataset_on_staging_destination( diff --git a/dlt/destinations/impl/bigquery/bigquery.py b/dlt/destinations/impl/bigquery/bigquery.py index 8291415434..11326cf3ed 100644 --- a/dlt/destinations/impl/bigquery/bigquery.py +++ b/dlt/destinations/impl/bigquery/bigquery.py @@ -503,6 +503,9 @@ def _should_autodetect_schema(self, table_name: str) -> bool: self.schema._schema_tables, table_name, AUTODETECT_SCHEMA_HINT, allow_none=True ) or (self.config.autodetect_schema and table_name not in self.schema.dlt_table_names()) + def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: + return self.config.truncate_tables_on_staging_destination_before_load + def _streaming_load( items: List[Dict[Any, Any]], table: Dict[str, Any], job_client: BigQueryClient diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index 5f17a5a18c..282fbaf338 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -372,3 +372,6 @@ def _from_db_type( self, ch_t: str, precision: Optional[int], scale: Optional[int] ) -> TColumnType: return self.type_mapper.from_db_type(ch_t, precision, scale) + + def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: + return self.config.truncate_tables_on_staging_destination_before_load diff --git a/dlt/destinations/impl/databricks/databricks.py b/dlt/destinations/impl/databricks/databricks.py index 2f23e88ea0..38412b2608 100644 --- a/dlt/destinations/impl/databricks/databricks.py +++ b/dlt/destinations/impl/databricks/databricks.py @@ -325,3 +325,6 @@ def _get_storage_table_query_columns(self) -> List[str]: "full_data_type" ) return fields + + def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: + return self.config.truncate_tables_on_staging_destination_before_load diff --git a/dlt/destinations/impl/dremio/dremio.py b/dlt/destinations/impl/dremio/dremio.py index 68a3fedc31..149d106dcd 100644 --- a/dlt/destinations/impl/dremio/dremio.py +++ b/dlt/destinations/impl/dremio/dremio.py @@ -210,3 +210,6 @@ def _make_add_column_sql( self, new_columns: Sequence[TColumnSchema], table_format: TTableFormat = None ) -> List[str]: return ["ADD COLUMNS (" + ", ".join(self._get_column_def_sql(c) for c in new_columns) + ")"] + + def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: + return self.config.truncate_tables_on_staging_destination_before_load diff --git a/dlt/destinations/impl/dummy/configuration.py b/dlt/destinations/impl/dummy/configuration.py index 023b88e51a..a066479294 100644 --- a/dlt/destinations/impl/dummy/configuration.py +++ b/dlt/destinations/impl/dummy/configuration.py @@ -34,6 +34,8 @@ class DummyClientConfiguration(DestinationClientConfiguration): """raise terminal exception in job init""" fail_transiently_in_init: bool = False """raise transient exception in job init""" + truncate_tables_on_staging_destination_before_load: bool = True + """truncate tables on staging destination""" # new jobs workflows create_followup_jobs: bool = False diff --git a/dlt/destinations/impl/dummy/dummy.py b/dlt/destinations/impl/dummy/dummy.py index 49b55ec65d..feb09369dc 100644 --- a/dlt/destinations/impl/dummy/dummy.py +++ b/dlt/destinations/impl/dummy/dummy.py @@ -202,6 +202,9 @@ def complete_load(self, load_id: str) -> None: def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool: return super().should_load_data_to_staging_dataset(table) + def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: + return self.config.truncate_tables_on_staging_destination_before_load + @contextmanager def with_staging_dataset(self) -> Iterator[JobClientBase]: try: diff --git a/dlt/destinations/impl/redshift/redshift.py b/dlt/destinations/impl/redshift/redshift.py index 93827c8163..0e201dc4e0 100644 --- a/dlt/destinations/impl/redshift/redshift.py +++ b/dlt/destinations/impl/redshift/redshift.py @@ -274,3 +274,6 @@ def _from_db_type( self, pq_t: str, precision: Optional[int], scale: Optional[int] ) -> TColumnType: return self.type_mapper.from_db_type(pq_t, precision, scale) + + def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: + return self.config.truncate_tables_on_staging_destination_before_load diff --git a/dlt/destinations/impl/snowflake/snowflake.py b/dlt/destinations/impl/snowflake/snowflake.py index 8b4eabc961..6688b5bc17 100644 --- a/dlt/destinations/impl/snowflake/snowflake.py +++ b/dlt/destinations/impl/snowflake/snowflake.py @@ -325,3 +325,6 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = Non return ( f"{name} {self.type_mapper.to_db_type(c)} {self._gen_not_null(c.get('nullable', True))}" ) + + def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: + return self.config.truncate_tables_on_staging_destination_before_load diff --git a/dlt/destinations/impl/synapse/synapse.py b/dlt/destinations/impl/synapse/synapse.py index e43e2a6dfa..750a4895f0 100644 --- a/dlt/destinations/impl/synapse/synapse.py +++ b/dlt/destinations/impl/synapse/synapse.py @@ -173,6 +173,9 @@ def create_load_job( ) return job + def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: + return self.config.truncate_tables_on_staging_destination_before_load + class SynapseCopyFileLoadJob(CopyRemoteFileLoadJob): def __init__( diff --git a/dlt/load/utils.py b/dlt/load/utils.py index 741c01f249..e3a2ebcd79 100644 --- a/dlt/load/utils.py +++ b/dlt/load/utils.py @@ -179,9 +179,10 @@ def _init_dataset_and_update_schema( applied_update = job_client.update_stored_schema( only_tables=update_tables, expected_update=expected_update ) - logger.info( - f"Client for {job_client.config.destination_type} will truncate tables {staging_text}" - ) + if truncate_tables: + logger.info( + f"Client for {job_client.config.destination_type} will truncate tables {staging_text}" + ) job_client.initialize_storage(truncate_tables=truncate_tables) return applied_update diff --git a/docs/website/docs/dlt-ecosystem/staging.md b/docs/website/docs/dlt-ecosystem/staging.md index 05e31a574b..789189b7dd 100644 --- a/docs/website/docs/dlt-ecosystem/staging.md +++ b/docs/website/docs/dlt-ecosystem/staging.md @@ -1,36 +1,33 @@ --- title: Staging -description: Configure an s3 or gcs bucket for staging before copying into the destination +description: Configure an S3 or GCS bucket for staging before copying into the destination keywords: [staging, destination] --- # Staging -The goal of staging is to bring the data closer to the database engine so the modification of the destination (final) dataset happens faster and without errors. `dlt`, when asked, creates two -staging areas: +The goal of staging is to bring the data closer to the database engine so that the modification of the destination (final) dataset happens faster and without errors. `dlt`, when asked, creates two staging areas: 1. A **staging dataset** used by the [merge and replace loads](../general-usage/incremental-loading.md#merge-incremental_loading) to deduplicate and merge data with the destination. -2. A **staging storage** which is typically a s3/gcp bucket where [loader files](file-formats/) are copied before they are loaded by the destination. +2. A **staging storage** which is typically an S3/GCP bucket where [loader files](file-formats/) are copied before they are loaded by the destination. ## Staging dataset -`dlt` creates a staging dataset when write disposition of any of the loaded resources requires it. It creates and migrates required tables exactly like for the -main dataset. Data in staging tables is truncated when load step begins and only for tables that will participate in it. -Such staging dataset has the same name as the dataset passed to `dlt.pipeline` but with `_staging` suffix in the name. Alternatively, you can provide your own staging dataset pattern or use a fixed name, identical for all the -configured datasets. +`dlt` creates a staging dataset when the write disposition of any of the loaded resources requires it. It creates and migrates required tables exactly like for the main dataset. Data in staging tables is truncated when the load step begins and only for tables that will participate in it. +Such a staging dataset has the same name as the dataset passed to `dlt.pipeline` but with a `_staging` suffix in the name. Alternatively, you can provide your own staging dataset pattern or use a fixed name, identical for all the configured datasets. ```toml [destination.postgres] staging_dataset_name_layout="staging_%s" ``` -Entry above switches the pattern to `staging_` prefix and for example for dataset with name **github_data** `dlt` will create **staging_github_data**. +The entry above switches the pattern to `staging_` prefix and for example, for a dataset with the name **github_data**, `dlt` will create **staging_github_data**. -To configure static staging dataset name, you can do the following (we use destination factory) +To configure a static staging dataset name, you can do the following (we use the destination factory) ```py import dlt dest_ = dlt.destinations.postgres(staging_dataset_name_layout="_dlt_staging") ``` -All pipelines using `dest_` as destination will use **staging_dataset** to store staging tables. Make sure that your pipelines are not overwriting each other's tables. +All pipelines using `dest_` as the destination will use the **staging_dataset** to store staging tables. Make sure that your pipelines are not overwriting each other's tables. -### Cleanup up staging dataset automatically -`dlt` does not truncate tables in staging dataset at the end of the load. Data that is left after contains all the extracted data and may be useful for debugging. +### Cleanup staging dataset automatically +`dlt` does not truncate tables in the staging dataset at the end of the load. Data that is left after contains all the extracted data and may be useful for debugging. If you prefer to truncate it, put the following line in `config.toml`: ```toml @@ -39,19 +36,23 @@ truncate_staging_dataset=true ``` ## Staging storage -`dlt` allows to chain destinations where the first one (`staging`) is responsible for uploading the files from local filesystem to the remote storage. It then generates followup jobs for the second destination that (typically) copy the files from remote storage into destination. +`dlt` allows chaining destinations where the first one (`staging`) is responsible for uploading the files from the local filesystem to the remote storage. It then generates follow-up jobs for the second destination that (typically) copy the files from remote storage into the destination. -Currently, only one destination the [filesystem](destinations/filesystem.md) can be used as a staging. Following destinations can copy remote files: -1. [Redshift.](destinations/redshift.md#staging-support) -2. [Bigquery.](destinations/bigquery.md#staging-support) -3. [Snowflake.](destinations/snowflake.md#staging-support) +Currently, only one destination, the [filesystem](destinations/filesystem.md), can be used as staging. The following destinations can copy remote files: + +1. [Azure Synapse](destinations/synapse#staging-support) +1. [Athena](destinations/athena#staging-support) +1. [Bigquery](destinations/bigquery.md#staging-support) +1. [Dremio](destinations/dremio#staging-support) +1. [Redshift](destinations/redshift.md#staging-support) +1. [Snowflake](destinations/snowflake.md#staging-support) ### How to use -In essence, you need to set up two destinations and then pass them to `dlt.pipeline`. Below we'll use `filesystem` staging with `parquet` files to load into `Redshift` destination. +In essence, you need to set up two destinations and then pass them to `dlt.pipeline`. Below we'll use `filesystem` staging with `parquet` files to load into the `Redshift` destination. -1. **Set up the s3 bucket and filesystem staging.** +1. **Set up the S3 bucket and filesystem staging.** - Please follow our guide in [filesystem destination documentation](destinations/filesystem.md). Test the staging as standalone destination to make sure that files go where you want them. In your `secrets.toml` you should now have a working `filesystem` configuration: + Please follow our guide in the [filesystem destination documentation](destinations/filesystem.md). Test the staging as a standalone destination to make sure that files go where you want them. In your `secrets.toml`, you should now have a working `filesystem` configuration: ```toml [destination.filesystem] bucket_url = "s3://[your_bucket_name]" # replace with your bucket name, @@ -63,15 +64,15 @@ In essence, you need to set up two destinations and then pass them to `dlt.pipel 2. **Set up the Redshift destination.** - Please follow our guide in [redshift destination documentation](destinations/redshift.md). In your `secrets.toml` you added: + Please follow our guide in the [redshift destination documentation](destinations/redshift.md). In your `secrets.toml`, you added: ```toml # keep it at the top of your toml file! before any section starts destination.redshift.credentials="redshift://loader:@localhost/dlt_data?connect_timeout=15" ``` -3. **Authorize Redshift cluster to access the staging bucket.** +3. **Authorize the Redshift cluster to access the staging bucket.** - By default `dlt` will forward the credentials configured for `filesystem` to the `Redshift` COPY command. If you are fine with this, move to the next step. + By default, `dlt` will forward the credentials configured for `filesystem` to the `Redshift` COPY command. If you are fine with this, move to the next step. 4. **Chain staging to destination and request `parquet` file format.** @@ -79,7 +80,7 @@ In essence, you need to set up two destinations and then pass them to `dlt.pipel ```py # Create a dlt pipeline that will load # chess player data to the redshift destination - # via staging on s3 + # via staging on S3 pipeline = dlt.pipeline( pipeline_name='chess_pipeline', destination='redshift', @@ -87,7 +88,7 @@ In essence, you need to set up two destinations and then pass them to `dlt.pipel dataset_name='player_data' ) ``` - `dlt` will automatically select an appropriate loader file format for the staging files. Below we explicitly specify `parquet` file format (just to demonstrate how to do it): + `dlt` will automatically select an appropriate loader file format for the staging files. Below we explicitly specify the `parquet` file format (just to demonstrate how to do it): ```py info = pipeline.run(chess(), loader_file_format="parquet") ``` @@ -96,4 +97,21 @@ In essence, you need to set up two destinations and then pass them to `dlt.pipel Run the pipeline script as usual. -> 💡 Please note that `dlt` does not delete loaded files from the staging storage after the load is complete. +:::tip +Please note that `dlt` does not delete loaded files from the staging storage after the load is complete, but it truncates previously loaded files. +::: + +### How to prevent staging files truncation + +Before `dlt` loads data to the staging storage, it truncates previously loaded files. To prevent it and keep the whole history +of loaded files, you can use the following parameter: + +```toml +[destination.redshift] +truncate_table_before_load_on_staging_destination=false +``` + +:::caution +The [Athena](destinations/athena#staging-support) destination only truncates not iceberg tables with `replace` merge_disposition. +Therefore, the parameter `truncate_table_before_load_on_staging_destination` only controls the truncation of corresponding files for these tables. +::: diff --git a/tests/load/pipeline/test_stage_loading.py b/tests/load/pipeline/test_stage_loading.py index a760c86526..f216fa3c05 100644 --- a/tests/load/pipeline/test_stage_loading.py +++ b/tests/load/pipeline/test_stage_loading.py @@ -1,12 +1,12 @@ import pytest -from typing import Dict, Any, List +from typing import List import dlt, os -from dlt.common import json, sleep -from copy import deepcopy +from dlt.common import json from dlt.common.storages.configuration import FilesystemConfiguration from dlt.common.utils import uniq_id from dlt.common.schema.typing import TDataType +from dlt.destinations.impl.filesystem.filesystem import FilesystemClient from tests.load.pipeline.test_merge_disposition import github from tests.pipeline.utils import load_table_counts, assert_load_info @@ -40,6 +40,13 @@ def load_modified_issues(): yield from issues +@dlt.resource(table_name="events", write_disposition="append", primary_key="timestamp") +def event_many_load_2(): + with open("tests/normalize/cases/event.event.many_load_2.json", "r", encoding="utf-8") as f: + events = json.load(f) + yield from events + + @pytest.mark.parametrize( "destination_config", destinations_configs(all_staging_configs=True), ids=lambda x: x.name ) @@ -183,6 +190,50 @@ def test_staging_load(destination_config: DestinationTestConfiguration) -> None: assert replace_counts == initial_counts +@pytest.mark.parametrize( + "destination_config", destinations_configs(all_staging_configs=True), ids=lambda x: x.name +) +def test_truncate_staging_dataset(destination_config: DestinationTestConfiguration) -> None: + """This test checks if tables truncation on staging destination done according to the configuration. + + Test loads data to the destination three times: + * with truncation + * without truncation (after this 2 staging files should be left) + * with truncation (after this 1 staging file should be left) + """ + pipeline = destination_config.setup_pipeline( + pipeline_name="test_stage_loading", dataset_name="test_staging_load" + uniq_id() + ) + resource = event_many_load_2() + table_name: str = resource.table_name # type: ignore[assignment] + + # load the data, files stay on the stage after the load + info = pipeline.run(resource) + assert_load_info(info) + + # load the data without truncating of the staging, should see two files on staging + pipeline.destination.config_params["truncate_tables_on_staging_destination_before_load"] = False + info = pipeline.run(resource) + assert_load_info(info) + # check there are two staging files + _, staging_client = pipeline._get_destination_clients(pipeline.default_schema) + with staging_client: + assert len(staging_client.list_table_files(table_name)) == 2 # type: ignore[attr-defined] + + # load the data with truncating, so only new file is on the staging + pipeline.destination.config_params["truncate_tables_on_staging_destination_before_load"] = True + info = pipeline.run(resource) + assert_load_info(info) + # check that table exists in the destination + with pipeline.sql_client() as sql_client: + qual_name = sql_client.make_qualified_table_name + assert len(sql_client.execute_sql(f"SELECT * from {qual_name(table_name)}")) > 4 + # check there is only one staging file + _, staging_client = pipeline._get_destination_clients(pipeline.default_schema) + with staging_client: + assert len(staging_client.list_table_files(table_name)) == 1 # type: ignore[attr-defined] + + @pytest.mark.parametrize( "destination_config", destinations_configs(all_staging_configs=True), ids=lambda x: x.name ) diff --git a/tests/load/test_dummy_client.py b/tests/load/test_dummy_client.py index 9f0bca6ac5..59b7acac15 100644 --- a/tests/load/test_dummy_client.py +++ b/tests/load/test_dummy_client.py @@ -548,6 +548,23 @@ def test_completed_loop_with_delete_completed() -> None: assert_complete_job(load, should_delete_completed=True) +@pytest.mark.parametrize("to_truncate", [True, False]) +def test_truncate_table_before_load_on_stanging(to_truncate) -> None: + load = setup_loader( + client_config=DummyClientConfiguration( + truncate_tables_on_staging_destination_before_load=to_truncate + ) + ) + load_id, schema = prepare_load_package(load.load_storage, NORMALIZED_FILES) + destination_client = load.get_destination_client(schema) + assert ( + destination_client.should_truncate_table_before_load_on_staging_destination( # type: ignore + schema.tables["_dlt_version"] + ) + == to_truncate + ) + + def test_retry_on_new_loop() -> None: # test job that retries sitting in new jobs load = setup_loader(client_config=DummyClientConfiguration(retry_prob=1.0))