From 82b1a063fee48dfa8f780e6c61f7030fb3c6b285 Mon Sep 17 00:00:00 2001
From: Violetta Mishechkina
Date: Fri, 23 Aug 2024 20:34:43 +0200
Subject: [PATCH] Fix comments, add tests

---
 dlt/common/destination/reference.py          |  4 +--
 dlt/destinations/impl/athena/athena.py       |  6 ++--
 .../impl/clickhouse/clickhouse.py            |  4 +--
 .../impl/databricks/databricks.py            |  4 +--
 dlt/destinations/impl/dremio/dremio.py       |  4 +--
 dlt/destinations/impl/dummy/configuration.py |  2 ++
 dlt/destinations/impl/dummy/dummy.py         |  2 +-
 dlt/destinations/impl/redshift/redshift.py   |  4 +--
 dlt/destinations/impl/snowflake/snowflake.py |  4 +--
 dlt/destinations/impl/synapse/synapse.py     |  1 +
 docs/website/docs/dlt-ecosystem/staging.md   | 29 ++++++++++++++++---
 tests/load/test_dummy_client.py              | 17 +++++++++++
 12 files changed, 55 insertions(+), 26 deletions(-)

diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py
index cb652e96a4..5dca4f9a38 100644
--- a/dlt/common/destination/reference.py
+++ b/dlt/common/destination/reference.py
@@ -562,9 +562,9 @@ def with_staging_dataset(self) -> ContextManager["JobClientBase"]:
 class SupportsStagingDestination:
     """Adds capability to support a staging destination for the load"""
 
-    def __init__(self, truncate_table_before_load_on_staging_destination: bool = True) -> None:
+    def __init__(self, config: DestinationClientDwhWithStagingConfiguration) -> None:
         self.truncate_table_before_load_on_staging_destination = (
-            truncate_table_before_load_on_staging_destination
+            config.truncate_table_before_load_on_staging_destination
         )
 
     def should_load_data_to_staging_dataset_on_staging_destination(
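Every destination client touched in this patch applies the same mechanical change: instead of forwarding a bare boolean, the client hands its whole configuration object to `SupportsStagingDestination`, and the mixin reads the truncation flag itself. A minimal sketch of the resulting wiring (the mixin shape and the attribute name come from the hunk above; the stand-in config and client classes are illustrative only and not part of the patch):

```python
# Illustrative sketch only -- simplified stand-ins, not code from this patch.


class StagingConfig:
    """Stands in for DestinationClientDwhWithStagingConfiguration."""

    truncate_table_before_load_on_staging_destination: bool = True


class SupportsStagingDestination:
    """Adds capability to support a staging destination for the load."""

    def __init__(self, config: StagingConfig) -> None:
        # the mixin now pulls the flag from the destination configuration
        self.truncate_table_before_load_on_staging_destination = (
            config.truncate_table_before_load_on_staging_destination
        )

    def should_truncate_table_before_load_on_staging_destination(self, table: dict) -> bool:
        # destinations may override this, e.g. Athena skips iceberg tables (see below)
        return self.truncate_table_before_load_on_staging_destination


class SomeDestinationClient(SupportsStagingDestination):
    def __init__(self, config: StagingConfig) -> None:
        # each client in this patch is reduced to this one-liner
        SupportsStagingDestination.__init__(self, config)
        self.config = config
```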
diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py
index e0770d05a5..7e136e9e53 100644
--- a/dlt/destinations/impl/athena/athena.py
+++ b/dlt/destinations/impl/athena/athena.py
@@ -389,9 +389,7 @@ def __init__(
             config,
             capabilities,
         )
-        SupportsStagingDestination.__init__(
-            self, config.truncate_table_before_load_on_staging_destination
-        )
+        SupportsStagingDestination.__init__(self, config)
         super().__init__(schema, config, sql_client)
         self.sql_client: AthenaSQLClient = sql_client  # type: ignore
         self.config: AthenaClientConfiguration = config
@@ -532,7 +530,7 @@ def should_truncate_table_before_load_on_staging_destination(self, table: TTable
         if table["write_disposition"] == "replace" and not self._is_iceberg_table(
             self.prepare_load_table(table["name"])
         ):
-            return True
+            return self.truncate_table_before_load_on_staging_destination
         return False
 
     def should_load_data_to_staging_dataset_on_staging_destination(
diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py
index e8d15b9ab5..51a8fe897c 100644
--- a/dlt/destinations/impl/clickhouse/clickhouse.py
+++ b/dlt/destinations/impl/clickhouse/clickhouse.py
@@ -283,9 +283,7 @@ def __init__(
             capabilities,
             config,
         )
-        SupportsStagingDestination.__init__(
-            self, config.truncate_table_before_load_on_staging_destination
-        )
+        SupportsStagingDestination.__init__(self, config)
         super().__init__(schema, config, self.sql_client)
         self.config: ClickHouseClientConfiguration = config
         self.active_hints = deepcopy(HINT_TO_CLICKHOUSE_ATTR)
diff --git a/dlt/destinations/impl/databricks/databricks.py b/dlt/destinations/impl/databricks/databricks.py
index 21c9f7aea6..4a228fa23e 100644
--- a/dlt/destinations/impl/databricks/databricks.py
+++ b/dlt/destinations/impl/databricks/databricks.py
@@ -262,9 +262,7 @@ def __init__(
             config.credentials,
             capabilities,
         )
-        SupportsStagingDestination.__init__(
-            self, config.truncate_table_before_load_on_staging_destination
-        )
+        SupportsStagingDestination.__init__(self, config)
         super().__init__(schema, config, sql_client)
         self.config: DatabricksClientConfiguration = config
         self.sql_client: DatabricksSqlClient = sql_client  # type: ignore[assignment]
diff --git a/dlt/destinations/impl/dremio/dremio.py b/dlt/destinations/impl/dremio/dremio.py
index 72c90709f8..65538e481c 100644
--- a/dlt/destinations/impl/dremio/dremio.py
+++ b/dlt/destinations/impl/dremio/dremio.py
@@ -147,9 +147,7 @@ def __init__(
             config.credentials,
             capabilities,
         )
-        SupportsStagingDestination.__init__(
-            self, config.truncate_table_before_load_on_staging_destination
-        )
+        SupportsStagingDestination.__init__(self, config)
         super().__init__(schema, config, sql_client)
         self.config: DremioClientConfiguration = config
         self.sql_client: DremioSqlClient = sql_client  # type: ignore
diff --git a/dlt/destinations/impl/dummy/configuration.py b/dlt/destinations/impl/dummy/configuration.py
index 7bc1d9e943..afd18202c1 100644
--- a/dlt/destinations/impl/dummy/configuration.py
+++ b/dlt/destinations/impl/dummy/configuration.py
@@ -34,6 +34,8 @@ class DummyClientConfiguration(DestinationClientConfiguration):
     """raise terminal exception in job init"""
     fail_transiently_in_init: bool = False
     """raise transient exception in job init"""
+    truncate_table_before_load_on_staging_destination: bool = True
+    """truncate tables on staging destination"""
 
     # new jobs workflows
     create_followup_jobs: bool = False
diff --git a/dlt/destinations/impl/dummy/dummy.py b/dlt/destinations/impl/dummy/dummy.py
index f060cef0e4..a7ba84e380 100644
--- a/dlt/destinations/impl/dummy/dummy.py
+++ b/dlt/destinations/impl/dummy/dummy.py
@@ -127,7 +127,7 @@ def __init__(
         config: DummyClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
-        SupportsStagingDestination.__init__(self)
+        SupportsStagingDestination.__init__(self, config)
         super().__init__(schema, config, capabilities)
         self.in_staging_context = False
         self.config: DummyClientConfiguration = config
diff --git a/dlt/destinations/impl/redshift/redshift.py b/dlt/destinations/impl/redshift/redshift.py
index c8deeb6d67..d86a94854a 100644
--- a/dlt/destinations/impl/redshift/redshift.py
+++ b/dlt/destinations/impl/redshift/redshift.py
@@ -233,9 +233,7 @@ def __init__(
             config.credentials,
             capabilities,
         )
-        SupportsStagingDestination.__init__(
-            self, config.truncate_table_before_load_on_staging_destination
-        )
+        SupportsStagingDestination.__init__(self, config)
         super().__init__(schema, config, sql_client)
         self.sql_client = sql_client
         self.config: RedshiftClientConfiguration = config
diff --git a/dlt/destinations/impl/snowflake/snowflake.py b/dlt/destinations/impl/snowflake/snowflake.py
index 66e4473ef0..e8607b91b0 100644
--- a/dlt/destinations/impl/snowflake/snowflake.py
+++ b/dlt/destinations/impl/snowflake/snowflake.py
@@ -266,9 +266,7 @@ def __init__(
             capabilities,
             config.query_tag,
         )
-        SupportsStagingDestination.__init__(
-            self, config.truncate_table_before_load_on_staging_destination
-        )
+        SupportsStagingDestination.__init__(self, config)
         super().__init__(schema, config, sql_client)
         self.config: SnowflakeClientConfiguration = config
         self.sql_client: SnowflakeSqlClient = sql_client  # type: ignore
diff --git a/dlt/destinations/impl/synapse/synapse.py b/dlt/destinations/impl/synapse/synapse.py
index d1b38f73bd..7ba34cd50d 100644
--- a/dlt/destinations/impl/synapse/synapse.py
+++ b/dlt/destinations/impl/synapse/synapse.py
@@ -59,6 +59,7 @@ def __init__(
         config: SynapseClientConfiguration,
         capabilities: DestinationCapabilitiesContext,
     ) -> None:
+        SupportsStagingDestination.__init__(self, config)
         super().__init__(schema, config, capabilities)
         self.config: SynapseClientConfiguration = config
         self.sql_client = SynapseSqlClient(
diff --git a/docs/website/docs/dlt-ecosystem/staging.md b/docs/website/docs/dlt-ecosystem/staging.md
index 05e31a574b..cd2b19d32a 100644
--- a/docs/website/docs/dlt-ecosystem/staging.md
+++ b/docs/website/docs/dlt-ecosystem/staging.md
@@ -42,9 +42,13 @@ truncate_staging_dataset=true
 `dlt` allows to chain destinations where the first one (`staging`) is responsible for uploading the files from local filesystem to the remote storage. It then generates followup jobs for the second destination that (typically) copy the files from remote storage into destination. Currently, only one destination the [filesystem](destinations/filesystem.md) can be used as a staging. Following destinations can copy remote files:
-1. [Redshift.](destinations/redshift.md#staging-support)
-2. [Bigquery.](destinations/bigquery.md#staging-support)
-3. [Snowflake.](destinations/snowflake.md#staging-support)
+
+1. [Azure Synapse](destinations/synapse#staging-support)
+1. [Athena](destinations/athena#staging-support)
+1. [Bigquery](destinations/bigquery.md#staging-support)
+1. [Dremio](destinations/dremio#staging-support)
+1. [Redshift](destinations/redshift.md#staging-support)
+1. [Snowflake](destinations/snowflake.md#staging-support)
 
 ### How to use
 In essence, you need to set up two destinations and then pass them to `dlt.pipeline`. Below we'll use `filesystem` staging with `parquet` files to load into `Redshift` destination.
@@ -96,4 +100,21 @@ In essence, you need to set up two destinations and then pass them to `dlt.pipel
 
 Run the pipeline script as usual.
 
-> 💡 Please note that `dlt` does not delete loaded files from the staging storage after the load is complete.
+:::tip
+Please note that `dlt` does not delete loaded files from the staging storage after the load is complete, but it does truncate previously loaded files before each new load.
+:::
+
+### How to prevent staging files truncation
+
+Before `dlt` loads data to the staging storage, it truncates previously loaded files. To prevent this and keep the whole history
+of loaded files, you can use the following parameter:
+
+```toml
+[destination.redshift]
+truncate_table_before_load_on_staging_destination=false
+```
+
+:::caution
+The [Athena](destinations/athena#staging-support) destination only truncates non-iceberg tables with the `replace` write disposition.
+Therefore, the `truncate_table_before_load_on_staging_destination` parameter only controls the truncation of the corresponding files for those tables.
+:::
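The "How to use" walkthrough edited above ends with running the pipeline script as usual. For orientation, a staged load boils down to naming both destinations when creating the pipeline; a minimal sketch, assuming the filesystem bucket and Redshift credentials are already configured as described on the destination pages (the resource and names below are made up for illustration):

```python
import dlt


@dlt.resource(name="players")
def players():
    # tiny in-memory resource, just so there is something to load
    yield [{"id": 1, "name": "magnus"}, {"id": 2, "name": "hikaru"}]


# `staging="filesystem"` makes dlt upload parquet files to the configured bucket;
# Redshift then copies the data from there instead of inserting it row by row.
pipeline = dlt.pipeline(
    pipeline_name="staging_demo",
    destination="redshift",
    staging="filesystem",
    dataset_name="player_data",
)

info = pipeline.run(players(), loader_file_format="parquet")
print(info)
```

The new `truncate_table_before_load_on_staging_destination` option shown in the TOML snippet above can also be supplied through dlt's usual configuration channels, for example as the environment variable `DESTINATION__REDSHIFT__TRUNCATE_TABLE_BEFORE_LOAD_ON_STAGING_DESTINATION=false` (assuming the standard double-underscore mapping of config sections).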
diff --git a/tests/load/test_dummy_client.py b/tests/load/test_dummy_client.py
index b55f4ceece..8d22c6a9bd 100644
--- a/tests/load/test_dummy_client.py
+++ b/tests/load/test_dummy_client.py
@@ -512,6 +512,23 @@ def test_completed_loop_with_delete_completed() -> None:
     assert_complete_job(load, should_delete_completed=True)
 
 
+@pytest.mark.parametrize("to_truncate", [True, False])
+def test_truncate_table_before_load_on_staging(to_truncate: bool) -> None:
+    load = setup_loader(
+        client_config=DummyClientConfiguration(
+            truncate_table_before_load_on_staging_destination=to_truncate
+        )
+    )
+    load_id, schema = prepare_load_package(load.load_storage, NORMALIZED_FILES)
+    destination_client = load.get_destination_client(schema)
+    assert (
+        destination_client.should_truncate_table_before_load_on_staging_destination(
+            schema.tables["_dlt_version"]
+        )
+        == to_truncate
+    )
+
+
 def test_retry_on_new_loop() -> None:
     # test job that retries sitting in new jobs
     load = setup_loader(client_config=DummyClientConfiguration(retry_prob=1.0))