SNOW-1058583: Add support for iceberg tables to create_table and copy_into_table statements. (#2162)
sfc-gh-jrose authored Sep 4, 2024
1 parent e481c5c commit dd7b0c9
Showing 12 changed files with 253 additions and 27 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
@@ -19,6 +19,19 @@
- `max_data_extension_time`
- `change_tracking`
- `copy_grants`
- `iceberg_config`: A dictionary that can hold the following Iceberg configuration options (a usage sketch follows this changelog excerpt):
- `external_volume`
- `catalog`
- `base_location`
- `catalog_sync`
- `storage_serialization_policy`
- Added support for specifying the following to `DataFrameWriter.copy_into_table`:
- `iceberg_config`: A dictionary that can hold the following Iceberg configuration options:
- `external_volume`
- `catalog`
- `base_location`
- `catalog_sync`
- `storage_serialization_policy`
- Added support for specifying the following parameters to `DataFrame.create_or_replace_dynamic_table`:
- `mode`
- `refresh_mode`
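The following is a minimal usage sketch for the new `iceberg_config` parameter of `DataFrameWriter.save_as_table`. It is not taken from this commit; the connection parameters, table name, external volume, and base location are placeholders that must already exist in the target account.

from snowflake.snowpark import Session

# Hypothetical connection parameters; replace with real credentials.
connection_parameters = {"account": "<account>", "user": "<user>", "password": "<password>"}
session = Session.builder.configs(connection_parameters).create()

df = session.create_dataframe([[1, "a"], [2, "b"]], schema=["id", "name"])

# Creates the target as a Snowflake-managed Iceberg table. Only base_location
# is required; option keys are matched case-insensitively.
df.write.save_as_table(
    "my_iceberg_table",
    iceberg_config={
        "external_volume": "MY_EXTERNAL_VOLUME",  # placeholder volume name
        "catalog": "SNOWFLAKE",
        "base_location": "my/prefix",
    },
)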
2 changes: 2 additions & 0 deletions src/snowflake/snowpark/_internal/analyzer/analyzer.py
@@ -1004,6 +1004,7 @@ def do_resolve_with_resolved_children(
use_scoped_temp_objects=self.session._use_scoped_temp_objects,
creation_source=logical_plan.creation_source,
child_attributes=resolved_child.attributes,
iceberg_config=logical_plan.iceberg_config,
)

if isinstance(logical_plan, Limit):
@@ -1184,6 +1185,7 @@ def do_resolve_with_resolved_children(
else None,
user_schema=logical_plan.user_schema,
create_table_from_infer_schema=logical_plan.create_table_from_infer_schema,
iceberg_config=logical_plan.iceberg_config,
)

if isinstance(logical_plan, CopyIntoLocationNode):
68 changes: 50 additions & 18 deletions src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py
@@ -133,6 +133,11 @@
DATA_RETENTION_TIME_IN_DAYS = " DATA_RETENTION_TIME_IN_DAYS "
MAX_DATA_EXTENSION_TIME_IN_DAYS = " MAX_DATA_EXTENSION_TIME_IN_DAYS "
CHANGE_TRACKING = " CHANGE_TRACKING "
EXTERNAL_VOLUME = " EXTERNAL_VOLUME "
CATALOG = " CATALOG "
BASE_LOCATION = " BASE_LOCATION "
CATALOG_SYNC = " CATALOG_SYNC "
STORAGE_SERIALIZATION_POLICY = " STORAGE_SERIALIZATION_POLICY "
REG_EXP = " REGEXP "
COLLATE = " COLLATE "
RESULT_SCAN = " RESULT_SCAN"
@@ -178,10 +183,30 @@
WITH = "WITH "
DEFAULT_ON_NULL = " DEFAULT ON NULL "
ANY = " ANY "
ICEBERG = " ICEBERG "

TEMPORARY_STRING_SET = frozenset(["temporary", "temp"])


def validate_iceberg_config(iceberg_config: Optional[dict]) -> Dict[str, str]:
if iceberg_config is None:
return dict()

iceberg_config = {k.lower(): v for k, v in iceberg_config.items()}
if "base_location" not in iceberg_config:
raise ValueError("Iceberg table configuration requires base_location be set.")

return {
EXTERNAL_VOLUME: iceberg_config.get("external_volume", None),
CATALOG: iceberg_config.get("catalog", None),
BASE_LOCATION: iceberg_config.get("base_location", None),
CATALOG_SYNC: iceberg_config.get("catalog_sync", None),
STORAGE_SERIALIZATION_POLICY: iceberg_config.get(
"storage_serialization_policy", None
),
}


def result_scan_statement(uuid_place_holder: str) -> str:
return (
SELECT
@@ -778,25 +803,29 @@ def create_table_statement(
*,
use_scoped_temp_objects: bool = False,
is_generated: bool = False,
iceberg_config: Optional[dict] = None,
) -> str:
cluster_by_clause = (
(CLUSTER_BY + LEFT_PARENTHESIS + COMMA.join(clustering_key) + RIGHT_PARENTHESIS)
if clustering_key
else EMPTY_STRING
)
comment_sql = get_comment_sql(comment)
options_statement = get_options_statement(
{
ENABLE_SCHEMA_EVOLUTION: enable_schema_evolution,
DATA_RETENTION_TIME_IN_DAYS: data_retention_time,
MAX_DATA_EXTENSION_TIME_IN_DAYS: max_data_extension_time,
CHANGE_TRACKING: change_tracking,
}
)
options = {
ENABLE_SCHEMA_EVOLUTION: enable_schema_evolution,
DATA_RETENTION_TIME_IN_DAYS: data_retention_time,
MAX_DATA_EXTENSION_TIME_IN_DAYS: max_data_extension_time,
CHANGE_TRACKING: change_tracking,
}

iceberg_config = validate_iceberg_config(iceberg_config)
options.update(iceberg_config)
options_statement = get_options_statement(options)

return (
f"{CREATE}{(OR + REPLACE) if replace else EMPTY_STRING}"
f" {(get_temp_type_for_object(use_scoped_temp_objects, is_generated) if table_type.lower() in TEMPORARY_STRING_SET else table_type).upper()} "
f"{TABLE}{table_name}{(IF + NOT + EXISTS) if not replace and not error else EMPTY_STRING}"
f"{ICEBERG if iceberg_config else EMPTY_STRING}{TABLE}{table_name}{(IF + NOT + EXISTS) if not replace and not error else EMPTY_STRING}"
f"{LEFT_PARENTHESIS}{schema}{RIGHT_PARENTHESIS}{cluster_by_clause}"
f"{options_statement}{COPY_GRANTS if copy_grants else EMPTY_STRING}{comment_sql}"
)
@@ -856,6 +885,7 @@ def create_table_as_select_statement(
max_data_extension_time: Optional[int] = None,
change_tracking: Optional[bool] = None,
copy_grants: bool = False,
iceberg_config: Optional[dict] = None,
*,
use_scoped_temp_objects: bool = False,
is_generated: bool = False,
@@ -871,18 +901,20 @@
else EMPTY_STRING
)
comment_sql = get_comment_sql(comment)
options_statement = get_options_statement(
{
ENABLE_SCHEMA_EVOLUTION: enable_schema_evolution,
DATA_RETENTION_TIME_IN_DAYS: data_retention_time,
MAX_DATA_EXTENSION_TIME_IN_DAYS: max_data_extension_time,
CHANGE_TRACKING: change_tracking,
}
)
options = {
ENABLE_SCHEMA_EVOLUTION: enable_schema_evolution,
DATA_RETENTION_TIME_IN_DAYS: data_retention_time,
MAX_DATA_EXTENSION_TIME_IN_DAYS: max_data_extension_time,
CHANGE_TRACKING: change_tracking,
}
iceberg_config = validate_iceberg_config(iceberg_config)
options.update(iceberg_config)
options_statement = get_options_statement(options)
return (
f"{CREATE}{OR + REPLACE if replace else EMPTY_STRING}"
f" {(get_temp_type_for_object(use_scoped_temp_objects, is_generated) if table_type.lower() in TEMPORARY_STRING_SET else table_type).upper()} "
f"{TABLE}{IF + NOT + EXISTS if not replace and not error else EMPTY_STRING} "
f"{ICEBERG if iceberg_config else EMPTY_STRING}{TABLE}"
f"{IF + NOT + EXISTS if not replace and not error else EMPTY_STRING} "
f"{table_name}{column_definition_sql}{cluster_by_clause}{options_statement}"
f"{COPY_GRANTS if copy_grants else EMPTY_STRING}{comment_sql} {AS}{project_statement([], child)}"
)
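As a quick illustration of the helper added above, the sketch below exercises `validate_iceberg_config` directly (it lives in an internal module, so this is only to show the behavior of the code in this diff, not a recommended import):

from snowflake.snowpark._internal.analyzer.analyzer_utils import validate_iceberg_config

# Keys are lower-cased before lookup, so mixed-case input is accepted.
options = validate_iceberg_config({"BASE_LOCATION": "my/prefix", "catalog": "SNOWFLAKE"})

# Unset options come back as None, and the keys are the SQL option-name
# constants defined above, including their surrounding spaces.
assert options[" BASE_LOCATION "] == "my/prefix"
assert options[" CATALOG "] == "SNOWFLAKE"
assert options[" EXTERNAL_VOLUME "] is None

# No configuration at all is fine and yields an empty dict ...
assert validate_iceberg_config(None) == {}

# ... but a config without base_location raises.
try:
    validate_iceberg_config({"external_volume": "MY_VOLUME"})
except ValueError as exc:
    print(exc)  # Iceberg table configuration requires base_location be set.

When the returned options are non-empty, `create_table_statement` and `create_table_as_select_statement` also switch the generated SQL from `CREATE ... TABLE` to `CREATE ... ICEBERG TABLE`, as the `ICEBERG if iceberg_config else EMPTY_STRING` fragments show.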
12 changes: 12 additions & 0 deletions src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py
@@ -846,6 +846,7 @@ def save_as_table(
use_scoped_temp_objects: bool,
creation_source: TableCreationSource,
child_attributes: Optional[List[Attribute]],
iceberg_config: Optional[dict] = None,
) -> SnowflakePlan:
"""Returns a SnowflakePlan to materialize the child plan into a table.
@@ -872,6 +873,13 @@
child_attributes: child attributes will be None in the case of large query breakdown,
where we use a CTAS query to create the table and therefore do not need to know
the column metadata.
iceberg_config: A dictionary that can contain the following Iceberg configuration values:
external_volume: specifies the identifier for the external volume where
the Iceberg table stores its metadata files and data in Parquet format
catalog: specifies either Snowflake or a catalog integration to use for this table
base_location: the base directory to which Snowflake can write Iceberg metadata and data files
catalog_sync: optionally sets the catalog integration configured for Polaris Catalog
storage_serialization_policy: specifies the storage serialization policy for the table
"""
is_generated = creation_source in (
TableCreationSource.CACHE_RESULT,
@@ -929,6 +937,7 @@ def get_create_table_as_select_plan(child: SnowflakePlan, replace, error):
max_data_extension_time=max_data_extension_time,
change_tracking=change_tracking,
copy_grants=copy_grants,
iceberg_config=iceberg_config,
use_scoped_temp_objects=use_scoped_temp_objects,
is_generated=is_generated,
),
@@ -957,6 +966,7 @@ def get_create_and_insert_plan(child: SnowflakePlan, replace, error):
copy_grants=copy_grants,
use_scoped_temp_objects=use_scoped_temp_objects,
is_generated=is_generated,
iceberg_config=iceberg_config,
)

# so that dataframes created from non-select statements,
@@ -1380,6 +1390,7 @@ def copy_into_table(
*,
copy_options: Dict[str, Any],
format_type_options: Dict[str, Any],
iceberg_config: Optional[dict] = None,
) -> SnowflakePlan:
# tracking usage of pattern, will refactor this function in future
if pattern:
@@ -1413,6 +1424,7 @@
create_table_statement(
full_table_name,
attribute_to_schema_string(attributes),
iceberg_config=iceberg_config,
),
# This is an exception. The principle is to avoid surprising behavior and most of the time
# it applies to temp object. But this perm table creation is also one place where we create
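For reference, here is a hypothetical `iceberg_config` that populates every key described in the `save_as_table` docstring above. All names are placeholders, only `base_location` is required, and the `storage_serialization_policy` values mentioned (`COMPATIBLE` or `OPTIMIZED`) are the ones Snowflake documents, not something this change defines:

full_iceberg_config = {
    "external_volume": "MY_EXTERNAL_VOLUME",          # placeholder external volume
    "catalog": "SNOWFLAKE",                            # Snowflake as the Iceberg catalog
    "base_location": "warehouse/events",               # required prefix on the external volume
    "catalog_sync": "MY_POLARIS_CATALOG_INTEGRATION",  # placeholder Polaris catalog integration
    "storage_serialization_policy": "OPTIMIZED",       # assumed valid policy value
}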
@@ -197,6 +197,7 @@ def __init__(
max_data_extension_time: Optional[int] = None,
change_tracking: Optional[bool] = None,
copy_grants: bool = False,
iceberg_config: Optional[dict] = None,
) -> None:
super().__init__()

@@ -217,6 +218,7 @@
self.max_data_extension_time = max_data_extension_time
self.change_tracking = change_tracking
self.copy_grants = copy_grants
self.iceberg_config = iceberg_config

@property
def individual_node_complexity(self) -> Dict[PlanNodeCategory, int]:
@@ -277,6 +279,7 @@ def __init__(
user_schema: Optional[StructType] = None,
cur_options: Optional[Dict[str, Any]] = None, # the options of DataFrameReader
create_table_from_infer_schema: bool = False,
iceberg_config: Optional[dict] = None,
) -> None:
super().__init__()
self.table_name = table_name
@@ -292,6 +295,7 @@
self.user_schema = user_schema
self.cur_options = cur_options
self.create_table_from_infer_schema = create_table_from_infer_schema
self.iceberg_config = iceberg_config


class CopyIntoLocationNode(LogicalPlan):
@@ -176,6 +176,7 @@ def do_resolve_with_resolved_children(
use_scoped_temp_objects=self.session._use_scoped_temp_objects,
creation_source=logical_plan.creation_source,
child_attributes=child_attributes,
iceberg_config=logical_plan.iceberg_config,
)

elif isinstance(
16 changes: 16 additions & 0 deletions src/snowflake/snowpark/dataframe.py
@@ -3026,6 +3026,7 @@ def copy_into_table(
transformations: Optional[Iterable[ColumnOrName]] = None,
format_type_options: Optional[Dict[str, Any]] = None,
statement_params: Optional[Dict[str, str]] = None,
iceberg_config: Optional[dict] = None,
**copy_options: Any,
) -> List[Row]:
"""Executes a `COPY INTO <table> <https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html>`__ command to load data from files in a stage location into a specified table.
@@ -3079,6 +3080,19 @@
transformations: A list of column transformations.
format_type_options: A dict that contains the ``formatTypeOptions`` of the ``COPY INTO <table>`` command.
statement_params: Dictionary of statement level parameters to be set while executing this action.
iceberg_config: A dictionary that can contain the following Iceberg configuration values:
* external_volume: specifies the identifier for the external volume where
the Iceberg table stores its metadata files and data in Parquet format
* catalog: specifies either Snowflake or a catalog integration to use for this table
* base_location: the base directory to which Snowflake can write Iceberg metadata and data files
* catalog_sync: optionally sets the catalog integration configured for Polaris Catalog
* storage_serialization_policy: specifies the storage serialization policy for the table
copy_options: The kwargs that are used to specify the ``copyOptions`` of the ``COPY INTO <table>`` command.
"""
if not self._reader or not self._reader._file_path:
@@ -3144,6 +3158,7 @@ def copy_into_table(
if transformations
else None
)

return DataFrame(
self._session,
CopyIntoTableNode(
Expand All @@ -3160,6 +3175,7 @@ def copy_into_table(
user_schema=self._reader._user_schema,
cur_options=self._reader._cur_options,
create_table_from_infer_schema=create_table_from_infer_schema,
iceberg_config=iceberg_config,
),
)._internal_collect_with_tag_no_telemetry(statement_params=statement_params)

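A rough usage sketch for `DataFrame.copy_into_table` with `iceberg_config`, mirroring the integration test added later in this diff. It assumes an existing Snowpark `session` (as in the earlier sketch); the stage path, table name, and external volume are placeholders:

from snowflake.snowpark.types import IntegerType, StringType, StructField, StructType

user_schema = StructType([
    StructField("a", IntegerType()),
    StructField("b", StringType()),
])
df = session.read.schema(user_schema).csv("@my_stage/data.csv")  # placeholder stage path

# If the target table does not exist, it is first created as an Iceberg table
# using iceberg_config, and then the COPY INTO <table> command is executed.
df.copy_into_table(
    "my_iceberg_copy_target",
    iceberg_config={
        "external_volume": "MY_EXTERNAL_VOLUME",  # placeholder
        "catalog": "SNOWFLAKE",
        "base_location": "my/prefix",
    },
)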
14 changes: 14 additions & 0 deletions src/snowflake/snowpark/dataframe_writer.py
@@ -139,6 +139,7 @@ def save_as_table(
max_data_extension_time: Optional[int] = None,
change_tracking: Optional[bool] = None,
copy_grants: bool = False,
iceberg_config: Optional[dict] = None,
) -> Optional[AsyncJob]:
"""Writes the data to the specified table in a Snowflake database.
@@ -186,6 +187,18 @@
block: A bool value indicating whether this function will wait until the result is available.
When it is ``False``, this function executes the underlying queries of the dataframe
asynchronously and returns an :class:`AsyncJob`.
iceberg_config: A dictionary that can contain the following Iceberg configuration values:
* external_volume: specifies the identifier for the external volume where
the Iceberg table stores its metadata files and data in Parquet format
* catalog: specifies either Snowflake or a catalog integration to use for this table
* base_location: the base directory to which Snowflake can write Iceberg metadata and data files
* catalog_sync: optionally sets the catalog integration configured for Polaris Catalog
* storage_serialization_policy: specifies the storage serialization policy for the table
Examples::
@@ -256,6 +269,7 @@ def save_as_table(
max_data_extension_time,
change_tracking,
copy_grants,
iceberg_config,
)
session = self._dataframe._session
snowflake_plan = session._analyzer.resolve(create_table_logic_plan)
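One way to confirm that the writer actually produced an Iceberg table, following the same pattern the integration test below uses, is to inspect the generated DDL; the table name refers to the hypothetical table from the first sketch:

# Assumes the session and table from the save_as_table sketch above.
ddl = session.sql("select get_ddl('table', 'my_iceberg_table')").collect()[0][0]

# For a Snowflake-managed Iceberg table the returned DDL is expected to begin
# with "create or replace ICEBERG TABLE ..." and to echo EXTERNAL_VOLUME,
# CATALOG and BASE_LOCATION, as asserted in test_copy_into_csv_iceberg.
print(ddl)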
32 changes: 31 additions & 1 deletion tests/integ/scala/test_dataframe_copy_into.py
Expand Up @@ -25,7 +25,7 @@
StructField,
StructType,
)
from tests.utils import IS_IN_STORED_PROC, TestFiles, Utils
from tests.utils import IS_IN_STORED_PROC, TestFiles, Utils, iceberg_supported

test_file_csv = "testCSV.csv"
test_file2_csv = "test2CSV.csv"
@@ -245,6 +245,36 @@ def test_copy_csv_basic(session, tmp_stage_name1, tmp_table_name):
)


def test_copy_into_csv_iceberg(
session, tmp_stage_name1, tmp_table_name, local_testing_mode
):
if not iceberg_supported(session, local_testing_mode):
pytest.skip("Test requires iceberg support.")
test_file_on_stage = f"@{tmp_stage_name1}/{test_file_csv}"
df = session.read.schema(user_schema).csv(test_file_on_stage)

test_table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE)
df.copy_into_table(
test_table_name,
iceberg_config={
"external_volume": "PYTHON_CONNECTOR_ICEBERG_EXVOL",
"CATALOG": "SNOWFLAKE",
"BASE_LOCATION": "snowpark_python_tests",
},
)
try:
# Check that table is an iceberg table with correct properties set
ddl = session._run_query(f"select get_ddl('table', '{test_table_name}')")
assert (
ddl[0][0]
== f"create or replace ICEBERG TABLE {test_table_name} (\n\tA LONG,\n\tB STRING,\n\tC DOUBLE\n)\n EXTERNAL_VOLUME = 'PYTHON_CONNECTOR_ICEBERG_EXVOL'\n CATALOG = 'SNOWFLAKE'\n BASE_LOCATION = 'snowpark_python_tests/';"
)
# Check that a copy_into works on the newly created table.
df.copy_into_table(test_table_name)
finally:
Utils.drop_table(session, test_table_name)


def test_copy_csv_create_table_if_not_exists(session, tmp_stage_name1):
test_file_on_stage = f"@{tmp_stage_name1}/{test_file_csv}"
df = session.read.schema(user_schema).csv(test_file_on_stage)