From ac0c866347d8fc90b62a066750215f157f2523b5 Mon Sep 17 00:00:00 2001 From: Jamison Date: Tue, 3 Sep 2024 09:53:55 -0700 Subject: [PATCH] Review Feedback --- .../_internal/analyzer/analyzer_utils.py | 55 +++++++++---------- 1 file changed, 26 insertions(+), 29 deletions(-) diff --git a/src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py b/src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py index 86ffaf533ac..196fad949ec 100644 --- a/src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py +++ b/src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py @@ -188,6 +188,25 @@ TEMPORARY_STRING_SET = frozenset(["temporary", "temp"]) +def validate_iceberg_config(iceberg_config: Optional[Dict[str, str]]) -> Dict[str, str]: + if not iceberg_config: + return dict() + + iceberg_config = {k.lower(): v for k, v in iceberg_config.items()} + if "external_volume" not in iceberg_config: + return dict() + + return { + EXTERNAL_VOLUME: iceberg_config.get("external_volume", None), + CATALOG: iceberg_config.get("catalog", None), + BASE_LOCATION: iceberg_config.get("base_location", None), + CATALOG_SYNC: iceberg_config.get("catalog_sync", None), + STORAGE_SERIALIZATION_POLICY: iceberg_config.get( + "storage_serialization_policy", None + ), + } + + def result_scan_statement(uuid_place_holder: str) -> str: return ( SELECT @@ -799,25 +818,14 @@ def create_table_statement( CHANGE_TRACKING: change_tracking, } - if iceberg_config is not None: - iceberg_config = {k.lower(): v for k, v in iceberg_config.items()} - options.update( - { - EXTERNAL_VOLUME: iceberg_config.get("external_volume", None), - CATALOG: iceberg_config.get("catalog", None), - BASE_LOCATION: iceberg_config.get("base_location", None), - CATALOG_SYNC: iceberg_config.get("catalog_sync", None), - STORAGE_SERIALIZATION_POLICY: iceberg_config.get( - "storage_serialization_policy", None - ), - } - ) - + iceberg_config = validate_iceberg_config(iceberg_config) + options.update(iceberg_config) options_statement = get_options_statement(options) + return ( f"{CREATE}{(OR + REPLACE) if replace else EMPTY_STRING}" f" {(get_temp_type_for_object(use_scoped_temp_objects, is_generated) if table_type.lower() in TEMPORARY_STRING_SET else table_type).upper()} " - f"{ICEBERG if iceberg_config is not None else EMPTY_STRING}{TABLE}{table_name}{(IF + NOT + EXISTS) if not replace and not error else EMPTY_STRING}" + f"{ICEBERG if iceberg_config else EMPTY_STRING}{TABLE}{table_name}{(IF + NOT + EXISTS) if not replace and not error else EMPTY_STRING}" f"{LEFT_PARENTHESIS}{schema}{RIGHT_PARENTHESIS}{cluster_by_clause}" f"{options_statement}{COPY_GRANTS if copy_grants else EMPTY_STRING}{comment_sql}" ) @@ -899,24 +907,13 @@ def create_table_as_select_statement( MAX_DATA_EXTENSION_TIME_IN_DAYS: max_data_extension_time, CHANGE_TRACKING: change_tracking, } - if iceberg_config is not None: - iceberg_config = {k.lower(): v for k, v in iceberg_config.items()} - options.update( - { - EXTERNAL_VOLUME: iceberg_config.get("external_volume", None), - CATALOG: iceberg_config.get("catalog", None), - BASE_LOCATION: iceberg_config.get("base_location", None), - CATALOG_SYNC: iceberg_config.get("catalog_sync", None), - STORAGE_SERIALIZATION_POLICY: iceberg_config.get( - "storage_serialization_policy", None - ), - } - ) + iceberg_config = validate_iceberg_config(iceberg_config) + options.update(iceberg_config) options_statement = get_options_statement(options) return ( f"{CREATE}{OR + REPLACE if replace else EMPTY_STRING}" f" {(get_temp_type_for_object(use_scoped_temp_objects, is_generated) if table_type.lower() in TEMPORARY_STRING_SET else table_type).upper()} " - f"{ICEBERG if iceberg_config is not None else EMPTY_STRING}{TABLE}" + f"{ICEBERG if iceberg_config else EMPTY_STRING}{TABLE}" f"{IF + NOT + EXISTS if not replace and not error else EMPTY_STRING} " f"{table_name}{column_definition_sql}{cluster_by_clause}{options_statement}" f"{COPY_GRANTS if copy_grants else EMPTY_STRING}{comment_sql} {AS}{project_statement([], child)}"