diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index 1b8b980fdd..9e5ab59060 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -269,7 +269,7 @@ class DestinationClientDwhWithStagingConfiguration(DestinationClientDwhConfigura staging_config: Optional[DestinationClientStagingConfiguration] = None """configuration of the staging, if present, injected at runtime""" - truncate_table_before_load_on_staging_destination: bool = True + truncate_tables_on_staging_destination_before_load: bool = True """If dlt should truncate the tables on staging destination before loading data.""" @@ -580,8 +580,8 @@ class SupportsStagingDestination: """Adds capability to support a staging destination for the load""" def __init__(self, config: DestinationClientDwhWithStagingConfiguration) -> None: - self.truncate_table_before_load_on_staging_destination = ( - config.truncate_table_before_load_on_staging_destination + self.truncate_tables_on_staging_destination_before_load = ( + config.truncate_tables_on_staging_destination_before_load ) def should_load_data_to_staging_dataset_on_staging_destination( @@ -591,7 +591,7 @@ def should_load_data_to_staging_dataset_on_staging_destination( def should_truncate_table_before_load_on_staging_destination(self, table: TTableSchema) -> bool: # the default is to truncate the tables on the staging destination... - return self.truncate_table_before_load_on_staging_destination + return self.truncate_tables_on_staging_destination_before_load # TODO: type Destination properly diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py index 733809ab1f..a8205296e7 100644 --- a/dlt/destinations/impl/athena/athena.py +++ b/dlt/destinations/impl/athena/athena.py @@ -532,7 +532,7 @@ def should_truncate_table_before_load_on_staging_destination(self, table: TTable if table["write_disposition"] == "replace" and not self._is_iceberg_table( self.prepare_load_table(table["name"]) ): - return self.truncate_table_before_load_on_staging_destination + return self.truncate_tables_on_staging_destination_before_load return False def should_load_data_to_staging_dataset_on_staging_destination( diff --git a/dlt/destinations/impl/bigquery/bigquery.py b/dlt/destinations/impl/bigquery/bigquery.py index 0a62ea586b..d96eec2d6f 100644 --- a/dlt/destinations/impl/bigquery/bigquery.py +++ b/dlt/destinations/impl/bigquery/bigquery.py @@ -229,9 +229,7 @@ def __init__( config.http_timeout, config.retry_deadline, ) - SupportsStagingDestination.__init__( - self, config.truncate_table_before_load_on_staging_destination - ) + SupportsStagingDestination.__init__(self, config) super().__init__(schema, config, sql_client) self.config: BigQueryClientConfiguration = config self.sql_client: BigQuerySqlClient = sql_client # type: ignore diff --git a/dlt/destinations/impl/dummy/configuration.py b/dlt/destinations/impl/dummy/configuration.py index cfc79af5ab..a066479294 100644 --- a/dlt/destinations/impl/dummy/configuration.py +++ b/dlt/destinations/impl/dummy/configuration.py @@ -34,7 +34,7 @@ class DummyClientConfiguration(DestinationClientConfiguration): """raise terminal exception in job init""" fail_transiently_in_init: bool = False """raise transient exception in job init""" - truncate_table_before_load_on_staging_destination: bool = True + truncate_tables_on_staging_destination_before_load: bool = True """truncate tables on staging destination""" # new jobs workflows diff --git a/dlt/destinations/impl/dummy/dummy.py b/dlt/destinations/impl/dummy/dummy.py index bade1fd94f..c4fb3889a5 100644 --- a/dlt/destinations/impl/dummy/dummy.py +++ b/dlt/destinations/impl/dummy/dummy.py @@ -134,7 +134,7 @@ def __init__( config: DummyClientConfiguration, capabilities: DestinationCapabilitiesContext, ) -> None: - SupportsStagingDestination.__init__(self, config) + SupportsStagingDestination.__init__(self, config) # type: ignore super().__init__(schema, config, capabilities) self.in_staging_context = False self.config: DummyClientConfiguration = config diff --git a/docs/examples/parent_child_relationship/parent_child_relationship.py b/docs/examples/parent_child_relationship/parent_child_relationship.py index 39c9f577cc..6de00ffb28 100644 --- a/docs/examples/parent_child_relationship/parent_child_relationship.py +++ b/docs/examples/parent_child_relationship/parent_child_relationship.py @@ -22,6 +22,7 @@ from typing import List, Dict, Any, Generator import dlt + # Define a dlt resource with write disposition to 'merge' @dlt.resource(name="parent_with_children", write_disposition={"disposition": "merge"}) def data_source() -> Generator[List[Dict[str, Any]], None, None]: @@ -44,6 +45,7 @@ def data_source() -> Generator[List[Dict[str, Any]], None, None]: yield data + # Function to add parent_id to each child record within a parent record def add_parent_id(record: Dict[str, Any]) -> Dict[str, Any]: parent_id_key = "parent_id" @@ -51,6 +53,7 @@ def add_parent_id(record: Dict[str, Any]) -> Dict[str, Any]: child[parent_id_key] = record[parent_id_key] return record + if __name__ == "__main__": # Create and configure the dlt pipeline pipeline = dlt.pipeline( @@ -60,10 +63,6 @@ def add_parent_id(record: Dict[str, Any]) -> Dict[str, Any]: ) # Run the pipeline - load_info = pipeline.run( - data_source() - .add_map(add_parent_id), - primary_key="parent_id" - ) + load_info = pipeline.run(data_source().add_map(add_parent_id), primary_key="parent_id") # Output the load information after pipeline execution print(load_info) diff --git a/docs/examples/parent_child_relationship/test_parent_child_relationship.py b/docs/examples/parent_child_relationship/test_parent_child_relationship.py index f671040823..95d1bade97 100644 --- a/docs/examples/parent_child_relationship/test_parent_child_relationship.py +++ b/docs/examples/parent_child_relationship/test_parent_child_relationship.py @@ -1,4 +1,3 @@ - import pytest from tests.utils import skipifgithubfork @@ -29,6 +28,7 @@ from typing import List, Dict, Any, Generator import dlt + # Define a dlt resource with write disposition to 'merge' @dlt.resource(name="parent_with_children", write_disposition={"disposition": "merge"}) def data_source() -> Generator[List[Dict[str, Any]], None, None]: @@ -51,6 +51,7 @@ def data_source() -> Generator[List[Dict[str, Any]], None, None]: yield data + # Function to add parent_id to each child record within a parent record def add_parent_id(record: Dict[str, Any]) -> Dict[str, Any]: parent_id_key = "parent_id" @@ -58,6 +59,7 @@ def add_parent_id(record: Dict[str, Any]) -> Dict[str, Any]: child[parent_id_key] = record[parent_id_key] return record + @skipifgithubfork @pytest.mark.forked def test_parent_child_relationship(): @@ -69,10 +71,6 @@ def test_parent_child_relationship(): ) # Run the pipeline - load_info = pipeline.run( - data_source() - .add_map(add_parent_id), - primary_key="parent_id" - ) + load_info = pipeline.run(data_source().add_map(add_parent_id), primary_key="parent_id") # Output the load information after pipeline execution print(load_info) diff --git a/tests/load/test_dummy_client.py b/tests/load/test_dummy_client.py index 5555d80753..59b7acac15 100644 --- a/tests/load/test_dummy_client.py +++ b/tests/load/test_dummy_client.py @@ -552,13 +552,13 @@ def test_completed_loop_with_delete_completed() -> None: def test_truncate_table_before_load_on_stanging(to_truncate) -> None: load = setup_loader( client_config=DummyClientConfiguration( - truncate_table_before_load_on_staging_destination=to_truncate + truncate_tables_on_staging_destination_before_load=to_truncate ) ) load_id, schema = prepare_load_package(load.load_storage, NORMALIZED_FILES) destination_client = load.get_destination_client(schema) assert ( - destination_client.should_truncate_table_before_load_on_staging_destination( + destination_client.should_truncate_table_before_load_on_staging_destination( # type: ignore schema.tables["_dlt_version"] ) == to_truncate