Commit

Fix comments, add tests
VioletM committed Aug 23, 2024
1 parent d7d1566 commit 82b1a06
Showing 12 changed files with 55 additions and 26 deletions.
4 changes: 2 additions & 2 deletions dlt/common/destination/reference.py
@@ -562,9 +562,9 @@ def with_staging_dataset(self) -> ContextManager["JobClientBase"]:
class SupportsStagingDestination:
"""Adds capability to support a staging destination for the load"""

def __init__(self, truncate_table_before_load_on_staging_destination: bool = True) -> None:
def __init__(self, config: DestinationClientDwhWithStagingConfiguration) -> None:
self.truncate_table_before_load_on_staging_destination = (
truncate_table_before_load_on_staging_destination
config.truncate_table_before_load_on_staging_destination
)

def should_load_data_to_staging_dataset_on_staging_destination(
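The same constructor change is repeated for every destination client below: instead of forwarding a single boolean, each client now hands its whole configuration object to the mixin, which reads the flag itself. A minimal, self-contained sketch of the pattern, using hypothetical `StagingConfig` and `DemoClient` classes in place of the real dlt types:

```py
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class StagingConfig:
    # stand-in for DestinationClientDwhWithStagingConfiguration
    truncate_table_before_load_on_staging_destination: bool = True


class SupportsStagingDestination:
    """Adds capability to support a staging destination for the load"""

    def __init__(self, config: StagingConfig) -> None:
        # the mixin reads the flag from the config it receives
        self.truncate_table_before_load_on_staging_destination = (
            config.truncate_table_before_load_on_staging_destination
        )

    def should_truncate_table_before_load_on_staging_destination(
        self, table: Dict[str, Any]
    ) -> bool:
        # destinations may override this with table-specific logic (see Athena below)
        return self.truncate_table_before_load_on_staging_destination


class DemoClient(SupportsStagingDestination):
    def __init__(self, config: StagingConfig) -> None:
        # the client passes its config object; no per-flag plumbing needed
        SupportsStagingDestination.__init__(self, config)


client = DemoClient(StagingConfig(truncate_table_before_load_on_staging_destination=False))
assert client.should_truncate_table_before_load_on_staging_destination({"name": "events"}) is False
```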
6 changes: 2 additions & 4 deletions dlt/destinations/impl/athena/athena.py
@@ -389,9 +389,7 @@ def __init__(
config,
capabilities,
)
SupportsStagingDestination.__init__(
self, config.truncate_table_before_load_on_staging_destination
)
SupportsStagingDestination.__init__(self, config)
super().__init__(schema, config, sql_client)
self.sql_client: AthenaSQLClient = sql_client # type: ignore
self.config: AthenaClientConfiguration = config
@@ -532,7 +530,7 @@ def should_truncate_table_before_load_on_staging_destination(self, table: TTable
if table["write_disposition"] == "replace" and not self._is_iceberg_table(
self.prepare_load_table(table["name"])
):
return True
return self.truncate_table_before_load_on_staging_destination
return False

def should_load_data_to_staging_dataset_on_staging_destination(
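The behavioral change in this hunk is the `return` line: truncating non-iceberg `replace` tables is no longer unconditional but gated by the configuration flag. A simplified, hypothetical version of that decision (the real method consults `self._is_iceberg_table` and `prepare_load_table`):

```py
from typing import Any, Dict


def should_truncate(table: Dict[str, Any], truncate_flag: bool, is_iceberg: bool) -> bool:
    # simplified stand-in for Athena's check: only non-iceberg tables loaded with the
    # "replace" write disposition are truncated, and only if the flag allows it
    if table["write_disposition"] == "replace" and not is_iceberg:
        return truncate_flag
    return False


replace_table = {"name": "events", "write_disposition": "replace"}
assert should_truncate(replace_table, truncate_flag=True, is_iceberg=False) is True
assert should_truncate(replace_table, truncate_flag=False, is_iceberg=False) is False
assert should_truncate(replace_table, truncate_flag=True, is_iceberg=True) is False
```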
4 changes: 1 addition & 3 deletions dlt/destinations/impl/clickhouse/clickhouse.py
@@ -283,9 +283,7 @@ def __init__(
capabilities,
config,
)
SupportsStagingDestination.__init__(
self, config.truncate_table_before_load_on_staging_destination
)
SupportsStagingDestination.__init__(self, config)
super().__init__(schema, config, self.sql_client)
self.config: ClickHouseClientConfiguration = config
self.active_hints = deepcopy(HINT_TO_CLICKHOUSE_ATTR)
4 changes: 1 addition & 3 deletions dlt/destinations/impl/databricks/databricks.py
@@ -262,9 +262,7 @@ def __init__(
config.credentials,
capabilities,
)
SupportsStagingDestination.__init__(
self, config.truncate_table_before_load_on_staging_destination
)
SupportsStagingDestination.__init__(self, config)
super().__init__(schema, config, sql_client)
self.config: DatabricksClientConfiguration = config
self.sql_client: DatabricksSqlClient = sql_client # type: ignore[assignment]
4 changes: 1 addition & 3 deletions dlt/destinations/impl/dremio/dremio.py
@@ -147,9 +147,7 @@ def __init__(
config.credentials,
capabilities,
)
SupportsStagingDestination.__init__(
self, config.truncate_table_before_load_on_staging_destination
)
SupportsStagingDestination.__init__(self, config)
super().__init__(schema, config, sql_client)
self.config: DremioClientConfiguration = config
self.sql_client: DremioSqlClient = sql_client # type: ignore
2 changes: 2 additions & 0 deletions dlt/destinations/impl/dummy/configuration.py
@@ -34,6 +34,8 @@ class DummyClientConfiguration(DestinationClientConfiguration):
"""raise terminal exception in job init"""
fail_transiently_in_init: bool = False
"""raise transient exception in job init"""
truncate_table_before_load_on_staging_destination: bool = True
"""truncate tables on staging destination"""

# new jobs workflows
create_followup_jobs: bool = False
2 changes: 1 addition & 1 deletion dlt/destinations/impl/dummy/dummy.py
@@ -127,7 +127,7 @@ def __init__(
config: DummyClientConfiguration,
capabilities: DestinationCapabilitiesContext,
) -> None:
SupportsStagingDestination.__init__(self)
SupportsStagingDestination.__init__(self, config)
super().__init__(schema, config, capabilities)
self.in_staging_context = False
self.config: DummyClientConfiguration = config
4 changes: 1 addition & 3 deletions dlt/destinations/impl/redshift/redshift.py
@@ -233,9 +233,7 @@ def __init__(
config.credentials,
capabilities,
)
SupportsStagingDestination.__init__(
self, config.truncate_table_before_load_on_staging_destination
)
SupportsStagingDestination.__init__(self, config)
super().__init__(schema, config, sql_client)
self.sql_client = sql_client
self.config: RedshiftClientConfiguration = config
4 changes: 1 addition & 3 deletions dlt/destinations/impl/snowflake/snowflake.py
@@ -266,9 +266,7 @@ def __init__(
capabilities,
config.query_tag,
)
SupportsStagingDestination.__init__(
self, config.truncate_table_before_load_on_staging_destination
)
SupportsStagingDestination.__init__(self, config)
super().__init__(schema, config, sql_client)
self.config: SnowflakeClientConfiguration = config
self.sql_client: SnowflakeSqlClient = sql_client # type: ignore
1 change: 1 addition & 0 deletions dlt/destinations/impl/synapse/synapse.py
@@ -59,6 +59,7 @@ def __init__(
config: SynapseClientConfiguration,
capabilities: DestinationCapabilitiesContext,
) -> None:
SupportsStagingDestination.__init__(self, config)
super().__init__(schema, config, capabilities)
self.config: SynapseClientConfiguration = config
self.sql_client = SynapseSqlClient(
29 changes: 25 additions & 4 deletions docs/website/docs/dlt-ecosystem/staging.md
@@ -42,9 +42,13 @@ truncate_staging_dataset=true
`dlt` allows you to chain destinations, where the first one (`staging`) is responsible for uploading the files from the local filesystem to the remote storage. It then generates follow-up jobs for the second destination that (typically) copy the files from the remote storage into the destination.

Currently, only one destination, the [filesystem](destinations/filesystem.md), can be used as staging. The following destinations can copy remote files:
1. [Redshift.](destinations/redshift.md#staging-support)
2. [Bigquery.](destinations/bigquery.md#staging-support)
3. [Snowflake.](destinations/snowflake.md#staging-support)

1. [Azure Synapse](destinations/synapse#staging-support)
1. [Athena](destinations/athena#staging-support)
1. [Bigquery](destinations/bigquery.md#staging-support)
1. [Dremio](destinations/dremio#staging-support)
1. [Redshift](destinations/redshift.md#staging-support)
1. [Snowflake](destinations/snowflake.md#staging-support)

### How to use
In essence, you need to set up two destinations and then pass them to `dlt.pipeline`. Below we'll use `filesystem` staging with `parquet` files to load into the `Redshift` destination.
@@ -96,4 +100,21 @@ In essence, you need to set up two destinations and then pass them to `dlt.pipel

Run the pipeline script as usual.
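The full example is collapsed in this diff; a minimal sketch of such a pipeline script, assuming Redshift credentials and the staging bucket are configured in `secrets.toml`/`config.toml` (the `events` resource is a placeholder):

```py
import dlt


@dlt.resource(name="events", write_disposition="replace")
def events():
    yield [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]


pipeline = dlt.pipeline(
    pipeline_name="staging_demo",
    destination="redshift",  # final destination
    staging="filesystem",  # files are first written to the remote bucket
    dataset_name="staging_demo_data",
)

# parquet files are uploaded to the staging bucket and then copied into Redshift
load_info = pipeline.run(events(), loader_file_format="parquet")
print(load_info)
```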

> 💡 Please note that `dlt` does not delete loaded files from the staging storage after the load is complete.
:::tip
Please note that `dlt` does not delete loaded files from the staging storage after the load is complete, but it truncates previously loaded files.
:::

### How to prevent truncation of staging files

Before `dlt` loads data to the staging storage, it truncates previously loaded files. To prevent this and keep the whole history
of loaded files, you can use the following parameter:

```toml
[destination.redshift]
truncate_table_before_load_on_staging_destination=false
```
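Like other destination options, the flag should also be settable through dlt's other configuration providers, for example as an environment variable; the variable name below is an assumption derived from dlt's usual `SECTION__KEY` naming convention:

```py
import os

# assumed name, following dlt's SECTION__SUBSECTION__KEY environment convention
os.environ[
    "DESTINATION__REDSHIFT__TRUNCATE_TABLE_BEFORE_LOAD_ON_STAGING_DESTINATION"
] = "false"
```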

:::caution
The [Athena](destinations/athena#staging-support) destination truncates only non-iceberg tables with the `replace` write disposition.
Therefore, the `truncate_table_before_load_on_staging_destination` parameter only controls truncation of the corresponding files for these tables.
:::
17 changes: 17 additions & 0 deletions tests/load/test_dummy_client.py
@@ -512,6 +512,23 @@ def test_completed_loop_with_delete_completed() -> None:
assert_complete_job(load, should_delete_completed=True)


@pytest.mark.parametrize("to_truncate", [True, False])
def test_truncate_table_before_load_on_staging(to_truncate: bool) -> None:
load = setup_loader(
client_config=DummyClientConfiguration(
truncate_table_before_load_on_staging_destination=to_truncate
)
)
load_id, schema = prepare_load_package(load.load_storage, NORMALIZED_FILES)
destination_client = load.get_destination_client(schema)
assert (
destination_client.should_truncate_table_before_load_on_staging_destination(
schema.tables["_dlt_version"]
)
== to_truncate
)


def test_retry_on_new_loop() -> None:
# test job that retries sitting in new jobs
load = setup_loader(client_config=DummyClientConfiguration(retry_prob=1.0))
