Skip to content

Commit

Permalink
runs staging tests on athena (#1764)
Browse files Browse the repository at this point in the history
* always truncates staging tables on athena + replace without iceberg

* adds athena staging configs to all staging configs

* updates athena tests for staging destination
  • Loading branch information
rudolfix authored Aug 28, 2024
1 parent 63f8954 commit b48c7c3
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 23 deletions.
11 changes: 11 additions & 0 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,10 +586,21 @@ class SupportsStagingDestination(ABC):
def should_load_data_to_staging_dataset_on_staging_destination(
self, table: TTableSchema
) -> bool:
"""If set to True, and staging destination is configured, the data will be loaded to staging dataset on staging destination
instead of a regular dataset on staging destination. Currently it is used by Athena Iceberg which uses staging dataset
on staging destination to copy data to iceberg tables stored on regular dataset on staging destination.
The default is to load data to regular dataset on staging destination from where warehouses like Snowflake (that have their
own storage) will copy data.
"""
return False

@abstractmethod
def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool:
"""If set to True, data in `table` will be truncated on staging destination (regular dataset). This is the default behavior which
can be changed with a config flag.
For Athena + Iceberg this setting is always False - Athena uses regular dataset to store Iceberg tables and we avoid touching it.
For Athena we truncate those tables only on "replace" write disposition.
"""
pass


Expand Down
2 changes: 1 addition & 1 deletion dlt/destinations/impl/athena/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ def should_truncate_table_before_load_on_staging_destination(self, table: TTable
if table["write_disposition"] == "replace" and not self._is_iceberg_table(
self.prepare_load_table(table["name"])
):
return self.config.truncate_tables_on_staging_destination_before_load
return True
return False

def should_load_data_to_staging_dataset_on_staging_destination(
Expand Down
23 changes: 21 additions & 2 deletions tests/load/pipeline/test_stage_loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,18 @@ def test_truncate_staging_dataset(destination_config: DestinationTestConfigurati
# check there are two staging files
_, staging_client = pipeline._get_destination_clients(pipeline.default_schema)
with staging_client:
assert len(staging_client.list_table_files(table_name)) == 2 # type: ignore[attr-defined]
# except Athena + Iceberg which does not store tables in staging dataset
if (
destination_config.destination == "athena"
and destination_config.table_format == "iceberg"
):
table_count = 0
# but keeps them in staging dataset on staging destination - but only the last one
with staging_client.with_staging_dataset(): # type: ignore[attr-defined]
assert len(staging_client.list_table_files(table_name)) == 1 # type: ignore[attr-defined]
else:
table_count = 2
assert len(staging_client.list_table_files(table_name)) == table_count # type: ignore[attr-defined]

# load the data with truncating, so only new file is on the staging
pipeline.destination.config_params["truncate_tables_on_staging_destination_before_load"] = True
Expand All @@ -231,7 +242,15 @@ def test_truncate_staging_dataset(destination_config: DestinationTestConfigurati
# check there is only one staging file
_, staging_client = pipeline._get_destination_clients(pipeline.default_schema)
with staging_client:
assert len(staging_client.list_table_files(table_name)) == 1 # type: ignore[attr-defined]
# except for Athena which does not delete staging destination tables
if destination_config.destination == "athena":
if destination_config.table_format == "iceberg":
table_count = 0
else:
table_count = 3
else:
table_count = 1
assert len(staging_client.list_table_files(table_name)) == table_count # type: ignore[attr-defined]


@pytest.mark.parametrize(
Expand Down
49 changes: 29 additions & 20 deletions tests/load/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,27 @@ def destinations_configs(
# build destination configs
destination_configs: List[DestinationTestConfiguration] = []

# default sql configs that are also default staging configs
default_sql_configs_with_staging = [
# Athena needs filesystem staging, which will be automatically set; we have to supply a bucket url though.
DestinationTestConfiguration(
destination="athena",
file_format="parquet",
supports_merge=False,
bucket_url=AWS_BUCKET,
),
DestinationTestConfiguration(
destination="athena",
file_format="parquet",
bucket_url=AWS_BUCKET,
force_iceberg=True,
supports_merge=True,
supports_dbt=False,
table_format="iceberg",
extra_info="iceberg",
),
]

# default non staging sql based configs, one per destination
if default_sql_configs:
destination_configs += [
Expand All @@ -268,26 +289,10 @@ def destinations_configs(
DestinationTestConfiguration(destination="duckdb", file_format="parquet"),
DestinationTestConfiguration(destination="motherduck", file_format="insert_values"),
]
# Athena needs filesystem staging, which will be automatically set; we have to supply a bucket url though.
destination_configs += [
DestinationTestConfiguration(
destination="athena",
file_format="parquet",
supports_merge=False,
bucket_url=AWS_BUCKET,
)
]
destination_configs += [
DestinationTestConfiguration(
destination="athena",
file_format="parquet",
bucket_url=AWS_BUCKET,
force_iceberg=True,
supports_merge=True,
supports_dbt=False,
extra_info="iceberg",
)
]

# add Athena staging configs
destination_configs += default_sql_configs_with_staging

destination_configs += [
DestinationTestConfiguration(
destination="clickhouse", file_format="jsonl", supports_dbt=False
Expand Down Expand Up @@ -332,6 +337,10 @@ def destinations_configs(
DestinationTestConfiguration(destination="qdrant", extra_info="server"),
]

if (default_sql_configs or all_staging_configs) and not default_sql_configs:
# athena default configs not added yet
destination_configs += default_sql_configs_with_staging

if default_staging_configs or all_staging_configs:
destination_configs += [
DestinationTestConfiguration(
Expand Down

0 comments on commit b48c7c3

Please sign in to comment.