Skip to content

Commit

Permalink
Review Feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-jrose committed Sep 3, 2024
1 parent 001fb1d commit ac0c866
Showing 1 changed file with 26 additions and 29 deletions.
55 changes: 26 additions & 29 deletions src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,25 @@
TEMPORARY_STRING_SET = frozenset(["temporary", "temp"])


def validate_iceberg_config(iceberg_config: Optional[Dict[str, str]]) -> Dict[str, str]:
if not iceberg_config:
return dict()

iceberg_config = {k.lower(): v for k, v in iceberg_config.items()}
if "external_volume" not in iceberg_config:
return dict()

return {
EXTERNAL_VOLUME: iceberg_config.get("external_volume", None),
CATALOG: iceberg_config.get("catalog", None),
BASE_LOCATION: iceberg_config.get("base_location", None),
CATALOG_SYNC: iceberg_config.get("catalog_sync", None),
STORAGE_SERIALIZATION_POLICY: iceberg_config.get(
"storage_serialization_policy", None
),
}


def result_scan_statement(uuid_place_holder: str) -> str:
return (
SELECT
Expand Down Expand Up @@ -799,25 +818,14 @@ def create_table_statement(
CHANGE_TRACKING: change_tracking,
}

if iceberg_config is not None:
iceberg_config = {k.lower(): v for k, v in iceberg_config.items()}
options.update(
{
EXTERNAL_VOLUME: iceberg_config.get("external_volume", None),
CATALOG: iceberg_config.get("catalog", None),
BASE_LOCATION: iceberg_config.get("base_location", None),
CATALOG_SYNC: iceberg_config.get("catalog_sync", None),
STORAGE_SERIALIZATION_POLICY: iceberg_config.get(
"storage_serialization_policy", None
),
}
)

iceberg_config = validate_iceberg_config(iceberg_config)
options.update(iceberg_config)
options_statement = get_options_statement(options)

return (
f"{CREATE}{(OR + REPLACE) if replace else EMPTY_STRING}"
f" {(get_temp_type_for_object(use_scoped_temp_objects, is_generated) if table_type.lower() in TEMPORARY_STRING_SET else table_type).upper()} "
f"{ICEBERG if iceberg_config is not None else EMPTY_STRING}{TABLE}{table_name}{(IF + NOT + EXISTS) if not replace and not error else EMPTY_STRING}"
f"{ICEBERG if iceberg_config else EMPTY_STRING}{TABLE}{table_name}{(IF + NOT + EXISTS) if not replace and not error else EMPTY_STRING}"
f"{LEFT_PARENTHESIS}{schema}{RIGHT_PARENTHESIS}{cluster_by_clause}"
f"{options_statement}{COPY_GRANTS if copy_grants else EMPTY_STRING}{comment_sql}"
)
Expand Down Expand Up @@ -899,24 +907,13 @@ def create_table_as_select_statement(
MAX_DATA_EXTENSION_TIME_IN_DAYS: max_data_extension_time,
CHANGE_TRACKING: change_tracking,
}
if iceberg_config is not None:
iceberg_config = {k.lower(): v for k, v in iceberg_config.items()}
options.update(
{
EXTERNAL_VOLUME: iceberg_config.get("external_volume", None),
CATALOG: iceberg_config.get("catalog", None),
BASE_LOCATION: iceberg_config.get("base_location", None),
CATALOG_SYNC: iceberg_config.get("catalog_sync", None),
STORAGE_SERIALIZATION_POLICY: iceberg_config.get(
"storage_serialization_policy", None
),
}
)
iceberg_config = validate_iceberg_config(iceberg_config)
options.update(iceberg_config)
options_statement = get_options_statement(options)
return (
f"{CREATE}{OR + REPLACE if replace else EMPTY_STRING}"
f" {(get_temp_type_for_object(use_scoped_temp_objects, is_generated) if table_type.lower() in TEMPORARY_STRING_SET else table_type).upper()} "
f"{ICEBERG if iceberg_config is not None else EMPTY_STRING}{TABLE}"
f"{ICEBERG if iceberg_config else EMPTY_STRING}{TABLE}"
f"{IF + NOT + EXISTS if not replace and not error else EMPTY_STRING} "
f"{table_name}{column_definition_sql}{cluster_by_clause}{options_statement}"
f"{COPY_GRANTS if copy_grants else EMPTY_STRING}{comment_sql} {AS}{project_statement([], child)}"
Expand Down

0 comments on commit ac0c866

Please sign in to comment.