Skip to content

Commit

Permalink
shortens temp table names in sql jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Sep 1, 2024
1 parent 12a37e0 commit 4a974b4
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 9 deletions.
17 changes: 12 additions & 5 deletions dlt/destinations/impl/athena/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ class AthenaMergeJob(SqlMergeFollowupJob):
def _new_temp_table_name(cls, name_prefix: str, sql_client: SqlClientBase[Any]) -> str:
# reproducible name so we know which table to drop
with sql_client.with_staging_dataset():
return sql_client.make_qualified_table_name(name_prefix)
return sql_client.make_qualified_table_name(
cls._shorten_table_name(name_prefix, sql_client)
)

@classmethod
def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:
Expand Down Expand Up @@ -428,7 +430,7 @@ def _get_table_update_sql(
# or if we are in iceberg mode, we create iceberg tables for all tables
table = self.prepare_load_table(table_name)
# do not create iceberg tables on staging dataset
create_iceberg = not self.in_staging_dataset_mode and self._is_iceberg_table(table)
create_iceberg = self._is_iceberg_table(table, self.in_staging_dataset_mode)
columns = ", ".join([self._get_column_def_sql(c, table) for c in new_columns])

# create unique tag for iceberg table so it is never recreated in the same folder
Expand Down Expand Up @@ -512,10 +514,15 @@ def _create_merge_followup_jobs(
) -> List[FollowupJobRequest]:
return [AthenaMergeJob.from_table_chain(table_chain, self.sql_client)]

def _is_iceberg_table(self, table: PreparedTableSchema) -> bool:
def _is_iceberg_table(
self, table: PreparedTableSchema, is_staging_dataset: bool = False
) -> bool:
table_format = table.get("table_format")
# all dlt tables are iceberg tables
return table_format == "iceberg" or table["name"] in self.schema.dlt_table_names()
# all dlt tables that are not loaded via files are iceberg tables, no matter if they are on staging or regular dataset
# all other iceberg tables are HIVE (external) tables on staging dataset
return (table_format == "iceberg" and not is_staging_dataset) or table[
"write_disposition"
] == "skip"

def should_load_data_to_staging_dataset(self, table_name: str) -> bool:
# all iceberg tables need staging
Expand Down
4 changes: 3 additions & 1 deletion dlt/destinations/impl/dremio/dremio.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ def from_db_type(
class DremioMergeJob(SqlMergeFollowupJob):
@classmethod
def _new_temp_table_name(cls, name_prefix: str, sql_client: SqlClientBase[Any]) -> str:
return sql_client.make_qualified_table_name(f"_temp_{name_prefix}_{uniq_id()}")
return sql_client.make_qualified_table_name(
cls._shorten_table_name(f"_temp_{name_prefix}_{uniq_id()}", sql_client)
)

@classmethod
def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:
Expand Down
3 changes: 1 addition & 2 deletions dlt/destinations/impl/mssql/mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:

@classmethod
def _new_temp_table_name(cls, name_prefix: str, sql_client: SqlClientBase[Any]) -> str:
name = SqlMergeFollowupJob._new_temp_table_name(name_prefix, sql_client)
return "#" + name
return SqlMergeFollowupJob._new_temp_table_name("#" + name_prefix, sql_client)


class MsSqlJobClient(InsertValuesJobClient):
Expand Down
11 changes: 10 additions & 1 deletion dlt/destinations/sql_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,18 @@ def gen_delete_from_sql(
);
"""

@classmethod
def _shorten_table_name(cls, ident: str, sql_client: SqlClientBase[Any]) -> str:
    """Shorten a dynamically constructed table name for the given destination.

    Trims the identifier to the max length supported by `sql_client`.

    Args:
        ident: Table identifier to shorten.
        sql_client: Client whose `capabilities.max_identifier_length` is used
            as the length limit.

    Returns:
        Whatever `NamingConvention.shorten_identifier` returns for `ident`
        under that limit.
    """
    # imported at call time, inside the method, rather than at module level
    from dlt.common.normalizers.naming import NamingConvention

    max_length = sql_client.capabilities.max_identifier_length
    # the same identifier is passed as both the first and the second argument
    return NamingConvention.shorten_identifier(ident, ident, max_length)

@classmethod
def _new_temp_table_name(cls, name_prefix: str, sql_client: SqlClientBase[Any]) -> str:
return f"{name_prefix}_{uniq_id()}"
return cls._shorten_table_name(f"{name_prefix}_{uniq_id()}", sql_client)

@classmethod
def _to_temp_table(cls, select_sql: str, temp_table_name: str) -> str:
Expand Down

0 comments on commit 4a974b4

Please sign in to comment.