Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1359484 Allow write Snowpark pandas DataFrame and Series in sess… #1563

Merged
merged 8 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

### Improvements
- Improved performance of `pd.qcut` by removing joins in the generated SQL query.
- Allow `session.write_pandas` to write a Snowpark pandas DataFrame or Series to a table.

## 1.15.0a1 (2024-07-05)

Expand Down
133 changes: 115 additions & 18 deletions src/snowflake/snowpark/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -2112,9 +2112,85 @@ def get_session_stage(
self._stage_created = True
return f"{STAGE_PREFIX}{stage_name}"

def _write_modin_pandas_helper(
    self,
    df: Union[
        "snowflake.snowpark.modin.pandas.DataFrame",  # noqa: F821
        "snowflake.snowpark.modin.pandas.Series",  # noqa: F821
    ],
    table_name: str,
    location: str,
    database: Optional[str] = None,
    schema: Optional[str] = None,
    quote_identifiers: bool = True,
    auto_create_table: bool = False,
    overwrite: bool = False,
    index: bool = False,
    index_label: Optional["IndexLabel"] = None,  # noqa: F821
    table_type: Literal["", "temp", "temporary", "transient"] = "",
) -> None:
    """A helper method used by `write_pandas` to write Snowpark pandas DataFrame or Series to a table by using
    :func:`snowflake.snowpark.modin.pandas.DataFrame.to_snowflake <snowflake.snowpark.modin.pandas.DataFrame.to_snowflake>` or
    :func:`snowflake.snowpark.modin.pandas.Series.to_snowflake <snowflake.snowpark.modin.pandas.Series.to_snowflake>` internally

    Args:
        df: The Snowpark pandas DataFrame or Series we'd like to write back.
        table_name: Name of the table we want to insert into. It is used as the last part of the
            multi-part name exactly as given.
        location: The location of the table as a string. It is only used in the error message raised
            when the table does not exist.
        database: Database that the table is in. If not provided, the name is not qualified with a database.
        schema: Schema that the table is in. If not provided, the name is not qualified with a schema.
        quote_identifiers: Default value is ``True``, in which case ``database`` and ``schema`` are wrapped
            in double quotes before being added to the table name. If set to ``False``, they are passed
            on without quoting.
        auto_create_table: Default value is ``False``, in which case the target table must already exist,
            otherwise an exception is raised. If set to ``True``, the existence check is skipped and
            table creation is left to ``to_snowflake``.
        overwrite: Default value is ``False``, which calls ``to_snowflake`` with ``if_exists="append"``.
            If set to ``True``, ``if_exists="replace"`` is used instead.
        index: default False
            If true, save DataFrame index columns as table columns.
        index_label:
            Column label for index column(s). If None is given (default) and index is True,
            then the index names are used. A sequence should be given if the DataFrame uses MultiIndex.
        table_type: The table type of table to be created. The supported values are: ``temp``, ``temporary``,
            and ``transient``. An empty string means to create a permanent table. Learn more about table types
            `here <https://docs.snowflake.com/en/user-guide/tables-temp-transient.html>`_.

    Raises:
        SnowparkClientException: If ``auto_create_table`` is ``False`` and the target table does not exist.
    """

    def quote_id(identifier: str) -> str:
        # Wrap the identifier in double quotes only when quoting was requested.
        return '"' + identifier + '"' if quote_identifiers else identifier

    # Build the multi-part name as [database, schema, table_name], leaving out
    # whichever qualifiers were not supplied.
    # NOTE(review): table_name itself is not passed through quote_id here;
    # presumably the caller has already quoted it when needed - confirm.
    qualifiers = [quote_id(part) for part in (database, schema) if part]
    name = qualifiers + [table_name]

    # Without auto-creation the table has to exist already, so fail early with
    # a clear message before attempting the write.
    if not auto_create_table and not self._table_exists(name):
        raise SnowparkClientException(
            f"Cannot write Snowpark pandas DataFrame or Series to table {location} because it does not exist. Use "
            f"auto_create_table = True to create table before writing a Snowpark pandas DataFrame or Series"
        )

    if_exists = "replace" if overwrite else "append"
    df.to_snowflake(
        name=name,
        if_exists=if_exists,
        index=index,
        index_label=index_label,
        table_type=table_type,
    )

def write_pandas(
self,
df: "pandas.DataFrame",
df: Union[
"pandas.DataFrame",
"snowflake.snowpark.modin.pandas.DataFrame", # noqa: F821
"snowflake.snowpark.modin.pandas.Series", # noqa: F821
],
table_name: str,
*,
database: Optional[str] = None,
Expand All @@ -2135,7 +2211,7 @@ def write_pandas(
pandas DataFrame was written to.

Args:
df: The pandas DataFrame we'd like to write back.
df: The pandas DataFrame or Snowpark pandas DataFrame or Series we'd like to write back.
table_name: Name of the table we want to insert into.
database: Database that the table is in. If not provided, the default one will be used.
schema: Schema that the table is in. If not provided, the default one will be used.
Expand Down Expand Up @@ -2200,6 +2276,12 @@ def write_pandas(
Snowflake that the passed in pandas DataFrame can be written to. If
your pandas DataFrame cannot be written to the specified table, an
exception will be raised.

If the dataframe is Snowpark pandas :class:`~snowflake.snowpark.modin.pandas.DataFrame`
or :class:`~snowflake.snowpark.modin.pandas.Series`, it will call
:func:`modin.pandas.DataFrame.to_snowflake <snowflake.snowpark.modin.pandas.DataFrame.to_snowflake>`
or :func:`modin.pandas.Series.to_snowflake <snowflake.snowpark.modin.pandas.Series.to_snowflake>`
internally to write a Snowpark pandas DataFrame into a Snowflake table.
"""
if create_temp_table:
warning(
Expand Down Expand Up @@ -2241,22 +2323,37 @@ def write_pandas(
"snowflake-connector-python to 3.4.0 or above.",
stacklevel=1,
)
success, nchunks, nrows, ci_output = write_pandas(
self._conn._conn,
df,
table_name,
database=database,
schema=schema,
chunk_size=chunk_size,
compression=compression,
on_error=on_error,
parallel=parallel,
quote_identifiers=quote_identifiers,
auto_create_table=auto_create_table,
overwrite=overwrite,
table_type=table_type,
**kwargs,
)
if "modin" in str(type(df)):
sfc-gh-azhan marked this conversation as resolved.
Show resolved Hide resolved
self._write_modin_pandas_helper(
df,
table_name,
location,
database=database,
schema=schema,
quote_identifiers=quote_identifiers,
auto_create_table=auto_create_table,
overwrite=overwrite,
table_type=table_type,
**kwargs,
)
success, ci_output = True, ""
else:
success, _, _, ci_output = write_pandas(
self._conn._conn,
df,
table_name,
database=database,
schema=schema,
chunk_size=chunk_size,
compression=compression,
on_error=on_error,
parallel=parallel,
quote_identifiers=quote_identifiers,
auto_create_table=auto_create_table,
overwrite=overwrite,
table_type=table_type,
**kwargs,
)
except ProgrammingError as pe:
if pe.msg.endswith("does not exist"):
raise SnowparkClientExceptionMessages.DF_PANDAS_TABLE_DOES_NOT_EXIST_EXCEPTION(
Expand Down
Loading
Loading