From dd7b0c935b2b5a7e009a4e1896341b1993cb3e08 Mon Sep 17 00:00:00 2001
From: Jamison Rose
Date: Wed, 4 Sep 2024 16:24:53 -0700
Subject: [PATCH] SNOW-1058583: Add support for iceberg tables to create_table and copy_into_table statements. (#2162)

---
 CHANGELOG.md                                  | 13 ++++
 .../snowpark/_internal/analyzer/analyzer.py   |  2 +
 .../_internal/analyzer/analyzer_utils.py      | 68 ++++++++++++++-----
 .../_internal/analyzer/snowflake_plan.py      | 12 ++++
 .../_internal/analyzer/snowflake_plan_node.py |  4 ++
 .../_internal/compiler/query_generator.py     |  1 +
 src/snowflake/snowpark/dataframe.py           | 16 +++++
 src/snowflake/snowpark/dataframe_writer.py    | 14 ++++
 tests/integ/scala/test_dataframe_copy_into.py | 32 ++++++++-
 .../scala/test_dataframe_writer_suite.py      | 34 +++++++++-
 tests/integ/scala/test_datatype_suite.py      | 38 +++++++++--
 tests/unit/test_analyzer_util_suite.py        | 46 ++++++++++++-
 12 files changed, 253 insertions(+), 27 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2e3d744eeb0..a1c19679161 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,19 @@
   - `max_data_extension_time`
   - `change_tracking`
   - `copy_grants`
+  - `iceberg_config`: A dictionary that can hold the following iceberg configuration options:
+    - `external_volume`
+    - `catalog`
+    - `base_location`
+    - `catalog_sync`
+    - `storage_serialization_policy`
+- Added support for specifying the following to `DataFrameWriter.copy_into_table`:
+  - `iceberg_config`: A dictionary that can hold the following iceberg configuration options:
+    - `external_volume`
+    - `catalog`
+    - `base_location`
+    - `catalog_sync`
+    - `storage_serialization_policy`
 - Added support for specifying the following parameters to `DataFrame.create_or_replace_dynamic_table`:
   - `mode`
   - `refresh_mode`
diff --git a/src/snowflake/snowpark/_internal/analyzer/analyzer.py b/src/snowflake/snowpark/_internal/analyzer/analyzer.py
index 6567754d229..76e91b7da92 100644
--- a/src/snowflake/snowpark/_internal/analyzer/analyzer.py
+++ b/src/snowflake/snowpark/_internal/analyzer/analyzer.py
@@ -1004,6 +1004,7 @@ def do_resolve_with_resolved_children(
                 use_scoped_temp_objects=self.session._use_scoped_temp_objects,
                 creation_source=logical_plan.creation_source,
                 child_attributes=resolved_child.attributes,
+                iceberg_config=logical_plan.iceberg_config,
             )
 
         if isinstance(logical_plan, Limit):
@@ -1184,6 +1185,7 @@ def do_resolve_with_resolved_children(
                 else None,
                 user_schema=logical_plan.user_schema,
                 create_table_from_infer_schema=logical_plan.create_table_from_infer_schema,
+                iceberg_config=logical_plan.iceberg_config,
             )
 
         if isinstance(logical_plan, CopyIntoLocationNode):
diff --git a/src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py b/src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py
index e22355674ee..06847621b7f 100644
--- a/src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py
+++ b/src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py
@@ -133,6 +133,11 @@
 DATA_RETENTION_TIME_IN_DAYS = " DATA_RETENTION_TIME_IN_DAYS "
 MAX_DATA_EXTENSION_TIME_IN_DAYS = " MAX_DATA_EXTENSION_TIME_IN_DAYS "
 CHANGE_TRACKING = " CHANGE_TRACKING "
+EXTERNAL_VOLUME = " EXTERNAL_VOLUME "
+CATALOG = " CATALOG "
+BASE_LOCATION = " BASE_LOCATION "
+CATALOG_SYNC = " CATALOG_SYNC "
+STORAGE_SERIALIZATION_POLICY = " STORAGE_SERIALIZATION_POLICY "
 REG_EXP = " REGEXP "
 COLLATE = " COLLATE "
 RESULT_SCAN = " RESULT_SCAN"
@@ -178,10 +183,30 @@
 WITH = "WITH "
 DEFAULT_ON_NULL = " DEFAULT ON NULL "
 ANY = " ANY "
+ICEBERG = " ICEBERG "
 
 TEMPORARY_STRING_SET = frozenset(["temporary", "temp"])
 
 
+def 
validate_iceberg_config(iceberg_config: Optional[dict]) -> Dict[str, str]: + if iceberg_config is None: + return dict() + + iceberg_config = {k.lower(): v for k, v in iceberg_config.items()} + if "base_location" not in iceberg_config: + raise ValueError("Iceberg table configuration requires base_location be set.") + + return { + EXTERNAL_VOLUME: iceberg_config.get("external_volume", None), + CATALOG: iceberg_config.get("catalog", None), + BASE_LOCATION: iceberg_config.get("base_location", None), + CATALOG_SYNC: iceberg_config.get("catalog_sync", None), + STORAGE_SERIALIZATION_POLICY: iceberg_config.get( + "storage_serialization_policy", None + ), + } + + def result_scan_statement(uuid_place_holder: str) -> str: return ( SELECT @@ -778,6 +803,7 @@ def create_table_statement( *, use_scoped_temp_objects: bool = False, is_generated: bool = False, + iceberg_config: Optional[dict] = None, ) -> str: cluster_by_clause = ( (CLUSTER_BY + LEFT_PARENTHESIS + COMMA.join(clustering_key) + RIGHT_PARENTHESIS) @@ -785,18 +811,21 @@ def create_table_statement( else EMPTY_STRING ) comment_sql = get_comment_sql(comment) - options_statement = get_options_statement( - { - ENABLE_SCHEMA_EVOLUTION: enable_schema_evolution, - DATA_RETENTION_TIME_IN_DAYS: data_retention_time, - MAX_DATA_EXTENSION_TIME_IN_DAYS: max_data_extension_time, - CHANGE_TRACKING: change_tracking, - } - ) + options = { + ENABLE_SCHEMA_EVOLUTION: enable_schema_evolution, + DATA_RETENTION_TIME_IN_DAYS: data_retention_time, + MAX_DATA_EXTENSION_TIME_IN_DAYS: max_data_extension_time, + CHANGE_TRACKING: change_tracking, + } + + iceberg_config = validate_iceberg_config(iceberg_config) + options.update(iceberg_config) + options_statement = get_options_statement(options) + return ( f"{CREATE}{(OR + REPLACE) if replace else EMPTY_STRING}" f" {(get_temp_type_for_object(use_scoped_temp_objects, is_generated) if table_type.lower() in TEMPORARY_STRING_SET else table_type).upper()} " - f"{TABLE}{table_name}{(IF + NOT + EXISTS) if not replace and not error else EMPTY_STRING}" + f"{ICEBERG if iceberg_config else EMPTY_STRING}{TABLE}{table_name}{(IF + NOT + EXISTS) if not replace and not error else EMPTY_STRING}" f"{LEFT_PARENTHESIS}{schema}{RIGHT_PARENTHESIS}{cluster_by_clause}" f"{options_statement}{COPY_GRANTS if copy_grants else EMPTY_STRING}{comment_sql}" ) @@ -856,6 +885,7 @@ def create_table_as_select_statement( max_data_extension_time: Optional[int] = None, change_tracking: Optional[bool] = None, copy_grants: bool = False, + iceberg_config: Optional[dict] = None, *, use_scoped_temp_objects: bool = False, is_generated: bool = False, @@ -871,18 +901,20 @@ def create_table_as_select_statement( else EMPTY_STRING ) comment_sql = get_comment_sql(comment) - options_statement = get_options_statement( - { - ENABLE_SCHEMA_EVOLUTION: enable_schema_evolution, - DATA_RETENTION_TIME_IN_DAYS: data_retention_time, - MAX_DATA_EXTENSION_TIME_IN_DAYS: max_data_extension_time, - CHANGE_TRACKING: change_tracking, - } - ) + options = { + ENABLE_SCHEMA_EVOLUTION: enable_schema_evolution, + DATA_RETENTION_TIME_IN_DAYS: data_retention_time, + MAX_DATA_EXTENSION_TIME_IN_DAYS: max_data_extension_time, + CHANGE_TRACKING: change_tracking, + } + iceberg_config = validate_iceberg_config(iceberg_config) + options.update(iceberg_config) + options_statement = get_options_statement(options) return ( f"{CREATE}{OR + REPLACE if replace else EMPTY_STRING}" f" {(get_temp_type_for_object(use_scoped_temp_objects, is_generated) if table_type.lower() in TEMPORARY_STRING_SET else 
table_type).upper()} " - f"{TABLE}{IF + NOT + EXISTS if not replace and not error else EMPTY_STRING} " + f"{ICEBERG if iceberg_config else EMPTY_STRING}{TABLE}" + f"{IF + NOT + EXISTS if not replace and not error else EMPTY_STRING} " f"{table_name}{column_definition_sql}{cluster_by_clause}{options_statement}" f"{COPY_GRANTS if copy_grants else EMPTY_STRING}{comment_sql} {AS}{project_statement([], child)}" ) diff --git a/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py b/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py index 559cbeb3cc5..3764c61410d 100644 --- a/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py +++ b/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py @@ -846,6 +846,7 @@ def save_as_table( use_scoped_temp_objects: bool, creation_source: TableCreationSource, child_attributes: Optional[List[Attribute]], + iceberg_config: Optional[dict] = None, ) -> SnowflakePlan: """Returns a SnowflakePlan to materialize the child plan into a table. @@ -872,6 +873,13 @@ def save_as_table( child_attributes: child attributes will be none in the case of large query breakdown where we use ctas query to create the table which does not need to know the column metadata. + iceberg_config: A dictionary that can contain the following iceberg configuration values: + external_volume: specifies the identifier for the external volume where + the Iceberg table stores its metadata files and data in Parquet format + catalog: specifies either Snowflake or a catalog integration to use for this table + base_location: the base directory that snowflake can write iceberg metadata and files to + catalog_sync: optionally sets the catalog integration configured for Polaris Catalog + storage_serialization_policy: specifies the storage serialization policy for the table """ is_generated = creation_source in ( TableCreationSource.CACHE_RESULT, @@ -929,6 +937,7 @@ def get_create_table_as_select_plan(child: SnowflakePlan, replace, error): max_data_extension_time=max_data_extension_time, change_tracking=change_tracking, copy_grants=copy_grants, + iceberg_config=iceberg_config, use_scoped_temp_objects=use_scoped_temp_objects, is_generated=is_generated, ), @@ -957,6 +966,7 @@ def get_create_and_insert_plan(child: SnowflakePlan, replace, error): copy_grants=copy_grants, use_scoped_temp_objects=use_scoped_temp_objects, is_generated=is_generated, + iceberg_config=iceberg_config, ) # so that dataframes created from non-select statements, @@ -1380,6 +1390,7 @@ def copy_into_table( *, copy_options: Dict[str, Any], format_type_options: Dict[str, Any], + iceberg_config: Optional[dict] = None, ) -> SnowflakePlan: # tracking usage of pattern, will refactor this function in future if pattern: @@ -1413,6 +1424,7 @@ def copy_into_table( create_table_statement( full_table_name, attribute_to_schema_string(attributes), + iceberg_config=iceberg_config, ), # This is an exception. The principle is to avoid surprising behavior and most of the time # it applies to temp object. 
But this perm table creation is also one place where we create diff --git a/src/snowflake/snowpark/_internal/analyzer/snowflake_plan_node.py b/src/snowflake/snowpark/_internal/analyzer/snowflake_plan_node.py index 69e9f1332f4..e3e032cd94b 100644 --- a/src/snowflake/snowpark/_internal/analyzer/snowflake_plan_node.py +++ b/src/snowflake/snowpark/_internal/analyzer/snowflake_plan_node.py @@ -197,6 +197,7 @@ def __init__( max_data_extension_time: Optional[int] = None, change_tracking: Optional[bool] = None, copy_grants: bool = False, + iceberg_config: Optional[dict] = None, ) -> None: super().__init__() @@ -217,6 +218,7 @@ def __init__( self.max_data_extension_time = max_data_extension_time self.change_tracking = change_tracking self.copy_grants = copy_grants + self.iceberg_config = iceberg_config @property def individual_node_complexity(self) -> Dict[PlanNodeCategory, int]: @@ -277,6 +279,7 @@ def __init__( user_schema: Optional[StructType] = None, cur_options: Optional[Dict[str, Any]] = None, # the options of DataFrameReader create_table_from_infer_schema: bool = False, + iceberg_config: Optional[dict] = None, ) -> None: super().__init__() self.table_name = table_name @@ -292,6 +295,7 @@ def __init__( self.user_schema = user_schema self.cur_options = cur_options self.create_table_from_infer_schema = create_table_from_infer_schema + self.iceberg_config = iceberg_config class CopyIntoLocationNode(LogicalPlan): diff --git a/src/snowflake/snowpark/_internal/compiler/query_generator.py b/src/snowflake/snowpark/_internal/compiler/query_generator.py index 249e816ff38..2cde864c062 100644 --- a/src/snowflake/snowpark/_internal/compiler/query_generator.py +++ b/src/snowflake/snowpark/_internal/compiler/query_generator.py @@ -176,6 +176,7 @@ def do_resolve_with_resolved_children( use_scoped_temp_objects=self.session._use_scoped_temp_objects, creation_source=logical_plan.creation_source, child_attributes=child_attributes, + iceberg_config=logical_plan.iceberg_config, ) elif isinstance( diff --git a/src/snowflake/snowpark/dataframe.py b/src/snowflake/snowpark/dataframe.py index 8512e7c018e..f3df997e373 100644 --- a/src/snowflake/snowpark/dataframe.py +++ b/src/snowflake/snowpark/dataframe.py @@ -3026,6 +3026,7 @@ def copy_into_table( transformations: Optional[Iterable[ColumnOrName]] = None, format_type_options: Optional[Dict[str, Any]] = None, statement_params: Optional[Dict[str, str]] = None, + iceberg_config: Optional[dict] = None, **copy_options: Any, ) -> List[Row]: """Executes a `COPY INTO `__ command to load data from files in a stage location into a specified table. @@ -3079,6 +3080,19 @@ def copy_into_table( transformations: A list of column transformations. format_type_options: A dict that contains the ``formatTypeOptions`` of the ``COPY INTO
`` command. statement_params: Dictionary of statement level parameters to be set while executing this action. + iceberg_config: A dictionary that can contain the following iceberg configuration values: + + * external_volume: specifies the identifier for the external volume where + the Iceberg table stores its metadata files and data in Parquet format + + * catalog: specifies either Snowflake or a catalog integration to use for this table + + * base_location: the base directory that snowflake can write iceberg metadata and files to + + * catalog_sync: optionally sets the catalog integration configured for Polaris Catalog + + * storage_serialization_policy: specifies the storage serialization policy for the table + copy_options: The kwargs that is used to specify the ``copyOptions`` of the ``COPY INTO
`` command. """ if not self._reader or not self._reader._file_path: @@ -3144,6 +3158,7 @@ def copy_into_table( if transformations else None ) + return DataFrame( self._session, CopyIntoTableNode( @@ -3160,6 +3175,7 @@ def copy_into_table( user_schema=self._reader._user_schema, cur_options=self._reader._cur_options, create_table_from_infer_schema=create_table_from_infer_schema, + iceberg_config=iceberg_config, ), )._internal_collect_with_tag_no_telemetry(statement_params=statement_params) diff --git a/src/snowflake/snowpark/dataframe_writer.py b/src/snowflake/snowpark/dataframe_writer.py index b3a4abfeaae..65cf6205724 100644 --- a/src/snowflake/snowpark/dataframe_writer.py +++ b/src/snowflake/snowpark/dataframe_writer.py @@ -139,6 +139,7 @@ def save_as_table( max_data_extension_time: Optional[int] = None, change_tracking: Optional[bool] = None, copy_grants: bool = False, + iceberg_config: Optional[dict] = None, ) -> Optional[AsyncJob]: """Writes the data to the specified table in a Snowflake database. @@ -186,6 +187,18 @@ def save_as_table( block: A bool value indicating whether this function will wait until the result is available. When it is ``False``, this function executes the underlying queries of the dataframe asynchronously and returns an :class:`AsyncJob`. + iceberg_config: A dictionary that can contain the following iceberg configuration values: + + * external_volume: specifies the identifier for the external volume where + the Iceberg table stores its metadata files and data in Parquet format + + * catalog: specifies either Snowflake or a catalog integration to use for this table + + * base_location: the base directory that snowflake can write iceberg metadata and files to + + * catalog_sync: optionally sets the catalog integration configured for Polaris Catalog + + * storage_serialization_policy: specifies the storage serialization policy for the table Examples:: @@ -256,6 +269,7 @@ def save_as_table( max_data_extension_time, change_tracking, copy_grants, + iceberg_config, ) session = self._dataframe._session snowflake_plan = session._analyzer.resolve(create_table_logic_plan) diff --git a/tests/integ/scala/test_dataframe_copy_into.py b/tests/integ/scala/test_dataframe_copy_into.py index 6ef13e2412d..fb899f8af11 100644 --- a/tests/integ/scala/test_dataframe_copy_into.py +++ b/tests/integ/scala/test_dataframe_copy_into.py @@ -25,7 +25,7 @@ StructField, StructType, ) -from tests.utils import IS_IN_STORED_PROC, TestFiles, Utils +from tests.utils import IS_IN_STORED_PROC, TestFiles, Utils, iceberg_supported test_file_csv = "testCSV.csv" test_file2_csv = "test2CSV.csv" @@ -245,6 +245,36 @@ def test_copy_csv_basic(session, tmp_stage_name1, tmp_table_name): ) +def test_copy_into_csv_iceberg( + session, tmp_stage_name1, tmp_table_name, local_testing_mode +): + if not iceberg_supported(session, local_testing_mode): + pytest.skip("Test requires iceberg support.") + test_file_on_stage = f"@{tmp_stage_name1}/{test_file_csv}" + df = session.read.schema(user_schema).csv(test_file_on_stage) + + test_table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + df.copy_into_table( + test_table_name, + iceberg_config={ + "external_volume": "PYTHON_CONNECTOR_ICEBERG_EXVOL", + "CATALOG": "SNOWFLAKE", + "BASE_LOCATION": "snowpark_python_tests", + }, + ) + try: + # Check that table is an iceberg table with correct properties set + ddl = session._run_query(f"select get_ddl('table', '{test_table_name}')") + assert ( + ddl[0][0] + == f"create or replace ICEBERG TABLE {test_table_name} (\n\tA 
LONG,\n\tB STRING,\n\tC DOUBLE\n)\n EXTERNAL_VOLUME = 'PYTHON_CONNECTOR_ICEBERG_EXVOL'\n CATALOG = 'SNOWFLAKE'\n BASE_LOCATION = 'snowpark_python_tests/';" + ) + # Check that a copy_into works on the newly created table. + df.copy_into_table(test_table_name) + finally: + Utils.drop_table(session, test_table_name) + + def test_copy_csv_create_table_if_not_exists(session, tmp_stage_name1): test_file_on_stage = f"@{tmp_stage_name1}/{test_file_csv}" df = session.read.schema(user_schema).csv(test_file_on_stage) diff --git a/tests/integ/scala/test_dataframe_writer_suite.py b/tests/integ/scala/test_dataframe_writer_suite.py index 54cdfbbbd2b..275f32e955d 100644 --- a/tests/integ/scala/test_dataframe_writer_suite.py +++ b/tests/integ/scala/test_dataframe_writer_suite.py @@ -21,7 +21,7 @@ StructField, StructType, ) -from tests.utils import TestFiles, Utils +from tests.utils import TestFiles, Utils, iceberg_supported def test_write_with_target_column_name_order(session, local_testing_mode): @@ -153,6 +153,38 @@ def test_write_with_target_table_autoincrement( Utils.drop_table(session, table_name) +def test_iceberg(session, local_testing_mode): + if not iceberg_supported(session, local_testing_mode): + pytest.skip("Test requires iceberg support.") + + table_name = Utils.random_table_name() + df = session.create_dataframe( + [], + schema=StructType( + [ + StructField("a", StringType()), + StructField("b", IntegerType()), + ] + ), + ) + df.write.save_as_table( + table_name, + iceberg_config={ + "external_volume": "PYTHON_CONNECTOR_ICEBERG_EXVOL", + "catalog": "SNOWFLAKE", + "base_location": "snowpark_python_tests", + }, + ) + try: + ddl = session._run_query(f"select get_ddl('table', '{table_name}')") + assert ( + ddl[0][0] + == f"create or replace ICEBERG TABLE {table_name} (\n\tA STRING,\n\tB LONG\n)\n EXTERNAL_VOLUME = 'PYTHON_CONNECTOR_ICEBERG_EXVOL'\n CATALOG = 'SNOWFLAKE'\n BASE_LOCATION = 'snowpark_python_tests/';" + ) + finally: + session.table(table_name).drop_table() + + def test_negative_write_with_target_column_name_order(session): table_name = Utils.random_table_name() session.create_dataframe( diff --git a/tests/integ/scala/test_datatype_suite.py b/tests/integ/scala/test_datatype_suite.py index d8b7a91ec6f..edc97b41a48 100644 --- a/tests/integ/scala/test_datatype_suite.py +++ b/tests/integ/scala/test_datatype_suite.py @@ -132,6 +132,12 @@ def _create_test_dataframe(s): ), } +ICEBERG_CONFIG = { + "catalog": "SNOWFLAKE", + "external_volume": "python_connector_iceberg_exvol", + "base_location": "python_connector_merge_gate", +} + @pytest.fixture(scope="module") def structured_type_support(session, local_testing_mode): @@ -424,8 +430,9 @@ def test_structured_dtypes_pandas(structured_type_session, structured_type_suppo ) -@pytest.mark.skip( - "SNOW-1356851: Skipping until iceberg testing infrastructure is added." 
+@pytest.mark.skipif(
+    "config.getoption('local_testing_mode', default=False)",
+    reason="local testing does not fully support structured types yet.",
 )
 def test_structured_dtypes_iceberg(
     structured_type_session, local_testing_mode, structured_type_support
@@ -434,10 +441,11 @@ def test_structured_dtypes_iceberg(
         structured_type_support
         and iceberg_supported(structured_type_session, local_testing_mode)
     ):
-        pytest.mark.skip("Test requires iceberg support and structured type support.")
+        pytest.skip("Test requires iceberg support and structured type support.")
 
     query, expected_dtypes, expected_schema = STRUCTURED_TYPES_EXAMPLES[True]
     table_name = f"snowpark_structured_dtypes_{uuid.uuid4().hex[:5]}"
+    save_table_name = f"snowpark_structured_dtypes_{uuid.uuid4().hex[:5]}"
     try:
         structured_type_session.sql(
             f"""
@@ -460,12 +468,30 @@ def test_structured_dtypes_iceberg(
         df = structured_type_session.table(table_name)
         assert df.schema == expected_schema
         assert df.dtypes == expected_dtypes
+
+        # Check that the table can also be saved as an iceberg table via save_as_table.
+        structured_type_session.table(table_name).write.save_as_table(
+            save_table_name, iceberg_config=ICEBERG_CONFIG
+        )
+
+        save_ddl = structured_type_session._run_query(
+            f"select get_ddl('table', '{save_table_name}')"
+        )
+        assert save_ddl[0][0] == (
+            f"create or replace ICEBERG TABLE {save_table_name.upper()} (\n\t"
+            "MAP MAP(STRING, LONG),\n\tOBJ OBJECT(A STRING, B DOUBLE),\n\tARR ARRAY(DOUBLE)\n)\n "
+            "EXTERNAL_VOLUME = 'PYTHON_CONNECTOR_ICEBERG_EXVOL'\n CATALOG = 'SNOWFLAKE'\n "
+            "BASE_LOCATION = 'python_connector_merge_gate/';"
+        )
+
     finally:
         structured_type_session.sql(f"drop table if exists {table_name}")
+        structured_type_session.sql(f"drop table if exists {save_table_name}")
 
 
-@pytest.mark.skip(
-    "SNOW-1356851: Skipping until iceberg testing infrastructure is added."
+@pytest.mark.skipif(
+    "config.getoption('local_testing_mode', default=False)",
+    reason="local testing does not fully support structured types yet.",
 )
 def test_structured_dtypes_iceberg_udf(
     structured_type_session, local_testing_mode, structured_type_support
@@ -474,7 +500,7 @@ def test_structured_dtypes_iceberg_udf(
         structured_type_support
         and iceberg_supported(structured_type_session, local_testing_mode)
    ):
-        pytest.mark.skip("Test requires iceberg support and structured type support.")
+        pytest.skip("Test requires iceberg support and structured type support.")
 
     query, expected_dtypes, expected_schema = STRUCTURED_TYPES_EXAMPLES[True]
     table_name = f"snowpark_structured_dtypes_udf_test{uuid.uuid4().hex[:5]}"
diff --git a/tests/unit/test_analyzer_util_suite.py b/tests/unit/test_analyzer_util_suite.py
index 35537bc0eff..37341bc7a2e 100644
--- a/tests/unit/test_analyzer_util_suite.py
+++ b/tests/unit/test_analyzer_util_suite.py
@@ -1,7 +1,6 @@
 #
 # Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
 #
-
 import pytest
 
 from snowflake.snowpark._internal.analyzer.analyzer_utils import (
@@ -353,3 +352,48 @@ def test_join_statement_negative():
         ValueError, match="A join should either have using clause or a join condition"
     ):
         join_statement("", "", join_type, "cond2", "", False)
+
+
+def test_create_iceberg_table_statement():
+    with pytest.raises(
+        ValueError, match="Iceberg table configuration requires base_location be set."
+ ): + create_table_statement( + table_name="test_table", + schema="test_col varchar", + iceberg_config={}, + ) + assert create_table_statement( + table_name="test_table", + schema="test_col varchar", + iceberg_config={ + "external_volume": "example_volume", + "catalog": "example_catalog", + "base_location": "/root", + "catalog_sync": "integration_name", + "storage_serialization_policy": "OPTIMIZED", + }, + ) == ( + " CREATE ICEBERG TABLE test_table(test_col varchar) EXTERNAL_VOLUME = 'example_volume' " + " CATALOG = 'example_catalog' BASE_LOCATION = '/root' CATALOG_SYNC = 'integration_name'" + " STORAGE_SERIALIZATION_POLICY = 'OPTIMIZED' " + ) + + +def test_create_iceberg_table_as_select_statement(): + assert create_table_as_select_statement( + table_name="test_table", + child="select * from foo", + column_definition=None, + iceberg_config={ + "external_volume": "example_volume", + "catalog": "example_catalog", + "base_location": "/root", + "catalog_sync": "integration_name", + "storage_serialization_policy": "OPTIMIZED", + }, + ) == ( + " CREATE ICEBERG TABLE test_table EXTERNAL_VOLUME = 'example_volume' CATALOG = " + "'example_catalog' BASE_LOCATION = '/root' CATALOG_SYNC = 'integration_name' " + "STORAGE_SERIALIZATION_POLICY = 'OPTIMIZED' AS SELECT * FROM (select * from foo)" + )
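Example usage of the new `iceberg_config` argument to `DataFrameWriter.save_as_table`, as exercised by the writer tests above. This is a minimal sketch, not part of the patch: the connection parameters, external volume, and base location are placeholders that must exist in the target account.

# Sketch: materialize a DataFrame as an Iceberg table via save_as_table.
# All connection, volume, and location names below are placeholders.
from snowflake.snowpark import Session

connection_parameters = {}  # fill in account, user, password, role, warehouse, database, schema
session = Session.builder.configs(connection_parameters).create()

iceberg_config = {
    "external_volume": "MY_ICEBERG_EXVOL",        # placeholder external volume
    "catalog": "SNOWFLAKE",                       # use Snowflake as the Iceberg catalog
    "base_location": "snowpark_demo",             # required; prefix inside the external volume
    "storage_serialization_policy": "OPTIMIZED",
}

df = session.create_dataframe([(1, "a"), (2, "b")], schema=["id", "name"])

# Creates an ICEBERG TABLE instead of a regular table.
df.write.save_as_table("demo_iceberg_table", iceberg_config=iceberg_config)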
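Similarly, a sketch of `DataFrame.copy_into_table` with an `iceberg_config`, assuming a stage holding a CSV file; the stage name, file name, and configuration values are placeholders.

# Sketch: COPY INTO an Iceberg target table created from the reader schema.
from snowflake.snowpark import Session
from snowflake.snowpark.types import IntegerType, StringType, StructField, StructType

connection_parameters = {}  # fill in real connection parameters
session = Session.builder.configs(connection_parameters).create()

schema = StructType(
    [StructField("id", IntegerType()), StructField("name", StringType())]
)

# Reading with an explicit schema lets copy_into_table create the target table if it
# does not exist yet; passing iceberg_config makes that new table an Iceberg table.
df = session.read.schema(schema).csv("@my_stage/data.csv")  # placeholder stage and file
df.copy_into_table(
    "demo_iceberg_copy_target",
    iceberg_config={
        "external_volume": "MY_ICEBERG_EXVOL",
        "catalog": "SNOWFLAKE",
        "base_location": "snowpark_demo",
    },
)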