diff --git a/dlt/destinations/impl/athena/athena.py b/dlt/destinations/impl/athena/athena.py
index 3542544306..9f84a4d7b4 100644
--- a/dlt/destinations/impl/athena/athena.py
+++ b/dlt/destinations/impl/athena/athena.py
@@ -163,7 +163,9 @@ class AthenaMergeJob(SqlMergeFollowupJob):
     def _new_temp_table_name(cls, name_prefix: str, sql_client: SqlClientBase[Any]) -> str:
         # reproducible name so we know which table to drop
         with sql_client.with_staging_dataset():
-            return sql_client.make_qualified_table_name(name_prefix)
+            return sql_client.make_qualified_table_name(
+                cls._shorten_table_name(name_prefix, sql_client)
+            )
 
     @classmethod
     def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:
@@ -428,7 +430,7 @@ def _get_table_update_sql(
         # or if we are in iceberg mode, we create iceberg tables for all tables
         table = self.prepare_load_table(table_name)
         # do not create iceberg tables on staging dataset
-        create_iceberg = not self.in_staging_dataset_mode and self._is_iceberg_table(table)
+        create_iceberg = self._is_iceberg_table(table, self.in_staging_dataset_mode)
         columns = ", ".join([self._get_column_def_sql(c, table) for c in new_columns])
 
         # create unique tag for iceberg table so it is never recreated in the same folder
@@ -512,10 +514,15 @@ def _create_merge_followup_jobs(
     ) -> List[FollowupJobRequest]:
         return [AthenaMergeJob.from_table_chain(table_chain, self.sql_client)]
 
-    def _is_iceberg_table(self, table: PreparedTableSchema) -> bool:
+    def _is_iceberg_table(
+        self, table: PreparedTableSchema, is_staging_dataset: bool = False
+    ) -> bool:
         table_format = table.get("table_format")
-        # all dlt tables are iceberg tables
-        return table_format == "iceberg" or table["name"] in self.schema.dlt_table_names()
+        # all dlt tables that are not loaded via files are iceberg tables, no matter if they are on staging or regular dataset
+        # all other iceberg tables are HIVE (external) tables on staging dataset
+        return (table_format == "iceberg" and not is_staging_dataset) or table[
+            "write_disposition"
+        ] == "skip"
 
     def should_load_data_to_staging_dataset(self, table_name: str) -> bool:
         # all iceberg tables need staging
diff --git a/dlt/destinations/impl/dremio/dremio.py b/dlt/destinations/impl/dremio/dremio.py
index 595b2d943a..d50bfbec2c 100644
--- a/dlt/destinations/impl/dremio/dremio.py
+++ b/dlt/destinations/impl/dremio/dremio.py
@@ -73,7 +73,9 @@ def from_db_type(
 class DremioMergeJob(SqlMergeFollowupJob):
     @classmethod
     def _new_temp_table_name(cls, name_prefix: str, sql_client: SqlClientBase[Any]) -> str:
-        return sql_client.make_qualified_table_name(f"_temp_{name_prefix}_{uniq_id()}")
+        return sql_client.make_qualified_table_name(
+            cls._shorten_table_name(f"_temp_{name_prefix}_{uniq_id()}", sql_client)
+        )
 
     @classmethod
     def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:
diff --git a/dlt/destinations/impl/mssql/mssql.py b/dlt/destinations/impl/mssql/mssql.py
index 4e090e3280..ade56b5065 100644
--- a/dlt/destinations/impl/mssql/mssql.py
+++ b/dlt/destinations/impl/mssql/mssql.py
@@ -136,8 +136,7 @@ def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:
 
     @classmethod
     def _new_temp_table_name(cls, name_prefix: str, sql_client: SqlClientBase[Any]) -> str:
-        name = SqlMergeFollowupJob._new_temp_table_name(name_prefix, sql_client)
-        return "#" + name
+        return SqlMergeFollowupJob._new_temp_table_name("#" + name_prefix, sql_client)
 
 
 class MsSqlJobClient(InsertValuesJobClient):
diff --git a/dlt/destinations/sql_jobs.py b/dlt/destinations/sql_jobs.py
index 2fc6b709c6..a555fe8a1f 100644
--- a/dlt/destinations/sql_jobs.py
+++ b/dlt/destinations/sql_jobs.py
@@ -339,9 +339,18 @@ def gen_delete_from_sql(
             );
         """
 
+    @classmethod
+    def _shorten_table_name(cls, ident: str, sql_client: SqlClientBase[Any]) -> str:
+        """Trims identifier to max length supported by sql_client. Used for dynamically constructed table names"""
+        from dlt.common.normalizers.naming import NamingConvention
+
+        return NamingConvention.shorten_identifier(
+            ident, ident, sql_client.capabilities.max_identifier_length
+        )
+
     @classmethod
     def _new_temp_table_name(cls, name_prefix: str, sql_client: SqlClientBase[Any]) -> str:
-        return f"{name_prefix}_{uniq_id()}"
+        return cls._shorten_table_name(f"{name_prefix}_{uniq_id()}", sql_client)
 
     @classmethod
     def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:
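
Note (not part of the patch): a minimal sketch of how the new SqlMergeFollowupJob._shorten_table_name helper is expected to behave, based on the NamingConvention.shorten_identifier call shown above. The 64-character limit and the table name prefix below are assumptions for illustration only; the real code reads the limit from sql_client.capabilities.max_identifier_length.

# Usage sketch, separate from the patch. shorten_identifier returns the input
# unchanged when it fits within the limit and otherwise truncates it and embeds
# a deterministic tag derived from the full name, so the same long temp table
# name always shortens to the same identifier (which is why the Athena job can
# later find and drop the table it created).
from dlt.common.normalizers.naming import NamingConvention
from dlt.common.utils import uniq_id

# assumed limit for illustration; a destination reports this via its capabilities
MAX_IDENTIFIER_LENGTH = 64

# hypothetical long prefix, e.g. a deeply nested child table used in a merge job
name_prefix = "a_very_long_parent_table__child_table__grandchild_table_delete"
temp_table_name = f"{name_prefix}_{uniq_id()}"

short_name = NamingConvention.shorten_identifier(
    temp_table_name, temp_table_name, MAX_IDENTIFIER_LENGTH
)

assert len(short_name) <= MAX_IDENTIFIER_LENGTH
print(f"{temp_table_name!r} -> {short_name!r}")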