SNOW-1058583: Add support for iceberg tables to create_table and copy_into_table statements. #2162

Merged · 27 commits · Sep 4, 2024
Commits (27)
0909e3b
SNOW-1058583: Add support for iceberg tables to create_table statements.
sfc-gh-jrose Aug 23, 2024
f298041
CHANGELOG
sfc-gh-jrose Aug 23, 2024
dc96973
Merge branch 'main' into jrose_snow_1058583_iceberg_save
sfc-gh-jrose Aug 23, 2024
71d4483
Add copy_into_table logic as well as additional tests.
sfc-gh-jrose Aug 27, 2024
600212a
Merge remote-tracking branch 'refs/remotes/origin/jrose_snow_1058583_…
sfc-gh-jrose Aug 27, 2024
5530253
Merge branch 'main' into jrose_snow_1058583_iceberg_save
sfc-gh-jrose Aug 27, 2024
2c4fb4d
add local testing skips
sfc-gh-jrose Aug 28, 2024
8a8c90e
Merge branch 'main' into jrose_snow_1058583_iceberg_save
sfc-gh-jrose Aug 28, 2024
00d79ff
Merge branch 'main' into jrose_snow_1058583_iceberg_save
sfc-gh-jrose Aug 28, 2024
d14b4fd
Review feedback part 1. Not ready for rereview
sfc-gh-jrose Aug 28, 2024
10c10f0
remove structured test. fix error
sfc-gh-jrose Aug 29, 2024
2b77553
update docs to make sphinx happy
sfc-gh-jrose Aug 29, 2024
b4c9604
debug
sfc-gh-jrose Aug 29, 2024
398828c
Merge branch 'main' into jrose_snow_1058583_iceberg_save
sfc-gh-jrose Aug 29, 2024
6e612b5
tests
sfc-gh-jrose Aug 29, 2024
5fa535b
Add structured save_as_table_test
sfc-gh-jrose Aug 29, 2024
9cbf4c4
further debug
sfc-gh-jrose Aug 29, 2024
3198e30
debug++
sfc-gh-jrose Aug 29, 2024
d7d76e5
tests fixed
sfc-gh-jrose Aug 29, 2024
d7635af
Merge branch 'main' into jrose_snow_1058583_iceberg_save
sfc-gh-jrose Aug 29, 2024
ffb9876
Review Feedback
sfc-gh-jrose Aug 29, 2024
001fb1d
Merge branch 'main' into jrose_snow_1058583_iceberg_save
sfc-gh-jrose Sep 3, 2024
ac0c866
Review Feedback
sfc-gh-jrose Sep 3, 2024
82c414d
pyright
sfc-gh-jrose Sep 3, 2024
b7c68c4
update logic fix remaining typos
sfc-gh-jrose Sep 3, 2024
08e36ea
Merge branch 'main' into jrose_snow_1058583_iceberg_save
sfc-gh-jrose Sep 3, 2024
f3a4282
Merge branch 'main' into jrose_snow_1058583_iceberg_save
sfc-gh-jrose Sep 4, 2024
13 changes: 13 additions & 0 deletions CHANGELOG.md
@@ -19,6 +19,19 @@
- `max_data_extension_time`
- `change_tracking`
- `copy_grants`
- `iceberg_config` A dictionary that can hold the following iceberg configuration options:
- `external_volume`
- `catalog`
- `base_location`
- `catalog_sync`
- `storage_serialization_policy`
- Added support for specifying the following to `DataFrameWriter.copy_into_table`:
- `iceberg_config` A dictionary that can hold the following iceberg configuration options:
- `external_volume`
- `catalog`
- `base_location`
- `catalog_sync`
- `storage_serialization_policy`
- Added support for specifying the following parameters to `DataFrame.create_or_replace_dynamic_table`:
- `mode`
- `refresh_mode`
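For reference, here is a minimal sketch of the `iceberg_config` dictionary described in the CHANGELOG entries above. The volume, catalog, and location values are placeholders; per the validation added in this PR, only `base_location` is required and keys are matched case-insensitively.

```python
# Hypothetical iceberg_config; all names below are placeholders.
iceberg_config = {
    "external_volume": "MY_EXTERNAL_VOLUME",      # identifier of an existing external volume
    "catalog": "SNOWFLAKE",                       # Snowflake catalog or a catalog integration
    "base_location": "my/base/prefix",            # required: where metadata and data files are written
    "catalog_sync": "MY_CATALOG_INTEGRATION",     # optional: catalog integration for Polaris Catalog
    "storage_serialization_policy": "OPTIMIZED",  # optional: storage serialization policy
}
```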
2 changes: 2 additions & 0 deletions src/snowflake/snowpark/_internal/analyzer/analyzer.py
@@ -1004,6 +1004,7 @@ def do_resolve_with_resolved_children(
use_scoped_temp_objects=self.session._use_scoped_temp_objects,
creation_source=logical_plan.creation_source,
child_attributes=resolved_child.attributes,
iceberg_config=logical_plan.iceberg_config,
)

if isinstance(logical_plan, Limit):
@@ -1184,6 +1185,7 @@ def do_resolve_with_resolved_children(
else None,
user_schema=logical_plan.user_schema,
create_table_from_infer_schema=logical_plan.create_table_from_infer_schema,
iceberg_config=logical_plan.iceberg_config,
)

if isinstance(logical_plan, CopyIntoLocationNode):
68 changes: 50 additions & 18 deletions src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py
@@ -133,6 +133,11 @@
DATA_RETENTION_TIME_IN_DAYS = " DATA_RETENTION_TIME_IN_DAYS "
MAX_DATA_EXTENSION_TIME_IN_DAYS = " MAX_DATA_EXTENSION_TIME_IN_DAYS "
CHANGE_TRACKING = " CHANGE_TRACKING "
EXTERNAL_VOLUME = " EXTERNAL_VOLUME "
CATALOG = " CATALOG "
BASE_LOCATION = " BASE_LOCATION "
CATALOG_SYNC = " CATALOG_SYNC "
STORAGE_SERIALIZATION_POLICY = " STORAGE_SERIALIZATION_POLICY "
REG_EXP = " REGEXP "
COLLATE = " COLLATE "
RESULT_SCAN = " RESULT_SCAN"
@@ -178,10 +183,30 @@
WITH = "WITH "
DEFAULT_ON_NULL = " DEFAULT ON NULL "
ANY = " ANY "
ICEBERG = " ICEBERG "

TEMPORARY_STRING_SET = frozenset(["temporary", "temp"])


def validate_iceberg_config(iceberg_config: Optional[dict]) -> Dict[str, str]:
if iceberg_config is None:
return dict()

iceberg_config = {k.lower(): v for k, v in iceberg_config.items()}
if "base_location" not in iceberg_config:
raise ValueError("Iceberg table configuration requires base_location be set.")

return {
EXTERNAL_VOLUME: iceberg_config.get("external_volume", None),
CATALOG: iceberg_config.get("catalog", None),
BASE_LOCATION: iceberg_config.get("base_location", None),
CATALOG_SYNC: iceberg_config.get("catalog_sync", None),
STORAGE_SERIALIZATION_POLICY: iceberg_config.get(
"storage_serialization_policy", None
),
}


def result_scan_statement(uuid_place_holder: str) -> str:
return (
SELECT
@@ -778,25 +803,29 @@ def create_table_statement(
*,
use_scoped_temp_objects: bool = False,
is_generated: bool = False,
iceberg_config: Optional[dict] = None,
) -> str:
cluster_by_clause = (
(CLUSTER_BY + LEFT_PARENTHESIS + COMMA.join(clustering_key) + RIGHT_PARENTHESIS)
if clustering_key
else EMPTY_STRING
)
comment_sql = get_comment_sql(comment)
options_statement = get_options_statement(
{
ENABLE_SCHEMA_EVOLUTION: enable_schema_evolution,
DATA_RETENTION_TIME_IN_DAYS: data_retention_time,
MAX_DATA_EXTENSION_TIME_IN_DAYS: max_data_extension_time,
CHANGE_TRACKING: change_tracking,
}
)
options = {
ENABLE_SCHEMA_EVOLUTION: enable_schema_evolution,
DATA_RETENTION_TIME_IN_DAYS: data_retention_time,
MAX_DATA_EXTENSION_TIME_IN_DAYS: max_data_extension_time,
CHANGE_TRACKING: change_tracking,
}

iceberg_config = validate_iceberg_config(iceberg_config)
options.update(iceberg_config)
options_statement = get_options_statement(options)

return (
f"{CREATE}{(OR + REPLACE) if replace else EMPTY_STRING}"
f" {(get_temp_type_for_object(use_scoped_temp_objects, is_generated) if table_type.lower() in TEMPORARY_STRING_SET else table_type).upper()} "
f"{TABLE}{table_name}{(IF + NOT + EXISTS) if not replace and not error else EMPTY_STRING}"
f"{ICEBERG if iceberg_config else EMPTY_STRING}{TABLE}{table_name}{(IF + NOT + EXISTS) if not replace and not error else EMPTY_STRING}"
f"{LEFT_PARENTHESIS}{schema}{RIGHT_PARENTHESIS}{cluster_by_clause}"
f"{options_statement}{COPY_GRANTS if copy_grants else EMPTY_STRING}{comment_sql}"
)
@@ -856,6 +885,7 @@ def create_table_as_select_statement(
max_data_extension_time: Optional[int] = None,
change_tracking: Optional[bool] = None,
copy_grants: bool = False,
iceberg_config: Optional[dict] = None,
*,
use_scoped_temp_objects: bool = False,
is_generated: bool = False,
@@ -871,18 +901,20 @@
else EMPTY_STRING
)
comment_sql = get_comment_sql(comment)
options_statement = get_options_statement(
{
ENABLE_SCHEMA_EVOLUTION: enable_schema_evolution,
DATA_RETENTION_TIME_IN_DAYS: data_retention_time,
MAX_DATA_EXTENSION_TIME_IN_DAYS: max_data_extension_time,
CHANGE_TRACKING: change_tracking,
}
)
options = {
ENABLE_SCHEMA_EVOLUTION: enable_schema_evolution,
DATA_RETENTION_TIME_IN_DAYS: data_retention_time,
MAX_DATA_EXTENSION_TIME_IN_DAYS: max_data_extension_time,
CHANGE_TRACKING: change_tracking,
}
iceberg_config = validate_iceberg_config(iceberg_config)
options.update(iceberg_config)
options_statement = get_options_statement(options)
return (
f"{CREATE}{OR + REPLACE if replace else EMPTY_STRING}"
f" {(get_temp_type_for_object(use_scoped_temp_objects, is_generated) if table_type.lower() in TEMPORARY_STRING_SET else table_type).upper()} "
f"{TABLE}{IF + NOT + EXISTS if not replace and not error else EMPTY_STRING} "
f"{ICEBERG if iceberg_config else EMPTY_STRING}{TABLE}"
f"{IF + NOT + EXISTS if not replace and not error else EMPTY_STRING} "
f"{table_name}{column_definition_sql}{cluster_by_clause}{options_statement}"
f"{COPY_GRANTS if copy_grants else EMPTY_STRING}{comment_sql} {AS}{project_statement([], child)}"
)
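As a rough illustration of the new `validate_iceberg_config` helper above — a sketch only, assuming a development install where this internal module is importable; it is not part of the PR:

```python
from snowflake.snowpark._internal.analyzer.analyzer_utils import validate_iceberg_config

# Keys are lower-cased and mapped onto the SQL option keywords; unspecified options become None.
cfg = validate_iceberg_config({"BASE_LOCATION": "snowpark_tests", "external_volume": "MY_VOL"})
# cfg == {" EXTERNAL_VOLUME ": "MY_VOL", " CATALOG ": None, " BASE_LOCATION ": "snowpark_tests",
#         " CATALOG_SYNC ": None, " STORAGE_SERIALIZATION_POLICY ": None}

# Passing None yields an empty dict, and omitting base_location raises a ValueError.
assert validate_iceberg_config(None) == {}
```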
12 changes: 12 additions & 0 deletions src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py
@@ -846,6 +846,7 @@ def save_as_table(
use_scoped_temp_objects: bool,
creation_source: TableCreationSource,
child_attributes: Optional[List[Attribute]],
iceberg_config: Optional[dict] = None,
) -> SnowflakePlan:
"""Returns a SnowflakePlan to materialize the child plan into a table.

@@ -872,6 +873,13 @@
child_attributes: child attributes will be none in the case of large query breakdown
where we use ctas query to create the table which does not need to know the column
metadata.
iceberg_config: A dictionary that can contain the following iceberg configuration values:
external_volume: specifies the identifier for the external volume where
the Iceberg table stores its metadata files and data in Parquet format
catalog: specifies either Snowflake or a catalog integration to use for this table
base_location: the base directory that snowflake can write iceberg metadata and files to
catalog_sync: optionally sets the catalog integration configured for Polaris Catalog
storage_serialization_policy: specifies the storage serialization policy for the table
"""
is_generated = creation_source in (
TableCreationSource.CACHE_RESULT,
@@ -929,6 +937,7 @@ def get_create_table_as_select_plan(child: SnowflakePlan, replace, error):
max_data_extension_time=max_data_extension_time,
change_tracking=change_tracking,
copy_grants=copy_grants,
iceberg_config=iceberg_config,
use_scoped_temp_objects=use_scoped_temp_objects,
is_generated=is_generated,
),
@@ -957,6 +966,7 @@ def get_create_and_insert_plan(child: SnowflakePlan, replace, error):
copy_grants=copy_grants,
use_scoped_temp_objects=use_scoped_temp_objects,
is_generated=is_generated,
iceberg_config=iceberg_config,
)

# so that dataframes created from non-select statements,
@@ -1380,6 +1390,7 @@ def copy_into_table(
*,
copy_options: Dict[str, Any],
format_type_options: Dict[str, Any],
iceberg_config: Optional[dict] = None,
) -> SnowflakePlan:
# tracking usage of pattern, will refactor this function in future
if pattern:
@@ -1413,6 +1424,7 @@
create_table_statement(
full_table_name,
attribute_to_schema_string(attributes),
iceberg_config=iceberg_config,
),
# This is an exception. The principle is to avoid surprising behavior and most of the time
# it applies to temp object. But this perm table creation is also one place where we create
@@ -197,6 +197,7 @@ def __init__(
max_data_extension_time: Optional[int] = None,
change_tracking: Optional[bool] = None,
copy_grants: bool = False,
iceberg_config: Optional[dict] = None,
) -> None:
super().__init__()

@@ -217,6 +218,7 @@ def __init__(
self.max_data_extension_time = max_data_extension_time
self.change_tracking = change_tracking
self.copy_grants = copy_grants
self.iceberg_config = iceberg_config

@property
def individual_node_complexity(self) -> Dict[PlanNodeCategory, int]:
@@ -277,6 +279,7 @@ def __init__(
user_schema: Optional[StructType] = None,
cur_options: Optional[Dict[str, Any]] = None, # the options of DataFrameReader
create_table_from_infer_schema: bool = False,
iceberg_config: Optional[dict] = None,
) -> None:
super().__init__()
self.table_name = table_name
@@ -292,6 +295,7 @@
self.user_schema = user_schema
self.cur_options = cur_options
self.create_table_from_infer_schema = create_table_from_infer_schema
self.iceberg_config = iceberg_config


class CopyIntoLocationNode(LogicalPlan):
@@ -176,6 +176,7 @@ def do_resolve_with_resolved_children(
use_scoped_temp_objects=self.session._use_scoped_temp_objects,
creation_source=logical_plan.creation_source,
child_attributes=child_attributes,
iceberg_config=logical_plan.iceberg_config,
)

elif isinstance(
16 changes: 16 additions & 0 deletions src/snowflake/snowpark/dataframe.py
@@ -3026,6 +3026,7 @@ def copy_into_table(
transformations: Optional[Iterable[ColumnOrName]] = None,
format_type_options: Optional[Dict[str, Any]] = None,
statement_params: Optional[Dict[str, str]] = None,
iceberg_config: Optional[dict] = None,
**copy_options: Any,
) -> List[Row]:
"""Executes a `COPY INTO <table> <https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html>`__ command to load data from files in a stage location into a specified table.
@@ -3079,6 +3080,19 @@
transformations: A list of column transformations.
format_type_options: A dict that contains the ``formatTypeOptions`` of the ``COPY INTO <table>`` command.
statement_params: Dictionary of statement level parameters to be set while executing this action.
iceberg_config: A dictionary that can contain the following iceberg configuration values:

* external_volume: specifies the identifier for the external volume where
the Iceberg table stores its metadata files and data in Parquet format

* catalog: specifies either Snowflake or a catalog integration to use for this table

* base_location: the base directory that snowflake can write iceberg metadata and files to

* catalog_sync: optionally sets the catalog integration configured for Polaris Catalog

* storage_serialization_policy: specifies the storage serialization policy for the table

copy_options: The kwargs that is used to specify the ``copyOptions`` of the ``COPY INTO <table>`` command.
"""
if not self._reader or not self._reader._file_path:
@@ -3144,6 +3158,7 @@ def copy_into_table(
if transformations
else None
)

return DataFrame(
self._session,
CopyIntoTableNode(
@@ -3160,6 +3175,7 @@
user_schema=self._reader._user_schema,
cur_options=self._reader._cur_options,
create_table_from_infer_schema=create_table_from_infer_schema,
iceberg_config=iceberg_config,
),
)._internal_collect_with_tag_no_telemetry(statement_params=statement_params)

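A usage sketch for the new `copy_into_table` parameter, modeled on the integration test added in this PR; the stage, file, table, and volume names are hypothetical, and an existing `session` and `user_schema` are assumed:

```python
# Hypothetical example: copy a staged CSV into a newly created Iceberg table.
df = session.read.schema(user_schema).csv("@my_stage/data.csv")
df.copy_into_table(
    "my_iceberg_table",
    iceberg_config={
        "external_volume": "MY_EXTERNAL_VOLUME",
        "catalog": "SNOWFLAKE",
        "base_location": "my/base/prefix",
    },
)
```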
14 changes: 14 additions & 0 deletions src/snowflake/snowpark/dataframe_writer.py
@@ -139,6 +139,7 @@ def save_as_table(
max_data_extension_time: Optional[int] = None,
change_tracking: Optional[bool] = None,
copy_grants: bool = False,
iceberg_config: Optional[dict] = None,
) -> Optional[AsyncJob]:
"""Writes the data to the specified table in a Snowflake database.

@@ -186,6 +187,18 @@
block: A bool value indicating whether this function will wait until the result is available.
When it is ``False``, this function executes the underlying queries of the dataframe
asynchronously and returns an :class:`AsyncJob`.
iceberg_config: A dictionary that can contain the following iceberg configuration values:

* external_volume: specifies the identifier for the external volume where
the Iceberg table stores its metadata files and data in Parquet format

* catalog: specifies either Snowflake or a catalog integration to use for this table

* base_location: the base directory that snowflake can write iceberg metadata and files to

* catalog_sync: optionally sets the catalog integration configured for Polaris Catalog

* storage_serialization_policy: specifies the storage serialization policy for the table

Examples::

@@ -256,6 +269,7 @@ def save_as_table(
max_data_extension_time,
change_tracking,
copy_grants,
iceberg_config,
)
session = self._dataframe._session
snowflake_plan = session._analyzer.resolve(create_table_logic_plan)
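And a corresponding sketch for `DataFrameWriter.save_as_table`; again, the volume and location names are placeholders and an existing `session` is assumed:

```python
# Hypothetical example: write a DataFrame out as a new Iceberg table.
df = session.create_dataframe([[1, "a"], [2, "b"]], schema=["id", "value"])
df.write.save_as_table(
    "my_iceberg_table",
    iceberg_config={
        "external_volume": "MY_EXTERNAL_VOLUME",
        "catalog": "SNOWFLAKE",
        "base_location": "my/base/prefix",
    },
)
```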
32 changes: 31 additions & 1 deletion tests/integ/scala/test_dataframe_copy_into.py
@@ -25,7 +25,7 @@
StructField,
StructType,
)
from tests.utils import IS_IN_STORED_PROC, TestFiles, Utils
from tests.utils import IS_IN_STORED_PROC, TestFiles, Utils, iceberg_supported

test_file_csv = "testCSV.csv"
test_file2_csv = "test2CSV.csv"
@@ -245,6 +245,36 @@ def test_copy_csv_basic(session, tmp_stage_name1, tmp_table_name):
)


def test_copy_into_csv_iceberg(
session, tmp_stage_name1, tmp_table_name, local_testing_mode
):
if not iceberg_supported(session, local_testing_mode):
pytest.skip("Test requires iceberg support.")
test_file_on_stage = f"@{tmp_stage_name1}/{test_file_csv}"
df = session.read.schema(user_schema).csv(test_file_on_stage)

test_table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE)
df.copy_into_table(
test_table_name,
iceberg_config={
"external_volume": "PYTHON_CONNECTOR_ICEBERG_EXVOL",
"CATALOG": "SNOWFLAKE",
"BASE_LOCATION": "snowpark_python_tests",
},
)
try:
# Check that table is an iceberg table with correct properties set
ddl = session._run_query(f"select get_ddl('table', '{test_table_name}')")
assert (
ddl[0][0]
== f"create or replace ICEBERG TABLE {test_table_name} (\n\tA LONG,\n\tB STRING,\n\tC DOUBLE\n)\n EXTERNAL_VOLUME = 'PYTHON_CONNECTOR_ICEBERG_EXVOL'\n CATALOG = 'SNOWFLAKE'\n BASE_LOCATION = 'snowpark_python_tests/';"
)
# Check that a copy_into works on the newly created table.
df.copy_into_table(test_table_name)
finally:
Utils.drop_table(session, test_table_name)


def test_copy_csv_create_table_if_not_exists(session, tmp_stage_name1):
test_file_on_stage = f"@{tmp_stage_name1}/{test_file_csv}"
df = session.read.schema(user_schema).csv(test_file_on_stage)