From 261dbccd7d5f8f8f6230ea0d3de9aa83c940dfd6 Mon Sep 17 00:00:00 2001 From: Violetta Mishechkina Date: Thu, 22 Aug 2024 09:43:29 +0200 Subject: [PATCH 1/5] Expose staging tables truncation to config --- dlt/common/destination/reference.py | 9 ++++++++- dlt/destinations/impl/athena/athena.py | 3 +++ dlt/destinations/impl/bigquery/bigquery.py | 3 +++ dlt/destinations/impl/clickhouse/clickhouse.py | 3 +++ dlt/destinations/impl/databricks/databricks.py | 3 +++ dlt/destinations/impl/dremio/dremio.py | 3 +++ dlt/destinations/impl/dummy/dummy.py | 1 + dlt/destinations/impl/redshift/redshift.py | 3 +++ dlt/destinations/impl/snowflake/snowflake.py | 3 +++ dlt/load/utils.py | 7 ++++--- 10 files changed, 34 insertions(+), 4 deletions(-) diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index 744cbbd1f5..cb562d4247 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -269,6 +269,8 @@ class DestinationClientDwhWithStagingConfiguration(DestinationClientDwhConfigura staging_config: Optional[DestinationClientStagingConfiguration] = None """configuration of the staging, if present, injected at runtime""" + truncate_table_before_load_on_staging_destination: bool = True + """If dlt should truncate the tables on staging destination before loading data.""" TLoadJobState = Literal["ready", "running", "failed", "retry", "completed"] @@ -581,6 +583,11 @@ def with_staging_dataset(self) -> ContextManager["JobClientBase"]: class SupportsStagingDestination: """Adds capability to support a staging destination for the load""" + def __init__(self, truncate_table_before_load_on_staging_destination: bool = True) -> None: + self.truncate_table_before_load_on_staging_destination = ( + truncate_table_before_load_on_staging_destination + ) + def should_load_data_to_staging_dataset_on_staging_destination( self, table: TTableSchema ) -> bool: @@ -588,7 +595,7 @@ def should_load_data_to_staging_dataset_on_staging_destination( def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: # the default is to truncate the tables on the staging destination... 
- return True + return self.truncate_table_before_load_on_staging_destination # TODO: type Destination properly diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py index 0c90d171a3..bf8849dd5b 100644 --- a/dlt/destinations/impl/athena/athena.py +++ b/dlt/destinations/impl/athena/athena.py @@ -389,6 +389,9 @@ def __init__( config, capabilities, ) + SupportsStagingDestination.__init__( + self, config.truncate_table_before_load_on_staging_destination + ) super().__init__(schema, config, sql_client) self.sql_client: AthenaSQLClient = sql_client # type: ignore self.config: AthenaClientConfiguration = config diff --git a/dlt/destinations/impl/bigquery/bigquery.py b/dlt/destinations/impl/bigquery/bigquery.py index 8291415434..0a62ea586b 100644 --- a/dlt/destinations/impl/bigquery/bigquery.py +++ b/dlt/destinations/impl/bigquery/bigquery.py @@ -229,6 +229,9 @@ def __init__( config.http_timeout, config.retry_deadline, ) + SupportsStagingDestination.__init__( + self, config.truncate_table_before_load_on_staging_destination + ) super().__init__(schema, config, sql_client) self.config: BigQueryClientConfiguration = config self.sql_client: BigQuerySqlClient = sql_client # type: ignore diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index 5f17a5a18c..5bb23b5240 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -283,6 +283,9 @@ def __init__( capabilities, config, ) + SupportsStagingDestination.__init__( + self, config.truncate_table_before_load_on_staging_destination + ) super().__init__(schema, config, self.sql_client) self.config: ClickHouseClientConfiguration = config self.active_hints = deepcopy(HINT_TO_CLICKHOUSE_ATTR) diff --git a/dlt/destinations/impl/databricks/databricks.py b/dlt/destinations/impl/databricks/databricks.py index 2f23e88ea0..5c10f1f4cb 100644 --- a/dlt/destinations/impl/databricks/databricks.py +++ b/dlt/destinations/impl/databricks/databricks.py @@ -262,6 +262,9 @@ def __init__( config.credentials, capabilities, ) + SupportsStagingDestination.__init__( + self, config.truncate_table_before_load_on_staging_destination + ) super().__init__(schema, config, sql_client) self.config: DatabricksClientConfiguration = config self.sql_client: DatabricksSqlClient = sql_client # type: ignore[assignment] diff --git a/dlt/destinations/impl/dremio/dremio.py b/dlt/destinations/impl/dremio/dremio.py index 68a3fedc31..9dcad4cfa9 100644 --- a/dlt/destinations/impl/dremio/dremio.py +++ b/dlt/destinations/impl/dremio/dremio.py @@ -147,6 +147,9 @@ def __init__( config.credentials, capabilities, ) + SupportsStagingDestination.__init__( + self, config.truncate_table_before_load_on_staging_destination + ) super().__init__(schema, config, sql_client) self.config: DremioClientConfiguration = config self.sql_client: DremioSqlClient = sql_client # type: ignore diff --git a/dlt/destinations/impl/dummy/dummy.py b/dlt/destinations/impl/dummy/dummy.py index 49b55ec65d..9daec8063f 100644 --- a/dlt/destinations/impl/dummy/dummy.py +++ b/dlt/destinations/impl/dummy/dummy.py @@ -134,6 +134,7 @@ def __init__( config: DummyClientConfiguration, capabilities: DestinationCapabilitiesContext, ) -> None: + SupportsStagingDestination.__init__(self) super().__init__(schema, config, capabilities) self.in_staging_context = False self.config: DummyClientConfiguration = config diff --git a/dlt/destinations/impl/redshift/redshift.py 
b/dlt/destinations/impl/redshift/redshift.py index 93827c8163..ef11c9dcd8 100644 --- a/dlt/destinations/impl/redshift/redshift.py +++ b/dlt/destinations/impl/redshift/redshift.py @@ -233,6 +233,9 @@ def __init__( config.credentials, capabilities, ) + SupportsStagingDestination.__init__( + self, config.truncate_table_before_load_on_staging_destination + ) super().__init__(schema, config, sql_client) self.sql_client = sql_client self.config: RedshiftClientConfiguration = config diff --git a/dlt/destinations/impl/snowflake/snowflake.py b/dlt/destinations/impl/snowflake/snowflake.py index 8b4eabc961..5d0f2df1fc 100644 --- a/dlt/destinations/impl/snowflake/snowflake.py +++ b/dlt/destinations/impl/snowflake/snowflake.py @@ -266,6 +266,9 @@ def __init__( capabilities, config.query_tag, ) + SupportsStagingDestination.__init__( + self, config.truncate_table_before_load_on_staging_destination + ) super().__init__(schema, config, sql_client) self.config: SnowflakeClientConfiguration = config self.sql_client: SnowflakeSqlClient = sql_client # type: ignore diff --git a/dlt/load/utils.py b/dlt/load/utils.py index 741c01f249..e3a2ebcd79 100644 --- a/dlt/load/utils.py +++ b/dlt/load/utils.py @@ -179,9 +179,10 @@ def _init_dataset_and_update_schema( applied_update = job_client.update_stored_schema( only_tables=update_tables, expected_update=expected_update ) - logger.info( - f"Client for {job_client.config.destination_type} will truncate tables {staging_text}" - ) + if truncate_tables: + logger.info( + f"Client for {job_client.config.destination_type} will truncate tables {staging_text}" + ) job_client.initialize_storage(truncate_tables=truncate_tables) return applied_update From 5dcdcf64bdceeba1bb5ba06300d7b040d5814c0a Mon Sep 17 00:00:00 2001 From: Violetta Mishechkina Date: Fri, 23 Aug 2024 20:34:43 +0200 Subject: [PATCH 2/5] Fix comments, add tests --- dlt/common/destination/reference.py | 4 +-- dlt/destinations/impl/athena/athena.py | 6 ++-- .../impl/clickhouse/clickhouse.py | 4 +-- .../impl/databricks/databricks.py | 4 +-- dlt/destinations/impl/dremio/dremio.py | 4 +-- dlt/destinations/impl/dummy/configuration.py | 2 ++ dlt/destinations/impl/dummy/dummy.py | 2 +- dlt/destinations/impl/redshift/redshift.py | 4 +-- dlt/destinations/impl/snowflake/snowflake.py | 4 +-- dlt/destinations/impl/synapse/synapse.py | 1 + docs/website/docs/dlt-ecosystem/staging.md | 29 ++++++++++++++++--- tests/load/test_dummy_client.py | 17 +++++++++++ 12 files changed, 55 insertions(+), 26 deletions(-) diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index cb562d4247..c0b4ea4ff3 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -583,9 +583,9 @@ def with_staging_dataset(self) -> ContextManager["JobClientBase"]: class SupportsStagingDestination: """Adds capability to support a staging destination for the load""" - def __init__(self, truncate_table_before_load_on_staging_destination: bool = True) -> None: + def __init__(self, config: DestinationClientDwhWithStagingConfiguration) -> None: self.truncate_table_before_load_on_staging_destination = ( - truncate_table_before_load_on_staging_destination + config.truncate_table_before_load_on_staging_destination ) def should_load_data_to_staging_dataset_on_staging_destination( diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py index bf8849dd5b..89a5a89067 100644 --- a/dlt/destinations/impl/athena/athena.py +++ b/dlt/destinations/impl/athena/athena.py @@ -389,9 
+389,7 @@ def __init__( config, capabilities, ) - SupportsStagingDestination.__init__( - self, config.truncate_table_before_load_on_staging_destination - ) + SupportsStagingDestination.__init__(self, config) super().__init__(schema, config, sql_client) self.sql_client: AthenaSQLClient = sql_client # type: ignore self.config: AthenaClientConfiguration = config @@ -534,7 +532,7 @@ def should_truncate_table_before_load_on_staging_destination(self, table: TTable if table["write_disposition"] == "replace" and not self._is_iceberg_table( self.prepare_load_table(table["name"]) ): - return True + return self.truncate_table_before_load_on_staging_destination return False def should_load_data_to_staging_dataset_on_staging_destination( diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index 5bb23b5240..fba1a18c89 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -283,9 +283,7 @@ def __init__( capabilities, config, ) - SupportsStagingDestination.__init__( - self, config.truncate_table_before_load_on_staging_destination - ) + SupportsStagingDestination.__init__(self, config) super().__init__(schema, config, self.sql_client) self.config: ClickHouseClientConfiguration = config self.active_hints = deepcopy(HINT_TO_CLICKHOUSE_ATTR) diff --git a/dlt/destinations/impl/databricks/databricks.py b/dlt/destinations/impl/databricks/databricks.py index 5c10f1f4cb..cab1f8a507 100644 --- a/dlt/destinations/impl/databricks/databricks.py +++ b/dlt/destinations/impl/databricks/databricks.py @@ -262,9 +262,7 @@ def __init__( config.credentials, capabilities, ) - SupportsStagingDestination.__init__( - self, config.truncate_table_before_load_on_staging_destination - ) + SupportsStagingDestination.__init__(self, config) super().__init__(schema, config, sql_client) self.config: DatabricksClientConfiguration = config self.sql_client: DatabricksSqlClient = sql_client # type: ignore[assignment] diff --git a/dlt/destinations/impl/dremio/dremio.py b/dlt/destinations/impl/dremio/dremio.py index 9dcad4cfa9..8014450ba3 100644 --- a/dlt/destinations/impl/dremio/dremio.py +++ b/dlt/destinations/impl/dremio/dremio.py @@ -147,9 +147,7 @@ def __init__( config.credentials, capabilities, ) - SupportsStagingDestination.__init__( - self, config.truncate_table_before_load_on_staging_destination - ) + SupportsStagingDestination.__init__(self, config) super().__init__(schema, config, sql_client) self.config: DremioClientConfiguration = config self.sql_client: DremioSqlClient = sql_client # type: ignore diff --git a/dlt/destinations/impl/dummy/configuration.py b/dlt/destinations/impl/dummy/configuration.py index 023b88e51a..cfc79af5ab 100644 --- a/dlt/destinations/impl/dummy/configuration.py +++ b/dlt/destinations/impl/dummy/configuration.py @@ -34,6 +34,8 @@ class DummyClientConfiguration(DestinationClientConfiguration): """raise terminal exception in job init""" fail_transiently_in_init: bool = False """raise transient exception in job init""" + truncate_table_before_load_on_staging_destination: bool = True + """truncate tables on staging destination""" # new jobs workflows create_followup_jobs: bool = False diff --git a/dlt/destinations/impl/dummy/dummy.py b/dlt/destinations/impl/dummy/dummy.py index 9daec8063f..bade1fd94f 100644 --- a/dlt/destinations/impl/dummy/dummy.py +++ b/dlt/destinations/impl/dummy/dummy.py @@ -134,7 +134,7 @@ def __init__( config: DummyClientConfiguration, capabilities: 
DestinationCapabilitiesContext, ) -> None: - SupportsStagingDestination.__init__(self) + SupportsStagingDestination.__init__(self, config) super().__init__(schema, config, capabilities) self.in_staging_context = False self.config: DummyClientConfiguration = config diff --git a/dlt/destinations/impl/redshift/redshift.py b/dlt/destinations/impl/redshift/redshift.py index ef11c9dcd8..243e2f4e1e 100644 --- a/dlt/destinations/impl/redshift/redshift.py +++ b/dlt/destinations/impl/redshift/redshift.py @@ -233,9 +233,7 @@ def __init__( config.credentials, capabilities, ) - SupportsStagingDestination.__init__( - self, config.truncate_table_before_load_on_staging_destination - ) + SupportsStagingDestination.__init__(self, config) super().__init__(schema, config, sql_client) self.sql_client = sql_client self.config: RedshiftClientConfiguration = config diff --git a/dlt/destinations/impl/snowflake/snowflake.py b/dlt/destinations/impl/snowflake/snowflake.py index 5d0f2df1fc..e1bb8930ba 100644 --- a/dlt/destinations/impl/snowflake/snowflake.py +++ b/dlt/destinations/impl/snowflake/snowflake.py @@ -266,9 +266,7 @@ def __init__( capabilities, config.query_tag, ) - SupportsStagingDestination.__init__( - self, config.truncate_table_before_load_on_staging_destination - ) + SupportsStagingDestination.__init__(self, config) super().__init__(schema, config, sql_client) self.config: SnowflakeClientConfiguration = config self.sql_client: SnowflakeSqlClient = sql_client # type: ignore diff --git a/dlt/destinations/impl/synapse/synapse.py b/dlt/destinations/impl/synapse/synapse.py index e43e2a6dfa..08ac7b9286 100644 --- a/dlt/destinations/impl/synapse/synapse.py +++ b/dlt/destinations/impl/synapse/synapse.py @@ -59,6 +59,7 @@ def __init__( config: SynapseClientConfiguration, capabilities: DestinationCapabilitiesContext, ) -> None: + SupportsStagingDestination.__init__(self, config) super().__init__(schema, config, capabilities) self.config: SynapseClientConfiguration = config self.sql_client = SynapseSqlClient( diff --git a/docs/website/docs/dlt-ecosystem/staging.md b/docs/website/docs/dlt-ecosystem/staging.md index 05e31a574b..cd2b19d32a 100644 --- a/docs/website/docs/dlt-ecosystem/staging.md +++ b/docs/website/docs/dlt-ecosystem/staging.md @@ -42,9 +42,13 @@ truncate_staging_dataset=true `dlt` allows to chain destinations where the first one (`staging`) is responsible for uploading the files from local filesystem to the remote storage. It then generates followup jobs for the second destination that (typically) copy the files from remote storage into destination. Currently, only one destination the [filesystem](destinations/filesystem.md) can be used as a staging. Following destinations can copy remote files: -1. [Redshift.](destinations/redshift.md#staging-support) -2. [Bigquery.](destinations/bigquery.md#staging-support) -3. [Snowflake.](destinations/snowflake.md#staging-support) + +1. [Azure Synapse](destinations/synapse#staging-support) +1. [Athena](destinations/athena#staging-support) +1. [Bigquery](destinations/bigquery.md#staging-support) +1. [Dremio](destinations/dremio#staging-support) +1. [Redshift](destinations/redshift.md#staging-support) +1. [Snowflake](destinations/snowflake.md#staging-support) ### How to use In essence, you need to set up two destinations and then pass them to `dlt.pipeline`. Below we'll use `filesystem` staging with `parquet` files to load into `Redshift` destination. 
@@ -96,4 +100,21 @@ In essence, you need to set up two destinations and then pass them to `dlt.pipel Run the pipeline script as usual. -> 💡 Please note that `dlt` does not delete loaded files from the staging storage after the load is complete. +:::tip +Please note that `dlt` does not delete loaded files from the staging storage after the load is complete, but it trunkate previously loaded files. +::: + +### How to prevent staging files truncation + +Before `dlt` loads data to staging storage it truncates previously loaded files. To prevent it and keep the whole history +of loaded files you can use the following parameter: + +```toml +[destination.redshift] +truncate_table_before_load_on_staging_destination=false +``` + +:::caution +[Athena](destinations/athena#staging-support) destination only truncate not iceberg tables with `replace` merge_disposition. +Therefore, parameter `truncate_table_before_load_on_staging_destination` only control truncation of corresponding files for these tables. +::: diff --git a/tests/load/test_dummy_client.py b/tests/load/test_dummy_client.py index 9f0bca6ac5..5555d80753 100644 --- a/tests/load/test_dummy_client.py +++ b/tests/load/test_dummy_client.py @@ -548,6 +548,23 @@ def test_completed_loop_with_delete_completed() -> None: assert_complete_job(load, should_delete_completed=True) +@pytest.mark.parametrize("to_truncate", [True, False]) +def test_truncate_table_before_load_on_staging(to_truncate) -> None: + load = setup_loader( + client_config=DummyClientConfiguration( + truncate_table_before_load_on_staging_destination=to_truncate + ) + ) + load_id, schema = prepare_load_package(load.load_storage, NORMALIZED_FILES) + destination_client = load.get_destination_client(schema) + assert ( + destination_client.should_truncate_table_before_load_on_staging_destination( + schema.tables["_dlt_version"] + ) + == to_truncate + ) + + def test_retry_on_new_loop() -> None: # test job that retries sitting in new jobs load = setup_loader(client_config=DummyClientConfiguration(retry_prob=1.0)) From 6fd36ac72e07fc18da5523ccb66593c14bb8b43c Mon Sep 17 00:00:00 2001 From: Violetta Mishechkina Date: Mon, 26 Aug 2024 12:26:43 +0200 Subject: [PATCH 3/5] Fix tests --- dlt/common/destination/reference.py | 8 ++++---- dlt/destinations/impl/athena/athena.py | 2 +- dlt/destinations/impl/bigquery/bigquery.py | 4 +--- dlt/destinations/impl/dummy/configuration.py | 2 +- dlt/destinations/impl/dummy/dummy.py | 2 +- tests/load/test_dummy_client.py | 4 ++-- 6 files changed, 10 insertions(+), 12 deletions(-) diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index c0b4ea4ff3..3c17fcb7e2 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -269,7 +269,7 @@ class DestinationClientDwhWithStagingConfiguration(DestinationClientDwhConfigura staging_config: Optional[DestinationClientStagingConfiguration] = None """configuration of the staging, if present, injected at runtime""" - truncate_table_before_load_on_staging_destination: bool = True + truncate_tables_on_staging_destination_before_load: bool = True """If dlt should truncate the tables on staging destination before loading data.""" @@ -584,8 +584,8 @@ class SupportsStagingDestination: """Adds capability to support a staging destination for the load""" def __init__(self, config: DestinationClientDwhWithStagingConfiguration) -> None: -
self.truncate_tables_on_staging_destination_before_load = ( + config.truncate_tables_on_staging_destination_before_load ) def should_load_data_to_staging_dataset_on_staging_destination( @@ -595,7 +595,7 @@ def should_load_data_to_staging_dataset_on_staging_destination( def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: # the default is to truncate the tables on the staging destination... - return self.truncate_table_before_load_on_staging_destination + return self.truncate_tables_on_staging_destination_before_load # TODO: type Destination properly diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py index 89a5a89067..96d823097e 100644 --- a/dlt/destinations/impl/athena/athena.py +++ b/dlt/destinations/impl/athena/athena.py @@ -532,7 +532,7 @@ def should_truncate_table_before_load_on_staging_destination(self, table: TTable if table["write_disposition"] == "replace" and not self._is_iceberg_table( self.prepare_load_table(table["name"]) ): - return self.truncate_table_before_load_on_staging_destination + return self.truncate_tables_on_staging_destination_before_load return False def should_load_data_to_staging_dataset_on_staging_destination( diff --git a/dlt/destinations/impl/bigquery/bigquery.py b/dlt/destinations/impl/bigquery/bigquery.py index 0a62ea586b..d96eec2d6f 100644 --- a/dlt/destinations/impl/bigquery/bigquery.py +++ b/dlt/destinations/impl/bigquery/bigquery.py @@ -229,9 +229,7 @@ def __init__( config.http_timeout, config.retry_deadline, ) - SupportsStagingDestination.__init__( - self, config.truncate_table_before_load_on_staging_destination - ) + SupportsStagingDestination.__init__(self, config) super().__init__(schema, config, sql_client) self.config: BigQueryClientConfiguration = config self.sql_client: BigQuerySqlClient = sql_client # type: ignore diff --git a/dlt/destinations/impl/dummy/configuration.py b/dlt/destinations/impl/dummy/configuration.py index cfc79af5ab..a066479294 100644 --- a/dlt/destinations/impl/dummy/configuration.py +++ b/dlt/destinations/impl/dummy/configuration.py @@ -34,7 +34,7 @@ class DummyClientConfiguration(DestinationClientConfiguration): """raise terminal exception in job init""" fail_transiently_in_init: bool = False """raise transient exception in job init""" - truncate_table_before_load_on_staging_destination: bool = True + truncate_tables_on_staging_destination_before_load: bool = True """truncate tables on staging destination""" # new jobs workflows diff --git a/dlt/destinations/impl/dummy/dummy.py b/dlt/destinations/impl/dummy/dummy.py index bade1fd94f..c4fb3889a5 100644 --- a/dlt/destinations/impl/dummy/dummy.py +++ b/dlt/destinations/impl/dummy/dummy.py @@ -134,7 +134,7 @@ def __init__( config: DummyClientConfiguration, capabilities: DestinationCapabilitiesContext, ) -> None: - SupportsStagingDestination.__init__(self, config) + SupportsStagingDestination.__init__(self, config) # type: ignore super().__init__(schema, config, capabilities) self.in_staging_context = False self.config: DummyClientConfiguration = config diff --git a/tests/load/test_dummy_client.py b/tests/load/test_dummy_client.py index 5555d80753..59b7acac15 100644 --- a/tests/load/test_dummy_client.py +++ b/tests/load/test_dummy_client.py @@ -552,13 +552,13 @@ def test_completed_loop_with_delete_completed() -> None: def test_truncate_table_before_load_on_staging(to_truncate) -> None: load = setup_loader( client_config=DummyClientConfiguration( -
truncate_table_before_load_on_staging_destination=to_truncate + truncate_tables_on_staging_destination_before_load=to_truncate ) ) load_id, schema = prepare_load_package(load.load_storage, NORMALIZED_FILES) destination_client = load.get_destination_client(schema) assert ( - destination_client.should_truncate_table_before_load_on_staging_destination( + destination_client.should_truncate_table_before_load_on_staging_destination( # type: ignore schema.tables["_dlt_version"] ) == to_truncate From 46e41176c939a91ebe251c4d301660bda164e43b Mon Sep 17 00:00:00 2001 From: Violetta Mishechkina Date: Tue, 27 Aug 2024 16:09:27 +0200 Subject: [PATCH 4/5] Move implementation from mixin, add tests --- dlt/common/destination/reference.py | 11 +--- dlt/destinations/impl/athena/athena.py | 3 +- dlt/destinations/impl/bigquery/bigquery.py | 4 +- .../impl/clickhouse/clickhouse.py | 4 +- .../impl/databricks/databricks.py | 4 +- dlt/destinations/impl/dremio/dremio.py | 4 +- dlt/destinations/impl/dummy/dummy.py | 4 +- dlt/destinations/impl/redshift/redshift.py | 4 +- dlt/destinations/impl/snowflake/snowflake.py | 4 +- dlt/destinations/impl/synapse/synapse.py | 4 +- tests/load/pipeline/test_stage_loading.py | 57 ++++++++++++++++++- 11 files changed, 82 insertions(+), 21 deletions(-) diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index 3c17fcb7e2..0944b03bea 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -580,22 +580,17 @@ def with_staging_dataset(self) -> ContextManager["JobClientBase"]: return self # type: ignore -class SupportsStagingDestination: +class SupportsStagingDestination(ABC): """Adds capability to support a staging destination for the load""" - def __init__(self, config: DestinationClientDwhWithStagingConfiguration) -> None: - self.truncate_tables_on_staging_destination_before_load = ( - config.truncate_tables_on_staging_destination_before_load - ) - def should_load_data_to_staging_dataset_on_staging_destination( self, table: TTableSchema ) -> bool: return False + @abstractmethod def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: - # the default is to truncate the tables on the staging destination... 
- return self.truncate_tables_on_staging_destination_before_load + pass # TODO: type Destination properly diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py index 96d823097e..b28309b930 100644 --- a/dlt/destinations/impl/athena/athena.py +++ b/dlt/destinations/impl/athena/athena.py @@ -389,7 +389,6 @@ def __init__( config, capabilities, ) - SupportsStagingDestination.__init__(self, config) super().__init__(schema, config, sql_client) self.sql_client: AthenaSQLClient = sql_client # type: ignore self.config: AthenaClientConfiguration = config @@ -532,7 +531,7 @@ def should_truncate_table_before_load_on_staging_destination(self, table: TTable if table["write_disposition"] == "replace" and not self._is_iceberg_table( self.prepare_load_table(table["name"]) ): - return self.truncate_tables_on_staging_destination_before_load + return self.config.truncate_tables_on_staging_destination_before_load return False def should_load_data_to_staging_dataset_on_staging_destination( diff --git a/dlt/destinations/impl/bigquery/bigquery.py b/dlt/destinations/impl/bigquery/bigquery.py index d96eec2d6f..11326cf3ed 100644 --- a/dlt/destinations/impl/bigquery/bigquery.py +++ b/dlt/destinations/impl/bigquery/bigquery.py @@ -229,7 +229,6 @@ def __init__( config.http_timeout, config.retry_deadline, ) - SupportsStagingDestination.__init__(self, config) super().__init__(schema, config, sql_client) self.config: BigQueryClientConfiguration = config self.sql_client: BigQuerySqlClient = sql_client # type: ignore @@ -504,6 +503,9 @@ def _should_autodetect_schema(self, table_name: str) -> bool: self.schema._schema_tables, table_name, AUTODETECT_SCHEMA_HINT, allow_none=True ) or (self.config.autodetect_schema and table_name not in self.schema.dlt_table_names()) + def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: + return self.config.truncate_tables_on_staging_destination_before_load + def _streaming_load( items: List[Dict[Any, Any]], table: Dict[str, Any], job_client: BigQueryClient diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index fba1a18c89..282fbaf338 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -283,7 +283,6 @@ def __init__( capabilities, config, ) - SupportsStagingDestination.__init__(self, config) super().__init__(schema, config, self.sql_client) self.config: ClickHouseClientConfiguration = config self.active_hints = deepcopy(HINT_TO_CLICKHOUSE_ATTR) @@ -373,3 +372,6 @@ def _from_db_type( self, ch_t: str, precision: Optional[int], scale: Optional[int] ) -> TColumnType: return self.type_mapper.from_db_type(ch_t, precision, scale) + + def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: + return self.config.truncate_tables_on_staging_destination_before_load diff --git a/dlt/destinations/impl/databricks/databricks.py b/dlt/destinations/impl/databricks/databricks.py index cab1f8a507..38412b2608 100644 --- a/dlt/destinations/impl/databricks/databricks.py +++ b/dlt/destinations/impl/databricks/databricks.py @@ -262,7 +262,6 @@ def __init__( config.credentials, capabilities, ) - SupportsStagingDestination.__init__(self, config) super().__init__(schema, config, sql_client) self.config: DatabricksClientConfiguration = config self.sql_client: DatabricksSqlClient = sql_client # type: ignore[assignment] @@ -326,3 +325,6 @@ def _get_storage_table_query_columns(self) -> 
List[str]: "full_data_type" ) return fields + + def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: + return self.config.truncate_tables_on_staging_destination_before_load diff --git a/dlt/destinations/impl/dremio/dremio.py b/dlt/destinations/impl/dremio/dremio.py index 8014450ba3..149d106dcd 100644 --- a/dlt/destinations/impl/dremio/dremio.py +++ b/dlt/destinations/impl/dremio/dremio.py @@ -147,7 +147,6 @@ def __init__( config.credentials, capabilities, ) - SupportsStagingDestination.__init__(self, config) super().__init__(schema, config, sql_client) self.config: DremioClientConfiguration = config self.sql_client: DremioSqlClient = sql_client # type: ignore @@ -211,3 +210,6 @@ def _make_add_column_sql( self, new_columns: Sequence[TColumnSchema], table_format: TTableFormat = None ) -> List[str]: return ["ADD COLUMNS (" + ", ".join(self._get_column_def_sql(c) for c in new_columns) + ")"] + + def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: + return self.config.truncate_tables_on_staging_destination_before_load diff --git a/dlt/destinations/impl/dummy/dummy.py b/dlt/destinations/impl/dummy/dummy.py index c4fb3889a5..feb09369dc 100644 --- a/dlt/destinations/impl/dummy/dummy.py +++ b/dlt/destinations/impl/dummy/dummy.py @@ -134,7 +134,6 @@ def __init__( config: DummyClientConfiguration, capabilities: DestinationCapabilitiesContext, ) -> None: - SupportsStagingDestination.__init__(self, config) # type: ignore super().__init__(schema, config, capabilities) self.in_staging_context = False self.config: DummyClientConfiguration = config @@ -203,6 +202,9 @@ def complete_load(self, load_id: str) -> None: def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool: return super().should_load_data_to_staging_dataset(table) + def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: + return self.config.truncate_tables_on_staging_destination_before_load + @contextmanager def with_staging_dataset(self) -> Iterator[JobClientBase]: try: diff --git a/dlt/destinations/impl/redshift/redshift.py b/dlt/destinations/impl/redshift/redshift.py index 243e2f4e1e..0e201dc4e0 100644 --- a/dlt/destinations/impl/redshift/redshift.py +++ b/dlt/destinations/impl/redshift/redshift.py @@ -233,7 +233,6 @@ def __init__( config.credentials, capabilities, ) - SupportsStagingDestination.__init__(self, config) super().__init__(schema, config, sql_client) self.sql_client = sql_client self.config: RedshiftClientConfiguration = config @@ -275,3 +274,6 @@ def _from_db_type( self, pq_t: str, precision: Optional[int], scale: Optional[int] ) -> TColumnType: return self.type_mapper.from_db_type(pq_t, precision, scale) + + def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: + return self.config.truncate_tables_on_staging_destination_before_load diff --git a/dlt/destinations/impl/snowflake/snowflake.py b/dlt/destinations/impl/snowflake/snowflake.py index e1bb8930ba..6688b5bc17 100644 --- a/dlt/destinations/impl/snowflake/snowflake.py +++ b/dlt/destinations/impl/snowflake/snowflake.py @@ -266,7 +266,6 @@ def __init__( capabilities, config.query_tag, ) - SupportsStagingDestination.__init__(self, config) super().__init__(schema, config, sql_client) self.config: SnowflakeClientConfiguration = config self.sql_client: SnowflakeSqlClient = sql_client # type: ignore @@ -326,3 +325,6 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat 
= Non return ( f"{name} {self.type_mapper.to_db_type(c)} {self._gen_not_null(c.get('nullable', True))}" ) + + def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: + return self.config.truncate_tables_on_staging_destination_before_load diff --git a/dlt/destinations/impl/synapse/synapse.py b/dlt/destinations/impl/synapse/synapse.py index 08ac7b9286..750a4895f0 100644 --- a/dlt/destinations/impl/synapse/synapse.py +++ b/dlt/destinations/impl/synapse/synapse.py @@ -59,7 +59,6 @@ def __init__( config: SynapseClientConfiguration, capabilities: DestinationCapabilitiesContext, ) -> None: - SupportsStagingDestination.__init__(self, config) super().__init__(schema, config, capabilities) self.config: SynapseClientConfiguration = config self.sql_client = SynapseSqlClient( @@ -174,6 +173,9 @@ def create_load_job( ) return job + def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: + return self.config.truncate_tables_on_staging_destination_before_load + class SynapseCopyFileLoadJob(CopyRemoteFileLoadJob): def __init__( diff --git a/tests/load/pipeline/test_stage_loading.py b/tests/load/pipeline/test_stage_loading.py index a760c86526..f216fa3c05 100644 --- a/tests/load/pipeline/test_stage_loading.py +++ b/tests/load/pipeline/test_stage_loading.py @@ -1,12 +1,12 @@ import pytest -from typing import Dict, Any, List +from typing import List import dlt, os -from dlt.common import json, sleep -from copy import deepcopy +from dlt.common import json from dlt.common.storages.configuration import FilesystemConfiguration from dlt.common.utils import uniq_id from dlt.common.schema.typing import TDataType +from dlt.destinations.impl.filesystem.filesystem import FilesystemClient from tests.load.pipeline.test_merge_disposition import github from tests.pipeline.utils import load_table_counts, assert_load_info @@ -40,6 +40,13 @@ def load_modified_issues(): yield from issues +@dlt.resource(table_name="events", write_disposition="append", primary_key="timestamp") +def event_many_load_2(): + with open("tests/normalize/cases/event.event.many_load_2.json", "r", encoding="utf-8") as f: + events = json.load(f) + yield from events + + @pytest.mark.parametrize( "destination_config", destinations_configs(all_staging_configs=True), ids=lambda x: x.name ) @@ -183,6 +190,50 @@ def test_staging_load(destination_config: DestinationTestConfiguration) -> None: assert replace_counts == initial_counts +@pytest.mark.parametrize( + "destination_config", destinations_configs(all_staging_configs=True), ids=lambda x: x.name +) +def test_truncate_staging_dataset(destination_config: DestinationTestConfiguration) -> None: + """This test checks that table truncation on the staging destination is done according to the configuration.
+ + Test loads data to the destination three times: + * with truncation + * without truncation (after this 2 staging files should be left) + * with truncation (after this 1 staging file should be left) + """ + pipeline = destination_config.setup_pipeline( + pipeline_name="test_stage_loading", dataset_name="test_staging_load" + uniq_id() + ) + resource = event_many_load_2() + table_name: str = resource.table_name # type: ignore[assignment] + + # load the data, files stay on the stage after the load + info = pipeline.run(resource) + assert_load_info(info) + + # load the data without truncating the staging, should see two files on staging + pipeline.destination.config_params["truncate_tables_on_staging_destination_before_load"] = False + info = pipeline.run(resource) + assert_load_info(info) + # check there are two staging files + _, staging_client = pipeline._get_destination_clients(pipeline.default_schema) + with staging_client: + assert len(staging_client.list_table_files(table_name)) == 2 # type: ignore[attr-defined] + + # load the data with truncation, so only the new file is on the staging + pipeline.destination.config_params["truncate_tables_on_staging_destination_before_load"] = True + info = pipeline.run(resource) + assert_load_info(info) + # check that table exists in the destination + with pipeline.sql_client() as sql_client: + qual_name = sql_client.make_qualified_table_name + assert len(sql_client.execute_sql(f"SELECT * from {qual_name(table_name)}")) > 4 + # check there is only one staging file + _, staging_client = pipeline._get_destination_clients(pipeline.default_schema) + with staging_client: + assert len(staging_client.list_table_files(table_name)) == 1 # type: ignore[attr-defined] + + @pytest.mark.parametrize( "destination_config", destinations_configs(all_staging_configs=True), ids=lambda x: x.name ) From 6a2bdd27e0818d79e887cd8b08b6ea0aa53f8c6e Mon Sep 17 00:00:00 2001 From: Violetta Mishechkina Date: Wed, 28 Aug 2024 10:51:46 +0200 Subject: [PATCH 5/5] Fix docs grammar --- docs/website/docs/dlt-ecosystem/staging.md | 53 ++++++++++------------ 1 file changed, 25 insertions(+), 28 deletions(-) diff --git a/docs/website/docs/dlt-ecosystem/staging.md b/docs/website/docs/dlt-ecosystem/staging.md index cd2b19d32a..789189b7dd 100644 --- a/docs/website/docs/dlt-ecosystem/staging.md +++ b/docs/website/docs/dlt-ecosystem/staging.md @@ -1,36 +1,33 @@ --- title: Staging -description: Configure an s3 or gcs bucket for staging before copying into the destination +description: Configure an S3 or GCS bucket for staging before copying into the destination keywords: [staging, destination] --- # Staging -The goal of staging is to bring the data closer to the database engine so the modification of the destination (final) dataset happens faster and without errors. `dlt`, when asked, creates two -staging areas: +The goal of staging is to bring the data closer to the database engine so that the modification of the destination (final) dataset happens faster and without errors. `dlt`, when asked, creates two staging areas: 1. A **staging dataset** used by the [merge and replace loads](../general-usage/incremental-loading.md#merge-incremental_loading) to deduplicate and merge data with the destination. -2. A **staging storage** which is typically a s3/gcp bucket where [loader files](file-formats/) are copied before they are loaded by the destination. +2. 
A **staging storage** which is typically an S3/GCP bucket where [loader files](file-formats/) are copied before they are loaded by the destination. ## Staging dataset -`dlt` creates a staging dataset when write disposition of any of the loaded resources requires it. It creates and migrates required tables exactly like for the -main dataset. Data in staging tables is truncated when load step begins and only for tables that will participate in it. -Such staging dataset has the same name as the dataset passed to `dlt.pipeline` but with `_staging` suffix in the name. Alternatively, you can provide your own staging dataset pattern or use a fixed name, identical for all the -configured datasets. +`dlt` creates a staging dataset when the write disposition of any of the loaded resources requires it. It creates and migrates required tables exactly like for the main dataset. Data in staging tables is truncated when the load step begins and only for tables that will participate in it. +Such a staging dataset has the same name as the dataset passed to `dlt.pipeline` but with a `_staging` suffix in the name. Alternatively, you can provide your own staging dataset pattern or use a fixed name, identical for all the configured datasets. ```toml [destination.postgres] staging_dataset_name_layout="staging_%s" ``` -Entry above switches the pattern to `staging_` prefix and for example for dataset with name **github_data** `dlt` will create **staging_github_data**. +The entry above switches the pattern to `staging_` prefix and for example, for a dataset with the name **github_data**, `dlt` will create **staging_github_data**. -To configure static staging dataset name, you can do the following (we use destination factory) +To configure a static staging dataset name, you can do the following (we use the destination factory) ```py import dlt dest_ = dlt.destinations.postgres(staging_dataset_name_layout="_dlt_staging") ``` -All pipelines using `dest_` as destination will use **staging_dataset** to store staging tables. Make sure that your pipelines are not overwriting each other's tables. +All pipelines using `dest_` as the destination will use the **staging_dataset** to store staging tables. Make sure that your pipelines are not overwriting each other's tables. -### Cleanup up staging dataset automatically -`dlt` does not truncate tables in staging dataset at the end of the load. Data that is left after contains all the extracted data and may be useful for debugging. +### Cleanup staging dataset automatically +`dlt` does not truncate tables in the staging dataset at the end of the load. Data that is left after contains all the extracted data and may be useful for debugging. If you prefer to truncate it, put the following line in `config.toml`: ```toml @@ -39,9 +36,9 @@ truncate_staging_dataset=true ``` ## Staging storage -`dlt` allows to chain destinations where the first one (`staging`) is responsible for uploading the files from local filesystem to the remote storage. It then generates followup jobs for the second destination that (typically) copy the files from remote storage into destination. +`dlt` allows chaining destinations where the first one (`staging`) is responsible for uploading the files from the local filesystem to the remote storage. It then generates follow-up jobs for the second destination that (typically) copy the files from remote storage into the destination. -Currently, only one destination the [filesystem](destinations/filesystem.md) can be used as a staging. 
Following destinations can copy remote files: +Currently, only one destination, the [filesystem](destinations/filesystem.md), can be used as staging. The following destinations can copy remote files: 1. [Azure Synapse](destinations/synapse#staging-support) 1. [Athena](destinations/athena#staging-support) @@ -51,11 +48,11 @@ Currently, only one destination the [filesystem](destinations/filesystem.md) can 1. [Snowflake](destinations/snowflake.md#staging-support) ### How to use -In essence, you need to set up two destinations and then pass them to `dlt.pipeline`. Below we'll use `filesystem` staging with `parquet` files to load into `Redshift` destination. +In essence, you need to set up two destinations and then pass them to `dlt.pipeline`. Below we'll use `filesystem` staging with `parquet` files to load into the `Redshift` destination. -1. **Set up the s3 bucket and filesystem staging.** +1. **Set up the S3 bucket and filesystem staging.** - Please follow our guide in [filesystem destination documentation](destinations/filesystem.md). Test the staging as standalone destination to make sure that files go where you want them. In your `secrets.toml` you should now have a working `filesystem` configuration: + Please follow our guide in the [filesystem destination documentation](destinations/filesystem.md). Test the staging as a standalone destination to make sure that files go where you want them. In your `secrets.toml`, you should now have a working `filesystem` configuration: ```toml [destination.filesystem] bucket_url = "s3://[your_bucket_name]" # replace with your bucket name, @@ -67,15 +64,15 @@ In essence, you need to set up two destinations and then pass them to `dlt.pipel 2. **Set up the Redshift destination.** - Please follow our guide in [redshift destination documentation](destinations/redshift.md). In your `secrets.toml` you added: + Please follow our guide in the [redshift destination documentation](destinations/redshift.md). In your `secrets.toml`, you added: ```toml # keep it at the top of your toml file! before any section starts destination.redshift.credentials="redshift://loader:@localhost/dlt_data?connect_timeout=15" ``` -3. **Authorize Redshift cluster to access the staging bucket.** +3. **Authorize the Redshift cluster to access the staging bucket.** - By default `dlt` will forward the credentials configured for `filesystem` to the `Redshift` COPY command. If you are fine with this, move to the next step. + By default, `dlt` will forward the credentials configured for `filesystem` to the `Redshift` COPY command. If you are fine with this, move to the next step. 4. **Chain staging to destination and request `parquet` file format.** @@ -83,7 +80,7 @@ In essence, you need to set up two destinations and then pass them to `dlt.pipel ```py # Create a dlt pipeline that will load # chess player data to the redshift destination - # via staging on s3 + # via staging on S3 pipeline = dlt.pipeline( pipeline_name='chess_pipeline', destination='redshift', @@ -91,7 +88,7 @@ In essence, you need to set up two destinations and then pass them to `dlt.pipel dataset_name='player_data' ) ``` - `dlt` will automatically select an appropriate loader file format for the staging files. Below we explicitly specify `parquet` file format (just to demonstrate how to do it): + `dlt` will automatically select an appropriate loader file format for the staging files. 
Below we explicitly specify the `parquet` file format (just to demonstrate how to do it): ```py info = pipeline.run(chess(), loader_file_format="parquet") ``` @@ -101,13 +98,13 @@ In essence, you need to set up two destinations and then pass them to `dlt.pipel Run the pipeline script as usual. :::tip -Please note that `dlt` does not delete loaded files from the staging storage after the load is complete, but it trunkate previously loaded files. +Please note that `dlt` does not delete loaded files from the staging storage after the load is complete, but it truncates previously loaded files. ::: ### How to prevent staging files truncation -Before `dlt` loads data to staging storage it truncates previously loaded files. To prevent it and keep the whole history -of loaded files you can use the following parameter: +Before `dlt` loads data to the staging storage, it truncates previously loaded files. To prevent it and keep the whole history +of loaded files, you can use the following parameter: ```toml [destination.redshift] truncate_table_before_load_on_staging_destination=false ``` :::caution -[Athena](destinations/athena#staging-support) destination only truncate not iceberg tables with `replace` merge_disposition. -Therefore, parameter `truncate_table_before_load_on_staging_destination` only control truncation of corresponding files for these tables. +The [Athena](destinations/athena#staging-support) destination only truncates non-iceberg tables with the `replace` write disposition. +Therefore, the parameter `truncate_table_before_load_on_staging_destination` only controls the truncation of the corresponding files for these tables. :::
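For reference, a minimal sketch of how the option introduced in this series can be exercised end to end. The field name follows the final spelling from PATCH 3/5 (`truncate_tables_on_staging_destination_before_load` on `DestinationClientDwhWithStagingConfiguration`), the `config_params` override mirrors `test_truncate_staging_dataset` from PATCH 4/5, and the resource, its rows, and the dataset name are placeholders; working `redshift` and `filesystem` credentials, as in the staging docs, are assumed.

```py
import dlt

@dlt.resource(table_name="players", write_disposition="append")
def players():
    # placeholder rows standing in for a real source
    yield [{"id": 1, "name": "magnus"}, {"id": 2, "name": "hikaru"}]

# chain filesystem staging to a warehouse destination, as in the staging docs
pipeline = dlt.pipeline(
    pipeline_name="chess_pipeline",
    destination="redshift",
    staging="filesystem",
    dataset_name="player_data",
)

# the new flag defaults to True (truncate staging tables before each load);
# overriding it to False keeps the full history of load files on the bucket
pipeline.destination.config_params["truncate_tables_on_staging_destination_before_load"] = False

info = pipeline.run(players(), loader_file_format="parquet")
print(info)
```

The equivalent `config.toml` entry would be `truncate_tables_on_staging_destination_before_load=false` under `[destination.redshift]`, i.e. the snippet shown in the staging docs but with the field name as renamed in PATCH 3/5.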