Skip to content

Commit

Permalink
Expose staging tables truncation to config (#1717)
Browse files Browse the repository at this point in the history
* Expose staging tables truncation to config

* Fix comments, add tests

* Fix tests

* Move implementation from mixing, add tests

* Fix docs grammar
  • Loading branch information
VioletM authored Aug 28, 2024
1 parent 817d51d commit 98ca505
Show file tree
Hide file tree
Showing 15 changed files with 152 additions and 37 deletions.
8 changes: 5 additions & 3 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ class DestinationClientDwhWithStagingConfiguration(DestinationClientDwhConfigura

staging_config: Optional[DestinationClientStagingConfiguration] = None
"""configuration of the staging, if present, injected at runtime"""
truncate_tables_on_staging_destination_before_load: bool = True
"""If dlt should truncate the tables on staging destination before loading data."""


TLoadJobState = Literal["ready", "running", "failed", "retry", "completed"]
Expand Down Expand Up @@ -578,17 +580,17 @@ def with_staging_dataset(self) -> ContextManager["JobClientBase"]:
return self # type: ignore


class SupportsStagingDestination:
class SupportsStagingDestination(ABC):
"""Adds capability to support a staging destination for the load"""

def should_load_data_to_staging_dataset_on_staging_destination(
self, table: TTableSchema
) -> bool:
return False

@abstractmethod
def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool:
# the default is to truncate the tables on the staging destination...
return True
pass


# TODO: type Destination properly
Expand Down
2 changes: 1 addition & 1 deletion dlt/destinations/impl/athena/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ def should_truncate_table_before_load_on_staging_destination(self, table: TTable
if table["write_disposition"] == "replace" and not self._is_iceberg_table(
self.prepare_load_table(table["name"])
):
return True
return self.config.truncate_tables_on_staging_destination_before_load
return False

def should_load_data_to_staging_dataset_on_staging_destination(
Expand Down
3 changes: 3 additions & 0 deletions dlt/destinations/impl/bigquery/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,9 @@ def _should_autodetect_schema(self, table_name: str) -> bool:
self.schema._schema_tables, table_name, AUTODETECT_SCHEMA_HINT, allow_none=True
) or (self.config.autodetect_schema and table_name not in self.schema.dlt_table_names())

def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool:
return self.config.truncate_tables_on_staging_destination_before_load


def _streaming_load(
items: List[Dict[Any, Any]], table: Dict[str, Any], job_client: BigQueryClient
Expand Down
3 changes: 3 additions & 0 deletions dlt/destinations/impl/clickhouse/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,3 +372,6 @@ def _from_db_type(
self, ch_t: str, precision: Optional[int], scale: Optional[int]
) -> TColumnType:
return self.type_mapper.from_db_type(ch_t, precision, scale)

def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool:
return self.config.truncate_tables_on_staging_destination_before_load
3 changes: 3 additions & 0 deletions dlt/destinations/impl/databricks/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,6 @@ def _get_storage_table_query_columns(self) -> List[str]:
"full_data_type"
)
return fields

def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool:
return self.config.truncate_tables_on_staging_destination_before_load
3 changes: 3 additions & 0 deletions dlt/destinations/impl/dremio/dremio.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,6 @@ def _make_add_column_sql(
self, new_columns: Sequence[TColumnSchema], table_format: TTableFormat = None
) -> List[str]:
return ["ADD COLUMNS (" + ", ".join(self._get_column_def_sql(c) for c in new_columns) + ")"]

def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool:
return self.config.truncate_tables_on_staging_destination_before_load
2 changes: 2 additions & 0 deletions dlt/destinations/impl/dummy/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class DummyClientConfiguration(DestinationClientConfiguration):
"""raise terminal exception in job init"""
fail_transiently_in_init: bool = False
"""raise transient exception in job init"""
truncate_tables_on_staging_destination_before_load: bool = True
"""truncate tables on staging destination"""

# new jobs workflows
create_followup_jobs: bool = False
Expand Down
3 changes: 3 additions & 0 deletions dlt/destinations/impl/dummy/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ def complete_load(self, load_id: str) -> None:
def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool:
return super().should_load_data_to_staging_dataset(table)

def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool:
return self.config.truncate_tables_on_staging_destination_before_load

@contextmanager
def with_staging_dataset(self) -> Iterator[JobClientBase]:
try:
Expand Down
3 changes: 3 additions & 0 deletions dlt/destinations/impl/redshift/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,6 @@ def _from_db_type(
self, pq_t: str, precision: Optional[int], scale: Optional[int]
) -> TColumnType:
return self.type_mapper.from_db_type(pq_t, precision, scale)

def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool:
return self.config.truncate_tables_on_staging_destination_before_load
3 changes: 3 additions & 0 deletions dlt/destinations/impl/snowflake/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,6 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = Non
return (
f"{name} {self.type_mapper.to_db_type(c)} {self._gen_not_null(c.get('nullable', True))}"
)

def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool:
return self.config.truncate_tables_on_staging_destination_before_load
3 changes: 3 additions & 0 deletions dlt/destinations/impl/synapse/synapse.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ def create_load_job(
)
return job

def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool:
return self.config.truncate_tables_on_staging_destination_before_load


class SynapseCopyFileLoadJob(CopyRemoteFileLoadJob):
def __init__(
Expand Down
7 changes: 4 additions & 3 deletions dlt/load/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,10 @@ def _init_dataset_and_update_schema(
applied_update = job_client.update_stored_schema(
only_tables=update_tables, expected_update=expected_update
)
logger.info(
f"Client for {job_client.config.destination_type} will truncate tables {staging_text}"
)
if truncate_tables:
logger.info(
f"Client for {job_client.config.destination_type} will truncate tables {staging_text}"
)

job_client.initialize_storage(truncate_tables=truncate_tables)
return applied_update
Expand Down
72 changes: 45 additions & 27 deletions docs/website/docs/dlt-ecosystem/staging.md
Original file line number Diff line number Diff line change
@@ -1,36 +1,33 @@
---
title: Staging
description: Configure an s3 or gcs bucket for staging before copying into the destination
description: Configure an S3 or GCS bucket for staging before copying into the destination
keywords: [staging, destination]
---
# Staging

The goal of staging is to bring the data closer to the database engine so the modification of the destination (final) dataset happens faster and without errors. `dlt`, when asked, creates two
staging areas:
The goal of staging is to bring the data closer to the database engine so that the modification of the destination (final) dataset happens faster and without errors. `dlt`, when asked, creates two staging areas:
1. A **staging dataset** used by the [merge and replace loads](../general-usage/incremental-loading.md#merge-incremental_loading) to deduplicate and merge data with the destination.
2. A **staging storage** which is typically a s3/gcp bucket where [loader files](file-formats/) are copied before they are loaded by the destination.
2. A **staging storage** which is typically an S3/GCP bucket where [loader files](file-formats/) are copied before they are loaded by the destination.

## Staging dataset
`dlt` creates a staging dataset when write disposition of any of the loaded resources requires it. It creates and migrates required tables exactly like for the
main dataset. Data in staging tables is truncated when load step begins and only for tables that will participate in it.
Such staging dataset has the same name as the dataset passed to `dlt.pipeline` but with `_staging` suffix in the name. Alternatively, you can provide your own staging dataset pattern or use a fixed name, identical for all the
configured datasets.
`dlt` creates a staging dataset when the write disposition of any of the loaded resources requires it. It creates and migrates required tables exactly like for the main dataset. Data in staging tables is truncated when the load step begins and only for tables that will participate in it.
Such a staging dataset has the same name as the dataset passed to `dlt.pipeline` but with a `_staging` suffix in the name. Alternatively, you can provide your own staging dataset pattern or use a fixed name, identical for all the configured datasets.
```toml
[destination.postgres]
staging_dataset_name_layout="staging_%s"
```
Entry above switches the pattern to `staging_` prefix and for example for dataset with name **github_data** `dlt` will create **staging_github_data**.
The entry above switches the pattern to `staging_` prefix and for example, for a dataset with the name **github_data**, `dlt` will create **staging_github_data**.

To configure static staging dataset name, you can do the following (we use destination factory)
To configure a static staging dataset name, you can do the following (we use the destination factory)
```py
import dlt

dest_ = dlt.destinations.postgres(staging_dataset_name_layout="_dlt_staging")
```
All pipelines using `dest_` as destination will use **staging_dataset** to store staging tables. Make sure that your pipelines are not overwriting each other's tables.
All pipelines using `dest_` as the destination will use the **staging_dataset** to store staging tables. Make sure that your pipelines are not overwriting each other's tables.

### Cleanup up staging dataset automatically
`dlt` does not truncate tables in staging dataset at the end of the load. Data that is left after contains all the extracted data and may be useful for debugging.
### Cleanup staging dataset automatically
`dlt` does not truncate tables in the staging dataset at the end of the load. Data that is left after contains all the extracted data and may be useful for debugging.
If you prefer to truncate it, put the following line in `config.toml`:

```toml
Expand All @@ -39,19 +36,23 @@ truncate_staging_dataset=true
```

## Staging storage
`dlt` allows to chain destinations where the first one (`staging`) is responsible for uploading the files from local filesystem to the remote storage. It then generates followup jobs for the second destination that (typically) copy the files from remote storage into destination.
`dlt` allows chaining destinations where the first one (`staging`) is responsible for uploading the files from the local filesystem to the remote storage. It then generates follow-up jobs for the second destination that (typically) copy the files from remote storage into the destination.

Currently, only one destination the [filesystem](destinations/filesystem.md) can be used as a staging. Following destinations can copy remote files:
1. [Redshift.](destinations/redshift.md#staging-support)
2. [Bigquery.](destinations/bigquery.md#staging-support)
3. [Snowflake.](destinations/snowflake.md#staging-support)
Currently, only one destination, the [filesystem](destinations/filesystem.md), can be used as staging. The following destinations can copy remote files:

1. [Azure Synapse](destinations/synapse#staging-support)
1. [Athena](destinations/athena#staging-support)
1. [Bigquery](destinations/bigquery.md#staging-support)
1. [Dremio](destinations/dremio#staging-support)
1. [Redshift](destinations/redshift.md#staging-support)
1. [Snowflake](destinations/snowflake.md#staging-support)

### How to use
In essence, you need to set up two destinations and then pass them to `dlt.pipeline`. Below we'll use `filesystem` staging with `parquet` files to load into `Redshift` destination.
In essence, you need to set up two destinations and then pass them to `dlt.pipeline`. Below we'll use `filesystem` staging with `parquet` files to load into the `Redshift` destination.

1. **Set up the s3 bucket and filesystem staging.**
1. **Set up the S3 bucket and filesystem staging.**

Please follow our guide in [filesystem destination documentation](destinations/filesystem.md). Test the staging as standalone destination to make sure that files go where you want them. In your `secrets.toml` you should now have a working `filesystem` configuration:
Please follow our guide in the [filesystem destination documentation](destinations/filesystem.md). Test the staging as a standalone destination to make sure that files go where you want them. In your `secrets.toml`, you should now have a working `filesystem` configuration:
```toml
[destination.filesystem]
bucket_url = "s3://[your_bucket_name]" # replace with your bucket name,
Expand All @@ -63,31 +64,31 @@ In essence, you need to set up two destinations and then pass them to `dlt.pipel

2. **Set up the Redshift destination.**

Please follow our guide in [redshift destination documentation](destinations/redshift.md). In your `secrets.toml` you added:
Please follow our guide in the [redshift destination documentation](destinations/redshift.md). In your `secrets.toml`, you added:
```toml
# keep it at the top of your toml file! before any section starts
destination.redshift.credentials="redshift://loader:<password>@localhost/dlt_data?connect_timeout=15"
```

3. **Authorize Redshift cluster to access the staging bucket.**
3. **Authorize the Redshift cluster to access the staging bucket.**

By default `dlt` will forward the credentials configured for `filesystem` to the `Redshift` COPY command. If you are fine with this, move to the next step.
By default, `dlt` will forward the credentials configured for `filesystem` to the `Redshift` COPY command. If you are fine with this, move to the next step.

4. **Chain staging to destination and request `parquet` file format.**

Pass the `staging` argument to `dlt.pipeline`. It works like the destination `argument`:
```py
# Create a dlt pipeline that will load
# chess player data to the redshift destination
# via staging on s3
# via staging on S3
pipeline = dlt.pipeline(
pipeline_name='chess_pipeline',
destination='redshift',
staging='filesystem', # add this to activate the staging location
dataset_name='player_data'
)
```
`dlt` will automatically select an appropriate loader file format for the staging files. Below we explicitly specify `parquet` file format (just to demonstrate how to do it):
`dlt` will automatically select an appropriate loader file format for the staging files. Below we explicitly specify the `parquet` file format (just to demonstrate how to do it):
```py
info = pipeline.run(chess(), loader_file_format="parquet")
```
Expand All @@ -96,4 +97,21 @@ In essence, you need to set up two destinations and then pass them to `dlt.pipel

Run the pipeline script as usual.

> 💡 Please note that `dlt` does not delete loaded files from the staging storage after the load is complete.
:::tip
Please note that `dlt` does not delete loaded files from the staging storage after the load is complete, but it truncates previously loaded files.
:::

### How to prevent staging files truncation

Before `dlt` loads data to the staging storage, it truncates previously loaded files. To prevent it and keep the whole history
of loaded files, you can use the following parameter:

```toml
[destination.redshift]
truncate_table_before_load_on_staging_destination=false
```

:::caution
The [Athena](destinations/athena#staging-support) destination only truncates not iceberg tables with `replace` merge_disposition.
Therefore, the parameter `truncate_table_before_load_on_staging_destination` only controls the truncation of corresponding files for these tables.
:::
57 changes: 54 additions & 3 deletions tests/load/pipeline/test_stage_loading.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import pytest
from typing import Dict, Any, List
from typing import List

import dlt, os
from dlt.common import json, sleep
from copy import deepcopy
from dlt.common import json
from dlt.common.storages.configuration import FilesystemConfiguration
from dlt.common.utils import uniq_id
from dlt.common.schema.typing import TDataType
from dlt.destinations.impl.filesystem.filesystem import FilesystemClient

from tests.load.pipeline.test_merge_disposition import github
from tests.pipeline.utils import load_table_counts, assert_load_info
Expand Down Expand Up @@ -40,6 +40,13 @@ def load_modified_issues():
yield from issues


@dlt.resource(table_name="events", write_disposition="append", primary_key="timestamp")
def event_many_load_2():
with open("tests/normalize/cases/event.event.many_load_2.json", "r", encoding="utf-8") as f:
events = json.load(f)
yield from events


@pytest.mark.parametrize(
"destination_config", destinations_configs(all_staging_configs=True), ids=lambda x: x.name
)
Expand Down Expand Up @@ -183,6 +190,50 @@ def test_staging_load(destination_config: DestinationTestConfiguration) -> None:
assert replace_counts == initial_counts


@pytest.mark.parametrize(
"destination_config", destinations_configs(all_staging_configs=True), ids=lambda x: x.name
)
def test_truncate_staging_dataset(destination_config: DestinationTestConfiguration) -> None:
"""This test checks if tables truncation on staging destination done according to the configuration.
Test loads data to the destination three times:
* with truncation
* without truncation (after this 2 staging files should be left)
* with truncation (after this 1 staging file should be left)
"""
pipeline = destination_config.setup_pipeline(
pipeline_name="test_stage_loading", dataset_name="test_staging_load" + uniq_id()
)
resource = event_many_load_2()
table_name: str = resource.table_name # type: ignore[assignment]

# load the data, files stay on the stage after the load
info = pipeline.run(resource)
assert_load_info(info)

# load the data without truncating of the staging, should see two files on staging
pipeline.destination.config_params["truncate_tables_on_staging_destination_before_load"] = False
info = pipeline.run(resource)
assert_load_info(info)
# check there are two staging files
_, staging_client = pipeline._get_destination_clients(pipeline.default_schema)
with staging_client:
assert len(staging_client.list_table_files(table_name)) == 2 # type: ignore[attr-defined]

# load the data with truncating, so only new file is on the staging
pipeline.destination.config_params["truncate_tables_on_staging_destination_before_load"] = True
info = pipeline.run(resource)
assert_load_info(info)
# check that table exists in the destination
with pipeline.sql_client() as sql_client:
qual_name = sql_client.make_qualified_table_name
assert len(sql_client.execute_sql(f"SELECT * from {qual_name(table_name)}")) > 4
# check there is only one staging file
_, staging_client = pipeline._get_destination_clients(pipeline.default_schema)
with staging_client:
assert len(staging_client.list_table_files(table_name)) == 1 # type: ignore[attr-defined]


@pytest.mark.parametrize(
"destination_config", destinations_configs(all_staging_configs=True), ids=lambda x: x.name
)
Expand Down
17 changes: 17 additions & 0 deletions tests/load/test_dummy_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,23 @@ def test_completed_loop_with_delete_completed() -> None:
assert_complete_job(load, should_delete_completed=True)


@pytest.mark.parametrize("to_truncate", [True, False])
def test_truncate_table_before_load_on_stanging(to_truncate) -> None:
load = setup_loader(
client_config=DummyClientConfiguration(
truncate_tables_on_staging_destination_before_load=to_truncate
)
)
load_id, schema = prepare_load_package(load.load_storage, NORMALIZED_FILES)
destination_client = load.get_destination_client(schema)
assert (
destination_client.should_truncate_table_before_load_on_staging_destination( # type: ignore
schema.tables["_dlt_version"]
)
== to_truncate
)


def test_retry_on_new_loop() -> None:
# test job that retries sitting in new jobs
load = setup_loader(client_config=DummyClientConfiguration(retry_prob=1.0))
Expand Down

0 comments on commit 98ca505

Please sign in to comment.