From 952d39a49ab87da1e3ffdea8e1ed227246d002aa Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Sat, 24 Aug 2024 17:25:16 +0200 Subject: [PATCH 1/6] uses normalized column names when linking tables in relational --- dlt/common/normalizers/json/relational.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/dlt/common/normalizers/json/relational.py b/dlt/common/normalizers/json/relational.py index 8e296445eb..1dbcec4bff 100644 --- a/dlt/common/normalizers/json/relational.py +++ b/dlt/common/normalizers/json/relational.py @@ -184,11 +184,10 @@ def _get_child_row_hash(parent_row_id: str, child_table: str, list_idx: int) -> # and all child tables must be lists return digest128(f"{parent_row_id}_{child_table}_{list_idx}", DLT_ID_LENGTH_BYTES) - @staticmethod - def _link_row(row: DictStrAny, parent_row_id: str, list_idx: int) -> DictStrAny: + def _link_row(self, row: DictStrAny, parent_row_id: str, list_idx: int) -> DictStrAny: assert parent_row_id - row["_dlt_parent_id"] = parent_row_id - row["_dlt_list_idx"] = list_idx + row[self.c_dlt_parent_id] = parent_row_id + row[self.c_dlt_list_idx] = list_idx return row @@ -227,7 +226,7 @@ def _add_row_id( if row_id_type == "row_hash": row_id = DataItemNormalizer._get_child_row_hash(parent_row_id, table, pos) # link to parent table - DataItemNormalizer._link_row(flattened_row, parent_row_id, pos) + self._link_row(flattened_row, parent_row_id, pos) flattened_row[self.c_dlt_id] = row_id return row_id @@ -260,7 +259,6 @@ def _normalize_list( parent_row_id: Optional[str] = None, _r_lvl: int = 0, ) -> TNormalizedRowIterator: - v: DictStrAny = None table = self.schema.naming.shorten_fragments(*parent_path, *ident_path) for idx, v in enumerate(seq): @@ -285,7 +283,7 @@ def _normalize_list( child_row_hash = DataItemNormalizer._get_child_row_hash(parent_row_id, table, idx) wrap_v = wrap_in_dict(v) wrap_v[self.c_dlt_id] = child_row_hash - e = DataItemNormalizer._link_row(wrap_v, parent_row_id, idx) + e = self._link_row(wrap_v, parent_row_id, idx) DataItemNormalizer._extend_row(extend, e) yield (table, self.schema.naming.shorten_fragments(*parent_path)), e From dda715898423103b6e3c5510b0d41428b3ce03ee Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Sat, 24 Aug 2024 17:25:58 +0200 Subject: [PATCH 2/6] destination cap if create table if not exits supported --- dlt/common/destination/capabilities.py | 1 + dlt/destinations/impl/mssql/factory.py | 1 + dlt/destinations/impl/synapse/factory.py | 2 ++ 3 files changed, 4 insertions(+) diff --git a/dlt/common/destination/capabilities.py b/dlt/common/destination/capabilities.py index be71cb50e9..e9374c6618 100644 --- a/dlt/common/destination/capabilities.py +++ b/dlt/common/destination/capabilities.py @@ -76,6 +76,7 @@ class DestinationCapabilitiesContext(ContainerInjectableContext): # use naming convention in the schema naming_convention: TNamingConventionReferenceArg = None alter_add_multi_column: bool = True + create_table_not_exists: bool = True supports_truncate_command: bool = True schema_supports_numeric_precision: bool = True timestamp_precision: int = 6 diff --git a/dlt/destinations/impl/mssql/factory.py b/dlt/destinations/impl/mssql/factory.py index 85c94c21b7..4c03f1f64a 100644 --- a/dlt/destinations/impl/mssql/factory.py +++ b/dlt/destinations/impl/mssql/factory.py @@ -37,6 +37,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.max_text_data_type_length = 2**30 - 1 caps.is_max_text_data_type_length_in_bytes = False caps.supports_ddl_transactions = True + caps.create_table_not_exists = False # IF NOT EXISTS not supported caps.max_rows_per_insert = 1000 caps.timestamp_precision = 7 caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"] diff --git a/dlt/destinations/impl/synapse/factory.py b/dlt/destinations/impl/synapse/factory.py index bb117e48d2..d980dcac55 100644 --- a/dlt/destinations/impl/synapse/factory.py +++ b/dlt/destinations/impl/synapse/factory.py @@ -63,6 +63,8 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.supports_transactions = True caps.supports_ddl_transactions = False + caps.create_table_not_exists = False # IF NOT EXISTS on CREATE TABLE not supported + # Synapse throws "Some part of your SQL statement is nested too deeply. Rewrite the query or break it up into smaller queries." # if number of records exceeds a certain number. Which exact number that is seems not deterministic: # in tests, I've seen a query with 12230 records run succesfully on one run, but fail on a subsequent run, while the query remained exactly the same. From 74046e25db7ae44646d2a5cfe3a532a2a267585c Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Sat, 24 Aug 2024 17:26:17 +0200 Subject: [PATCH 3/6] generates IF NOT EXISTS for dlt tables --- dlt/destinations/impl/athena/athena.py | 2 +- dlt/destinations/job_client_impl.py | 19 ++++++++++++++----- .../test_athena_table_builder.py | 0 tests/load/mssql/test_mssql_table_builder.py | 12 ++++++++++-- .../postgres/test_postgres_table_builder.py | 11 ++++++++++- 5 files changed, 35 insertions(+), 9 deletions(-) create mode 100644 tests/load/athena_iceberg/test_athena_table_builder.py diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py index 1429b28240..0c90d171a3 100644 --- a/dlt/destinations/impl/athena/athena.py +++ b/dlt/destinations/impl/athena/athena.py @@ -452,7 +452,7 @@ def _get_table_update_sql( partition_clause = self._iceberg_partition_clause( cast(Optional[Dict[str, str]], table.get(PARTITION_HINT)) ) - sql.append(f"""CREATE TABLE {qualified_table_name} + sql.append(f"""{self._make_create_table(qualified_table_name, table)} ({columns}) {partition_clause} LOCATION '{location.rstrip('/')}' diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py index 92132dd751..5095394cbd 100644 --- a/dlt/destinations/job_client_impl.py +++ b/dlt/destinations/job_client_impl.py @@ -522,22 +522,31 @@ def _make_add_column_sql( """Make one or more ADD COLUMN sql clauses to be joined in ALTER TABLE statement(s)""" return [f"ADD COLUMN {self._get_column_def_sql(c, table_format)}" for c in new_columns] + def _make_create_table(self, qualified_name: str, table: TTableSchema) -> str: + not_exists_clause = " " + if ( + table["name"] in self.schema.dlt_table_names() + and self.capabilities.create_table_not_exists + ): + not_exists_clause = " IF NOT EXISTS " + return f"CREATE TABLE{not_exists_clause}{qualified_name}" + def _get_table_update_sql( self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool ) -> List[str]: # build sql - canonical_name = self.sql_client.make_qualified_table_name(table_name) + qualified_name = self.sql_client.make_qualified_table_name(table_name) table = self.prepare_load_table(table_name) table_format = table.get("table_format") sql_result: List[str] = [] if not generate_alter: # build CREATE - sql = f"CREATE TABLE {canonical_name} (\n" + sql = self._make_create_table(qualified_name, table) + " (\n" sql += ",\n".join([self._get_column_def_sql(c, table_format) for c in new_columns]) sql += ")" sql_result.append(sql) else: - sql_base = f"ALTER TABLE {canonical_name}\n" + sql_base = f"ALTER TABLE {qualified_name}\n" add_column_statements = self._make_add_column_sql(new_columns, table_format) if self.capabilities.alter_add_multi_column: column_sql = ",\n" @@ -561,13 +570,13 @@ def _get_table_update_sql( if hint == "not_null": logger.warning( f"Column(s) {hint_columns} with NOT NULL are being added to existing" - f" table {canonical_name}. If there's data in the table the operation" + f" table {qualified_name}. If there's data in the table the operation" " will fail." ) else: logger.warning( f"Column(s) {hint_columns} with hint {hint} are being added to existing" - f" table {canonical_name}. Several hint types may not be added to" + f" table {qualified_name}. Several hint types may not be added to" " existing tables." ) return sql_result diff --git a/tests/load/athena_iceberg/test_athena_table_builder.py b/tests/load/athena_iceberg/test_athena_table_builder.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/load/mssql/test_mssql_table_builder.py b/tests/load/mssql/test_mssql_table_builder.py index d6cf3ec3e8..3f3896de6c 100644 --- a/tests/load/mssql/test_mssql_table_builder.py +++ b/tests/load/mssql/test_mssql_table_builder.py @@ -55,8 +55,8 @@ def test_alter_table(client: MsSqlJobClient) -> None: # existing table has no columns sql = client._get_table_update_sql("event_test_table", TABLE_UPDATE, True)[0] sqlfluff.parse(sql, dialect="tsql") - canonical_name = client.sql_client.make_qualified_table_name("event_test_table") - assert sql.count(f"ALTER TABLE {canonical_name}\nADD") == 1 + qualified_name = client.sql_client.make_qualified_table_name("event_test_table") + assert sql.count(f"ALTER TABLE {qualified_name}\nADD") == 1 assert "event_test_table" in sql assert '"col1" bigint NOT NULL' in sql assert '"col2" float NOT NULL' in sql @@ -75,3 +75,11 @@ def test_alter_table(client: MsSqlJobClient) -> None: assert '"col6_precision" decimal(6,2) NOT NULL' in sql assert '"col7_precision" varbinary(19)' in sql assert '"col11_precision" time(3) NOT NULL' in sql + + +def test_create_dlt_table(client: MsSqlJobClient) -> None: + # non existing table + sql = client._get_table_update_sql("_dlt_version", TABLE_UPDATE, False)[0] + sqlfluff.parse(sql, dialect="tsql") + qualified_name = client.sql_client.make_qualified_table_name("_dlt_version") + assert f"CREATE TABLE {qualified_name}" in sql diff --git a/tests/load/postgres/test_postgres_table_builder.py b/tests/load/postgres/test_postgres_table_builder.py index 86bd67db9a..28fd4eec9d 100644 --- a/tests/load/postgres/test_postgres_table_builder.py +++ b/tests/load/postgres/test_postgres_table_builder.py @@ -57,7 +57,8 @@ def test_create_table(client: PostgresClient) -> None: # non existing table sql = client._get_table_update_sql("event_test_table", TABLE_UPDATE, False)[0] sqlfluff.parse(sql, dialect="postgres") - assert "event_test_table" in sql + qualified_name = client.sql_client.make_qualified_table_name("event_test_table") + assert f"CREATE TABLE {qualified_name}" in sql assert '"col1" bigint NOT NULL' in sql assert '"col2" double precision NOT NULL' in sql assert '"col3" boolean NOT NULL' in sql @@ -173,3 +174,11 @@ def test_create_table_case_sensitive(cs_client: PostgresClient) -> None: # every line starts with "Col" for line in sql.split("\n")[1:]: assert line.startswith('"Col') + + +def test_create_dlt_table(client: PostgresClient) -> None: + # non existing table + sql = client._get_table_update_sql("_dlt_version", TABLE_UPDATE, False)[0] + sqlfluff.parse(sql, dialect="postgres") + qualified_name = client.sql_client.make_qualified_table_name("_dlt_version") + assert f"CREATE TABLE IF NOT EXISTS {qualified_name}" in sql From 39a8b3beea1e8c87108bedd32fca3dd608b8fb6d Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Sun, 25 Aug 2024 23:26:20 +0200 Subject: [PATCH 4/6] adds logging for terminal and retry exception in run_managed of load job --- dlt/common/destination/reference.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index b6c7041592..f30841e127 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -383,9 +383,17 @@ def run_managed( except (DestinationTerminalException, TerminalValueError) as e: self._state = "failed" self._exception = e + logger.exception( + f"Terminal exception in job {self.job_id()} on table {self.load_table_name} in file" + f" {self._file_path}" + ) except (DestinationTransientException, Exception) as e: self._state = "retry" self._exception = e + logger.exception( + f"Transient exception in job {self.job_id()} on table {self.load_table_name} in" + f" file {self._file_path}" + ) finally: self._finished_at = pendulum.now() # sanity check From 7f1f76fe3990f7debeea8b12965903cb288cbda4 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Sun, 25 Aug 2024 23:26:52 +0200 Subject: [PATCH 5/6] passes schema update to be collected in trace in filesystem --- dlt/destinations/impl/filesystem/filesystem.py | 5 ++++- .../parent_child_relationship.py | 9 ++++----- .../test_parent_child_relationship.py | 10 ++++------ tests/load/athena_iceberg/test_athena_table_builder.py | 0 4 files changed, 12 insertions(+), 12 deletions(-) delete mode 100644 tests/load/athena_iceberg/test_athena_table_builder.py diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py index 2e09871ba9..5445fd2ae9 100644 --- a/dlt/destinations/impl/filesystem/filesystem.py +++ b/dlt/destinations/impl/filesystem/filesystem.py @@ -303,6 +303,7 @@ def update_stored_schema( only_tables: Iterable[str] = None, expected_update: TSchemaTables = None, ) -> TSchemaTables: + applied_update = super().update_stored_schema(only_tables, expected_update) # create destination dirs for all tables table_names = only_tables or self.schema.tables.keys() dirs_to_create = self.get_table_dirs(table_names) @@ -316,7 +317,9 @@ def update_stored_schema( if not self.config.as_staging: self._store_current_schema() - return expected_update + # we assume that expected_update == applied_update so table schemas in dest were not + # externally changed + return applied_update def get_table_dir(self, table_name: str, remote: bool = False) -> str: # dlt tables do not respect layout (for now) diff --git a/docs/examples/parent_child_relationship/parent_child_relationship.py b/docs/examples/parent_child_relationship/parent_child_relationship.py index 39c9f577cc..6de00ffb28 100644 --- a/docs/examples/parent_child_relationship/parent_child_relationship.py +++ b/docs/examples/parent_child_relationship/parent_child_relationship.py @@ -22,6 +22,7 @@ from typing import List, Dict, Any, Generator import dlt + # Define a dlt resource with write disposition to 'merge' @dlt.resource(name="parent_with_children", write_disposition={"disposition": "merge"}) def data_source() -> Generator[List[Dict[str, Any]], None, None]: @@ -44,6 +45,7 @@ def data_source() -> Generator[List[Dict[str, Any]], None, None]: yield data + # Function to add parent_id to each child record within a parent record def add_parent_id(record: Dict[str, Any]) -> Dict[str, Any]: parent_id_key = "parent_id" @@ -51,6 +53,7 @@ def add_parent_id(record: Dict[str, Any]) -> Dict[str, Any]: child[parent_id_key] = record[parent_id_key] return record + if __name__ == "__main__": # Create and configure the dlt pipeline pipeline = dlt.pipeline( @@ -60,10 +63,6 @@ def add_parent_id(record: Dict[str, Any]) -> Dict[str, Any]: ) # Run the pipeline - load_info = pipeline.run( - data_source() - .add_map(add_parent_id), - primary_key="parent_id" - ) + load_info = pipeline.run(data_source().add_map(add_parent_id), primary_key="parent_id") # Output the load information after pipeline execution print(load_info) diff --git a/docs/examples/parent_child_relationship/test_parent_child_relationship.py b/docs/examples/parent_child_relationship/test_parent_child_relationship.py index f671040823..95d1bade97 100644 --- a/docs/examples/parent_child_relationship/test_parent_child_relationship.py +++ b/docs/examples/parent_child_relationship/test_parent_child_relationship.py @@ -1,4 +1,3 @@ - import pytest from tests.utils import skipifgithubfork @@ -29,6 +28,7 @@ from typing import List, Dict, Any, Generator import dlt + # Define a dlt resource with write disposition to 'merge' @dlt.resource(name="parent_with_children", write_disposition={"disposition": "merge"}) def data_source() -> Generator[List[Dict[str, Any]], None, None]: @@ -51,6 +51,7 @@ def data_source() -> Generator[List[Dict[str, Any]], None, None]: yield data + # Function to add parent_id to each child record within a parent record def add_parent_id(record: Dict[str, Any]) -> Dict[str, Any]: parent_id_key = "parent_id" @@ -58,6 +59,7 @@ def add_parent_id(record: Dict[str, Any]) -> Dict[str, Any]: child[parent_id_key] = record[parent_id_key] return record + @skipifgithubfork @pytest.mark.forked def test_parent_child_relationship(): @@ -69,10 +71,6 @@ def test_parent_child_relationship(): ) # Run the pipeline - load_info = pipeline.run( - data_source() - .add_map(add_parent_id), - primary_key="parent_id" - ) + load_info = pipeline.run(data_source().add_map(add_parent_id), primary_key="parent_id") # Output the load information after pipeline execution print(load_info) diff --git a/tests/load/athena_iceberg/test_athena_table_builder.py b/tests/load/athena_iceberg/test_athena_table_builder.py deleted file mode 100644 index e69de29bb2..0000000000 From 40b99434827d1c20a4421b1d170b420572ed5ce6 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Mon, 26 Aug 2024 17:16:52 +0200 Subject: [PATCH 6/6] fixes job log exception message --- dlt/common/destination/capabilities.py | 2 +- dlt/common/destination/reference.py | 8 ++------ dlt/destinations/impl/mssql/factory.py | 2 +- dlt/destinations/impl/synapse/factory.py | 4 +++- dlt/destinations/job_client_impl.py | 2 +- tests/pipeline/test_pipeline_trace.py | 2 +- 6 files changed, 9 insertions(+), 11 deletions(-) diff --git a/dlt/common/destination/capabilities.py b/dlt/common/destination/capabilities.py index e9374c6618..52e7d74833 100644 --- a/dlt/common/destination/capabilities.py +++ b/dlt/common/destination/capabilities.py @@ -76,7 +76,7 @@ class DestinationCapabilitiesContext(ContainerInjectableContext): # use naming convention in the schema naming_convention: TNamingConventionReferenceArg = None alter_add_multi_column: bool = True - create_table_not_exists: bool = True + supports_create_table_if_not_exists: bool = True supports_truncate_command: bool = True schema_supports_numeric_precision: bool = True timestamp_precision: int = 6 diff --git a/dlt/common/destination/reference.py b/dlt/common/destination/reference.py index f30841e127..744cbbd1f5 100644 --- a/dlt/common/destination/reference.py +++ b/dlt/common/destination/reference.py @@ -383,16 +383,12 @@ def run_managed( except (DestinationTerminalException, TerminalValueError) as e: self._state = "failed" self._exception = e - logger.exception( - f"Terminal exception in job {self.job_id()} on table {self.load_table_name} in file" - f" {self._file_path}" - ) + logger.exception(f"Terminal exception in job {self.job_id()} in file {self._file_path}") except (DestinationTransientException, Exception) as e: self._state = "retry" self._exception = e logger.exception( - f"Transient exception in job {self.job_id()} on table {self.load_table_name} in" - f" file {self._file_path}" + f"Transient exception in job {self.job_id()} in file {self._file_path}" ) finally: self._finished_at = pendulum.now() diff --git a/dlt/destinations/impl/mssql/factory.py b/dlt/destinations/impl/mssql/factory.py index 4c03f1f64a..f1a8bb136a 100644 --- a/dlt/destinations/impl/mssql/factory.py +++ b/dlt/destinations/impl/mssql/factory.py @@ -37,7 +37,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.max_text_data_type_length = 2**30 - 1 caps.is_max_text_data_type_length_in_bytes = False caps.supports_ddl_transactions = True - caps.create_table_not_exists = False # IF NOT EXISTS not supported + caps.supports_create_table_if_not_exists = False # IF NOT EXISTS not supported caps.max_rows_per_insert = 1000 caps.timestamp_precision = 7 caps.supported_merge_strategies = ["delete-insert", "upsert", "scd2"] diff --git a/dlt/destinations/impl/synapse/factory.py b/dlt/destinations/impl/synapse/factory.py index d980dcac55..d5a0281bec 100644 --- a/dlt/destinations/impl/synapse/factory.py +++ b/dlt/destinations/impl/synapse/factory.py @@ -63,7 +63,9 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext: caps.supports_transactions = True caps.supports_ddl_transactions = False - caps.create_table_not_exists = False # IF NOT EXISTS on CREATE TABLE not supported + caps.supports_create_table_if_not_exists = ( + False # IF NOT EXISTS on CREATE TABLE not supported + ) # Synapse throws "Some part of your SQL statement is nested too deeply. Rewrite the query or break it up into smaller queries." # if number of records exceeds a certain number. Which exact number that is seems not deterministic: diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py index 5095394cbd..1d6403a2c8 100644 --- a/dlt/destinations/job_client_impl.py +++ b/dlt/destinations/job_client_impl.py @@ -526,7 +526,7 @@ def _make_create_table(self, qualified_name: str, table: TTableSchema) -> str: not_exists_clause = " " if ( table["name"] in self.schema.dlt_table_names() - and self.capabilities.create_table_not_exists + and self.capabilities.supports_create_table_if_not_exists ): not_exists_clause = " IF NOT EXISTS " return f"CREATE TABLE{not_exists_clause}{qualified_name}" diff --git a/tests/pipeline/test_pipeline_trace.py b/tests/pipeline/test_pipeline_trace.py index 69c0f01b8b..4e52d2aa29 100644 --- a/tests/pipeline/test_pipeline_trace.py +++ b/tests/pipeline/test_pipeline_trace.py @@ -551,7 +551,7 @@ def test_trace_telemetry() -> None: for item in SENTRY_SENT_ITEMS: # print(item) print(item["logentry"]["message"]) - assert len(SENTRY_SENT_ITEMS) == 2 + assert len(SENTRY_SENT_ITEMS) == 4 # trace with exception @dlt.resource