From 8990f114e9664b16fad55d6d2eccc73c217af5d2 Mon Sep 17 00:00:00 2001
From: Steinthor Palsson
Date: Thu, 18 Apr 2024 22:30:13 -0400
Subject: [PATCH] Match stored schema by version+version_hash

Solves detecting when dropped tables need to be recreated.
---
 dlt/destinations/job_client_impl.py | 18 ++++++++++++++----
 dlt/load/utils.py                   | 14 +++++---------
 2 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py
index 5838ab2ab7..abf9b9a119 100644
--- a/dlt/destinations/job_client_impl.py
+++ b/dlt/destinations/job_client_impl.py
@@ -19,6 +19,7 @@
     Iterator,
     ContextManager,
     cast,
+    Union,
 )
 import zlib
 import re
@@ -186,7 +187,9 @@ def update_stored_schema(
     ) -> Optional[TSchemaTables]:
         super().update_stored_schema(only_tables, expected_update)
         applied_update: TSchemaTables = {}
-        schema_info = self.get_stored_schema_by_hash(self.schema.stored_version_hash)
+        schema_info = self.get_stored_schema_by_hash(
+            self.schema.stored_version_hash, self.schema.version
+        )
         if schema_info is None:
             logger.info(
                 f"Schema with hash {self.schema.stored_version_hash} not found in the storage."
@@ -375,10 +378,17 @@ def get_stored_state(self, pipeline_name: str) -> StateInfo:
             return None
         return StateInfo(row[0], row[1], row[2], row[3], pendulum.instance(row[4]))
 
-    def get_stored_schema_by_hash(self, version_hash: str) -> StorageSchemaInfo:
+    def get_stored_schema_by_hash(
+        self, version_hash: str, version: Optional[int] = None
+    ) -> StorageSchemaInfo:
         name = self.sql_client.make_qualified_table_name(self.schema.version_table_name)
-        query = f"SELECT {self.version_table_schema_columns} FROM {name} WHERE version_hash = %s;"
-        return self._row_to_schema_info(query, version_hash)
+        query = f"SELECT {self.version_table_schema_columns} FROM {name} WHERE version_hash = %s"
+        params: List[Union[int, str]] = [version_hash]
+        if version is not None:
+            params.append(version)
+            query += " AND version = %s"
+        query += ";"
+        return self._row_to_schema_info(query, *params)
 
     def _execute_schema_update_sql(self, only_tables: Iterable[str]) -> TSchemaTables:
         sql_scripts, schema_update = self._build_schema_update_sql(only_tables)
diff --git a/dlt/load/utils.py b/dlt/load/utils.py
index 8b296b3033..7a3b83e8f8 100644
--- a/dlt/load/utils.py
+++ b/dlt/load/utils.py
@@ -150,24 +150,20 @@ def _init_dataset_and_update_schema(
         f"Client for {job_client.config.destination_type} will start initialize storage"
         f" {staging_text}"
     )
+    job_client.initialize_storage()
     if drop_tables:
-        old_schema = job_client.schema
-        new_schema = job_client.schema.clone()
-        job_client.schema = new_schema
-        for table in drop_tables:
-            new_schema.tables.pop(table["name"], None)
-        new_schema._bump_version()
+        drop_table_names = [table["name"] for table in drop_tables]
         if hasattr(job_client, "drop_tables"):
             logger.info(
                 f"Client for {job_client.config.destination_type} will drop tables {staging_text}"
             )
-            job_client.drop_tables(*[table["name"] for table in drop_tables], replace_schema=True)
-        job_client.schema = old_schema
-        job_client.initialize_storage()
+            job_client.drop_tables(*drop_table_names, replace_schema=False)
+
     logger.info(
         f"Client for {job_client.config.destination_type} will update schema to package schema"
         f" {staging_text}"
     )
+
     applied_update = job_client.update_stored_schema(
         only_tables=update_tables, expected_update=expected_update
     )
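
Note on the job client change: get_stored_schema_by_hash now matches the stored
schema on version as well as version_hash, so a stored row that shares the
content hash but carries a different version is no longer treated as current.
A minimal standalone sketch of the query construction, assuming a DB-API style
driver with %s placeholders (build_schema_lookup, the column list, and the
example values are illustrative stand-ins, not dlt's actual helpers):

    from typing import List, Optional, Tuple, Union

    def build_schema_lookup(
        version_table: str, version_hash: str, version: Optional[int] = None
    ) -> Tuple[str, List[Union[int, str]]]:
        """Build the stored-schema lookup query and its bind parameters."""
        query = f"SELECT version, version_hash FROM {version_table} WHERE version_hash = %s"
        params: List[Union[int, str]] = [version_hash]
        if version is not None:
            # Require the stored row to match on version too: a hash-only
            # match can report a stale row as current, e.g. after tables
            # were dropped.
            params.append(version)
            query += " AND version = %s"
        return query + ";", params

    # The version placeholder is emitted only when a version is supplied:
    sql, args = build_schema_lookup("my_dataset._dlt_version", "k5Tq...", 7)
    # sql ends with "WHERE version_hash = %s AND version = %s;"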
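
Note on the load utils change: storage is now initialized before any drop, and
the in-memory schema is no longer cloned, trimmed, and version-bumped around
drop_tables. The dropped tables stay in the package schema, so the subsequent
update_stored_schema call can recreate them once the stored schema record
fails the stricter version+hash lookup. A rough sketch of the resulting
control flow, with JobClient as a hypothetical stand-in protocol rather than
dlt's actual client classes:

    from typing import Iterable, Protocol, Sequence

    class JobClient(Protocol):
        def initialize_storage(self) -> None: ...
        def drop_tables(self, *tables: str, replace_schema: bool = ...) -> None: ...
        def update_stored_schema(self, only_tables: Iterable[str] = ...) -> None: ...

    def init_dataset_and_update_schema(
        client: JobClient, drop: Sequence[str], update: Sequence[str]
    ) -> None:
        # Initialize storage first so dropping and recreating happen in one pass.
        client.initialize_storage()
        if drop and hasattr(client, "drop_tables"):
            # Drop the physical tables but leave the stored schema alone
            # (replace_schema=False); the schema itself still describes them.
            client.drop_tables(*drop, replace_schema=False)
        # Re-applies the schema, recreating any dropped tables that are still
        # part of it, whenever the stored record no longer matches by
        # version + version_hash.
        client.update_stored_schema(only_tables=update)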