From 2af8525aded32f82b8d7a36a62efdb4e58ffe603 Mon Sep 17 00:00:00 2001 From: azhan Date: Fri, 10 May 2024 15:12:49 -0700 Subject: [PATCH 1/8] SNOW-1359484 Allow write Snowpark pandas DataFrame and Series in session.write_pandas --- .../snowpark/modin/plugin/PANDAS_CHANGELOG.md | 3 + src/snowflake/snowpark/session.py | 131 ++++++++-- .../integ/modin/test_session_write_pandas.py | 228 ++++++++++++++++++ 3 files changed, 344 insertions(+), 18 deletions(-) create mode 100644 tests/integ/modin/test_session_write_pandas.py diff --git a/src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md b/src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md index 91f0371eb80..024da40f56d 100644 --- a/src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md +++ b/src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md @@ -8,6 +8,9 @@ ### Improvements - Improved performance of `pd.qcut` by removing joins in generated sql query. +### Improvements +- Allow `session.write_pandas` to write Snowpark pandas DataFrame or Series to a table. + ## 1.15.0a1 (2024-07-05) ### Bug Fixes diff --git a/src/snowflake/snowpark/session.py b/src/snowflake/snowpark/session.py index 0eb1839f670..5862563ee03 100644 --- a/src/snowflake/snowpark/session.py +++ b/src/snowflake/snowpark/session.py @@ -2112,9 +2112,84 @@ def get_session_stage( self._stage_created = True return f"{STAGE_PREFIX}{stage_name}" + def _write_modin_pandas_helper( + self, + df: Union[ + "snowflake.snowpark.modin.pandas.DataFrame", # noqa: F821 + "snowflake.snowpark.modin.pandas.Series", # noqa: F821 + ], + table_name: str, + location: str, + database: Optional[str] = None, + schema: Optional[str] = None, + quote_identifiers: bool = True, + auto_create_table: bool = False, + overwrite: bool = False, + index: bool = False, + index_label: Optional["IndexLabel"] = None, # noqa: F821 + table_type: Literal["", "temp", "temporary", "transient"] = "", + ) -> None: + """A helper method used by `write_pandas` to write Snowpark pandas DataFrame or Series to a table by using + :func:`snowflake.snowpark.modin.pandas.DataFrame.to_snowflake ` or + :func:`snowflake.snowpark.modin.pandas.Series.to_snowflake ` internally + + Args: + df: The Snowpark pandas DataFrame or Series we'd like to write back. + table_name: Name of the table we want to insert into. + location: the location of the table in string. + database: Database that the table is in. If not provided, the default one will be used. + schema: Schema that the table is in. If not provided, the default one will be used. + quote_identifiers: By default, identifiers, specifically database, schema, table and column names + (from :attr:`DataFrame.columns`) will be quoted. If set to ``False``, identifiers + are passed on to Snowflake without quoting, i.e. identifiers will be coerced to uppercase by Snowflake. + auto_create_table: When true, automatically creates a table to store the passed in pandas DataFrame using the + passed in ``database``, ``schema``, and ``table_name``. Note: there are usually multiple table configurations that + would allow you to upload a particular pandas DataFrame successfully. If you don't like the auto created + table, you can always create your own table before calling this function. For example, auto-created + tables will store :class:`list`, :class:`tuple` and :class:`dict` as strings in a VARCHAR column. + overwrite: Default value is ``False`` and the pandas DataFrame data is appended to the existing table. If set to ``True`` and if auto_create_table is also set to ``True``, + then it drops the table. If set to ``True`` and if auto_create_table is set to ``False``, + then it truncates the table. Note that in both cases (when overwrite is set to ``True``) it will replace the existing + contents of the table with that of the passed in pandas DataFrame. + index: default True + If true, save DataFrame index columns as table columns. + index_label: + Column label for index column(s). If None is given (default) and index is True, + then the index names are used. A sequence should be given if the DataFrame uses MultiIndex. + table_type: The table type of table to be created. The supported values are: ``temp``, ``temporary``, + and ``transient``. An empty string means to create a permanent table. Learn more about table types + `here `_. + """ + + def quote_id(id: str) -> str: + return '"' + id + '"' if quote_identifiers else id + + name = [table_name] + if schema: + name = [quote_id(schema)] + name + if database: + name = [quote_id(database)] + name + + if not auto_create_table and not self._table_exists(name): + raise SnowparkClientException( + f"Cannot write Snowpark pandas DataFrame to table {location} because it does not exist. Use auto_create_table = True to create table before writing a Snowpark pandas DataFrame" + ) + if_exists = "replace" if overwrite else "append" + df.to_snowflake( + name=name, + if_exists=if_exists, + index=index, + index_label=index_label, + table_type=table_type, + ) + def write_pandas( self, - df: "pandas.DataFrame", + df: Union[ + "pandas.DataFrame", + "snowflake.snowpark.modin.pandas.DataFrame", # noqa: F821 + "snowflake.snowpark.modin.pandas.Series", # noqa: F821 + ], table_name: str, *, database: Optional[str] = None, @@ -2135,7 +2210,7 @@ def write_pandas( pandas DataFrame was written to. Args: - df: The pandas DataFrame we'd like to write back. + df: The pandas DataFrame or Snowpark pandas DataFrame or Series we'd like to write back. table_name: Name of the table we want to insert into. database: Database that the table is in. If not provided, the default one will be used. schema: Schema that the table is in. If not provided, the default one will be used. @@ -2200,6 +2275,11 @@ def write_pandas( Snowflake that the passed in pandas DataFrame can be written to. If your pandas DataFrame cannot be written to the specified table, an exception will be raised. + + If the dataframe is :class:`~snowflake.snowpark.modin.pandas.DataFrame` or :class:`~snowflake.snowpark.modin.pandas.Series`, + it will call :func:`snowflake.snowpark.modin.pandas.DataFrame.to_snowflake ` or + :func:`snowflake.snowpark.modin.pandas.Series.to_snowflake ` internally + to write a Snowpark pandas DataFrame into a Snowflake table. """ if create_temp_table: warning( @@ -2241,22 +2321,37 @@ def write_pandas( "snowflake-connector-python to 3.4.0 or above.", stacklevel=1, ) - success, nchunks, nrows, ci_output = write_pandas( - self._conn._conn, - df, - table_name, - database=database, - schema=schema, - chunk_size=chunk_size, - compression=compression, - on_error=on_error, - parallel=parallel, - quote_identifiers=quote_identifiers, - auto_create_table=auto_create_table, - overwrite=overwrite, - table_type=table_type, - **kwargs, - ) + if "modin" in str(type(df)): + self._write_modin_pandas_helper( + df, + table_name, + location, + database=database, + schema=schema, + quote_identifiers=quote_identifiers, + auto_create_table=auto_create_table, + overwrite=overwrite, + table_type=table_type, + **kwargs, + ) + success, ci_output = True, "" + else: + success, _, _, ci_output = write_pandas( + self._conn._conn, + df, + table_name, + database=database, + schema=schema, + chunk_size=chunk_size, + compression=compression, + on_error=on_error, + parallel=parallel, + quote_identifiers=quote_identifiers, + auto_create_table=auto_create_table, + overwrite=overwrite, + table_type=table_type, + **kwargs, + ) except ProgrammingError as pe: if pe.msg.endswith("does not exist"): raise SnowparkClientExceptionMessages.DF_PANDAS_TABLE_DOES_NOT_EXIST_EXCEPTION( diff --git a/tests/integ/modin/test_session_write_pandas.py b/tests/integ/modin/test_session_write_pandas.py new file mode 100644 index 00000000000..57774ad6455 --- /dev/null +++ b/tests/integ/modin/test_session_write_pandas.py @@ -0,0 +1,228 @@ +# +# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. +# + +import modin.pandas as pd +import pytest + +from snowflake.snowpark import Row +from snowflake.snowpark._internal.utils import ( + TempObjectType, + is_in_stored_procedure, + random_name_for_temp_object, +) +from snowflake.snowpark.exceptions import SnowparkClientException, SnowparkSQLException +from snowflake.snowpark.types import ( + FloatType, + IntegerType, + StringType, + StructField, + StructType, +) +from tests.integ.modin.utils import assert_frame_equal +from tests.utils import Utils + + +@pytest.mark.parametrize("quote_identifiers", [True, False]) +@pytest.mark.parametrize("auto_create_table", [True, False]) +@pytest.mark.parametrize("overwrite", [True, False]) +def test_write_pandas_with_overwrite( + session, + quote_identifiers: bool, + auto_create_table: bool, + overwrite: bool, +): + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + try: + pd1 = pd.DataFrame( + [ + (1, 4.5, "Nike"), + (2, 7.5, "Adidas"), + (3, 10.5, "Puma"), + ], + columns=["id".upper(), "foot_size".upper(), "shoe_make".upper()], + ) + + pd2 = pd.DataFrame( + [(1, 8.0, "Dia Dora")], + columns=["id".upper(), "foot_size".upper(), "shoe_make".upper()], + ) + + pd3 = pd.DataFrame( + [(1, "dash", 1000, 32)], + columns=["id".upper(), "name".upper(), "points".upper(), "age".upper()], + ) + + # Create initial table and insert 3 rows + table1 = session.write_pandas( + pd1, table_name, quote_identifiers=quote_identifiers, auto_create_table=True + ) + + assert_frame_equal(pd1, table1.to_pandas(), check_dtype=False) + + # Insert 1 row + table2 = session.write_pandas( + pd2, + table_name, + quote_identifiers=quote_identifiers, + overwrite=overwrite, + auto_create_table=auto_create_table, + ) + results = table2.to_pandas() + if overwrite: + # Results should match pd2 + assert_frame_equal(results, pd2, check_dtype=False) + else: + # Results count should match pd1 + pd2 + assert results.shape[0] == 4 + + if overwrite: + # In this case, the table is first dropped and since there's a new schema, the results should now match pd3 + table3 = session.write_pandas( + pd3, + table_name, + quote_identifiers=quote_identifiers, + overwrite=overwrite, + auto_create_table=auto_create_table, + ) + results = table3.to_pandas() + assert_frame_equal(results, pd3, check_dtype=False) + else: + # In this case, the table is truncated but since there's a new schema, it should fail + with pytest.raises(SnowparkSQLException) as ex_info: + session.write_pandas( + pd3, + table_name, + quote_identifiers=quote_identifiers, + overwrite=overwrite, + auto_create_table=auto_create_table, + ) + assert "Insert value list does not match column list" in str(ex_info) + + with pytest.raises(SnowparkClientException) as ex_info: + session.write_pandas(pd1, "tmp_table") + assert ( + 'Cannot write Snowpark pandas DataFrame to table "tmp_table" because it does not exist. ' + "Use auto_create_table = True to create table before writing a Snowpark pandas DataFrame" + in str(ex_info) + ) + finally: + Utils.drop_table(session, table_name) + Utils.drop_table(session, "tmp_table") + + +@pytest.fixture(scope="module") +def tmp_table_basic(session): + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + df = session.create_dataframe( + data=[], + schema=StructType( + [ + StructField("id", IntegerType()), + StructField("foot_size", FloatType()), + StructField("shoe_model", StringType()), + ] + ), + ) + df.write.save_as_table(table_name) + try: + yield table_name + finally: + Utils.drop_table(session, table_name) + + +def test_write_pandas(session, tmp_table_basic): + df = pd.DataFrame( + [ + (1, 4.5, "t1"), + (2, 7.5, "t2"), + (3, 10.5, "t3"), + ], + columns=["id".upper(), "foot_size".upper(), "shoe_model".upper()], + ) + + table = session.write_pandas(df, tmp_table_basic, overwrite=True) + results = table.to_pandas() + assert_frame_equal(results, df, check_dtype=False) + + # Auto create a new table + session._run_query(f'drop table if exists "{tmp_table_basic}"') + table = session.write_pandas(df, tmp_table_basic, auto_create_table=True) + table_info = session.sql(f"show tables like '{tmp_table_basic}'").collect() + assert table_info[0]["kind"] == "TABLE" + results = table.to_pandas() + assert_frame_equal(results, df, check_dtype=False) + + nonexistent_table = Utils.random_name_for_temp_object(TempObjectType.TABLE) + with pytest.raises(SnowparkClientException) as ex_info: + session.write_pandas(df, nonexistent_table, auto_create_table=False) + assert ( + f'Cannot write Snowpark pandas DataFrame to table "{nonexistent_table}" because it does not exist. ' + "Use auto_create_table = True to create table before writing a Snowpark pandas DataFrame" + in str(ex_info) + ) + + # Drop tables that were created for this test + session._run_query(f'drop table if exists "{tmp_table_basic}"') + + +@pytest.mark.parametrize("table_type", ["", "temp", "temporary", "transient"]) +def test_write_pandas_with_table_type(session, table_type: str): + df = pd.DataFrame( + [ + (1, 4.5, "t1"), + (2, 7.5, "t2"), + (3, 10.5, "t3"), + ], + columns=["id".upper(), "foot_size".upper(), "shoe_model".upper()], + ) + + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + try: + table = session.write_pandas( + df, + table_name, + table_type=table_type, + auto_create_table=True, + ) + results = table.to_pandas() + assert_frame_equal(results, df, check_dtype=False) + Utils.assert_table_type(session, table_name, table_type) + finally: + Utils.drop_table(session, table_name) + + +def test_write_to_different_schema(session, local_testing_mode): + pd_df = pd.DataFrame( + [ + (1, 4.5, "Nike"), + (2, 7.5, "Adidas"), + (3, 10.5, "Puma"), + ], + columns=["id".upper(), "foot_size".upper(), "shoe_make".upper()], + ) + original_schema_name = session.get_current_schema() + test_schema_name = Utils.random_temp_schema() + + try: + if not local_testing_mode: + Utils.create_schema(session, test_schema_name) + # For owner's rights stored proc test, current schema does not change after creating a new schema + if not is_in_stored_procedure(): + session.use_schema(original_schema_name) + assert session.get_current_schema() == original_schema_name + table_name = random_name_for_temp_object(TempObjectType.TABLE) + session.write_pandas( + pd_df, + table_name, + quote_identifiers=False, + schema=test_schema_name, + auto_create_table=True, + ) + Utils.check_answer( + session.table(f"{test_schema_name}.{table_name}").sort("id"), + [Row(1, 4.5, "Nike"), Row(2, 7.5, "Adidas"), Row(3, 10.5, "Puma")], + ) + finally: + if not local_testing_mode: + Utils.drop_schema(session, test_schema_name) From b1f890d0cd170f14a7a2a36ea373ee5b50f7bb84 Mon Sep 17 00:00:00 2001 From: azhan Date: Mon, 13 May 2024 11:01:50 -0700 Subject: [PATCH 2/8] add sql counter --- .../integ/modin/test_session_write_pandas.py | 204 ++++++++++-------- 1 file changed, 109 insertions(+), 95 deletions(-) diff --git a/tests/integ/modin/test_session_write_pandas.py b/tests/integ/modin/test_session_write_pandas.py index 57774ad6455..1af4d26a686 100644 --- a/tests/integ/modin/test_session_write_pandas.py +++ b/tests/integ/modin/test_session_write_pandas.py @@ -19,6 +19,7 @@ StructField, StructType, ) +from tests.integ.modin.sql_counter import SqlCounter from tests.integ.modin.utils import assert_frame_equal from tests.utils import Utils @@ -53,59 +54,67 @@ def test_write_pandas_with_overwrite( columns=["id".upper(), "name".upper(), "points".upper(), "age".upper()], ) - # Create initial table and insert 3 rows - table1 = session.write_pandas( - pd1, table_name, quote_identifiers=quote_identifiers, auto_create_table=True - ) - - assert_frame_equal(pd1, table1.to_pandas(), check_dtype=False) + with SqlCounter(query_count=5): + # Create initial table and insert 3 rows + table1 = session.write_pandas( + pd1, + table_name, + quote_identifiers=quote_identifiers, + auto_create_table=True, + ) - # Insert 1 row - table2 = session.write_pandas( - pd2, - table_name, - quote_identifiers=quote_identifiers, - overwrite=overwrite, - auto_create_table=auto_create_table, - ) - results = table2.to_pandas() - if overwrite: - # Results should match pd2 - assert_frame_equal(results, pd2, check_dtype=False) - else: - # Results count should match pd1 + pd2 - assert results.shape[0] == 4 + assert_frame_equal(pd1, table1.to_pandas(), check_dtype=False) - if overwrite: - # In this case, the table is first dropped and since there's a new schema, the results should now match pd3 - table3 = session.write_pandas( - pd3, + with SqlCounter(query_count=3 if auto_create_table else 4): + # Insert 1 row + table2 = session.write_pandas( + pd2, table_name, quote_identifiers=quote_identifiers, overwrite=overwrite, auto_create_table=auto_create_table, ) - results = table3.to_pandas() - assert_frame_equal(results, pd3, check_dtype=False) - else: - # In this case, the table is truncated but since there's a new schema, it should fail - with pytest.raises(SnowparkSQLException) as ex_info: - session.write_pandas( + results = table2.to_pandas() + if overwrite: + # Results should match pd2 + assert_frame_equal(results, pd2, check_dtype=False) + else: + # Results count should match pd1 + pd2 + assert results.shape[0] == 4 + + if overwrite: + with SqlCounter(query_count=3 if auto_create_table else 4): + # In this case, the table is first dropped and since there's a new schema, the results should now match pd3 + table3 = session.write_pandas( pd3, table_name, quote_identifiers=quote_identifiers, overwrite=overwrite, auto_create_table=auto_create_table, ) - assert "Insert value list does not match column list" in str(ex_info) - - with pytest.raises(SnowparkClientException) as ex_info: - session.write_pandas(pd1, "tmp_table") - assert ( - 'Cannot write Snowpark pandas DataFrame to table "tmp_table" because it does not exist. ' - "Use auto_create_table = True to create table before writing a Snowpark pandas DataFrame" - in str(ex_info) - ) + results = table3.to_pandas() + assert_frame_equal(results, pd3, check_dtype=False) + else: + with SqlCounter(query_count=1 if auto_create_table else 2): + # In this case, the table is truncated but since there's a new schema, it should fail + with pytest.raises(SnowparkSQLException) as ex_info: + session.write_pandas( + pd3, + table_name, + quote_identifiers=quote_identifiers, + overwrite=overwrite, + auto_create_table=auto_create_table, + ) + assert "Insert value list does not match column list" in str(ex_info) + + with SqlCounter(query_count=1): + with pytest.raises(SnowparkClientException) as ex_info: + session.write_pandas(pd1, "tmp_table") + assert ( + 'Cannot write Snowpark pandas DataFrame to table "tmp_table" because it does not exist. ' + "Use auto_create_table = True to create table before writing a Snowpark pandas DataFrame" + in str(ex_info) + ) finally: Utils.drop_table(session, table_name) Utils.drop_table(session, "tmp_table") @@ -132,38 +141,43 @@ def tmp_table_basic(session): def test_write_pandas(session, tmp_table_basic): - df = pd.DataFrame( - [ - (1, 4.5, "t1"), - (2, 7.5, "t2"), - (3, 10.5, "t3"), - ], - columns=["id".upper(), "foot_size".upper(), "shoe_model".upper()], - ) - - table = session.write_pandas(df, tmp_table_basic, overwrite=True) - results = table.to_pandas() - assert_frame_equal(results, df, check_dtype=False) - - # Auto create a new table - session._run_query(f'drop table if exists "{tmp_table_basic}"') - table = session.write_pandas(df, tmp_table_basic, auto_create_table=True) - table_info = session.sql(f"show tables like '{tmp_table_basic}'").collect() - assert table_info[0]["kind"] == "TABLE" - results = table.to_pandas() - assert_frame_equal(results, df, check_dtype=False) - - nonexistent_table = Utils.random_name_for_temp_object(TempObjectType.TABLE) - with pytest.raises(SnowparkClientException) as ex_info: - session.write_pandas(df, nonexistent_table, auto_create_table=False) - assert ( - f'Cannot write Snowpark pandas DataFrame to table "{nonexistent_table}" because it does not exist. ' - "Use auto_create_table = True to create table before writing a Snowpark pandas DataFrame" - in str(ex_info) + try: + df = pd.DataFrame( + [ + (1, 4.5, "t1"), + (2, 7.5, "t2"), + (3, 10.5, "t3"), + ], + columns=["id".upper(), "foot_size".upper(), "shoe_model".upper()], ) - # Drop tables that were created for this test - session._run_query(f'drop table if exists "{tmp_table_basic}"') + with SqlCounter(query_count=4): + table = session.write_pandas(df, tmp_table_basic, overwrite=True) + results = table.to_pandas() + assert_frame_equal(results, df, check_dtype=False) + finally: + session._run_query(f'drop table if exists "{tmp_table_basic}"') + + try: + with SqlCounter(query_count=6): + table = session.write_pandas(df, tmp_table_basic, auto_create_table=True) + table_info = session.sql(f"show tables like '{tmp_table_basic}'").collect() + assert table_info[0]["kind"] == "TABLE" + results = table.to_pandas() + assert_frame_equal(results, df, check_dtype=False) + + nonexistent_table = Utils.random_name_for_temp_object(TempObjectType.TABLE) + with SqlCounter(query_count=1): + with pytest.raises(SnowparkClientException) as ex_info: + session.write_pandas(df, nonexistent_table, auto_create_table=False) + assert ( + f'Cannot write Snowpark pandas DataFrame to table "{nonexistent_table}" because it does not exist. ' + "Use auto_create_table = True to create table before writing a Snowpark pandas DataFrame" + in str(ex_info) + ) + finally: + # Drop tables that were created for this test + session._run_query(f'drop table if exists "{tmp_table_basic}"') @pytest.mark.parametrize("table_type", ["", "temp", "temporary", "transient"]) @@ -179,20 +193,21 @@ def test_write_pandas_with_table_type(session, table_type: str): table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) try: - table = session.write_pandas( - df, - table_name, - table_type=table_type, - auto_create_table=True, - ) - results = table.to_pandas() - assert_frame_equal(results, df, check_dtype=False) - Utils.assert_table_type(session, table_name, table_type) + with SqlCounter(query_count=6): + table = session.write_pandas( + df, + table_name, + table_type=table_type, + auto_create_table=True, + ) + results = table.to_pandas() + assert_frame_equal(results, df, check_dtype=False) + Utils.assert_table_type(session, table_name, table_type) finally: Utils.drop_table(session, table_name) -def test_write_to_different_schema(session, local_testing_mode): +def test_write_to_different_schema(session): pd_df = pd.DataFrame( [ (1, 4.5, "Nike"), @@ -205,24 +220,23 @@ def test_write_to_different_schema(session, local_testing_mode): test_schema_name = Utils.random_temp_schema() try: - if not local_testing_mode: - Utils.create_schema(session, test_schema_name) + Utils.create_schema(session, test_schema_name) # For owner's rights stored proc test, current schema does not change after creating a new schema if not is_in_stored_procedure(): session.use_schema(original_schema_name) assert session.get_current_schema() == original_schema_name table_name = random_name_for_temp_object(TempObjectType.TABLE) - session.write_pandas( - pd_df, - table_name, - quote_identifiers=False, - schema=test_schema_name, - auto_create_table=True, - ) - Utils.check_answer( - session.table(f"{test_schema_name}.{table_name}").sort("id"), - [Row(1, 4.5, "Nike"), Row(2, 7.5, "Adidas"), Row(3, 10.5, "Puma")], - ) + with SqlCounter(query_count=4): + session.write_pandas( + pd_df, + table_name, + quote_identifiers=False, + schema=test_schema_name, + auto_create_table=True, + ) + Utils.check_answer( + session.table(f"{test_schema_name}.{table_name}").sort("id"), + [Row(1, 4.5, "Nike"), Row(2, 7.5, "Adidas"), Row(3, 10.5, "Puma")], + ) finally: - if not local_testing_mode: - Utils.drop_schema(session, test_schema_name) + Utils.drop_schema(session, test_schema_name) From d5603ca00dcdc8c36594ef29f2397e8327a0eafa Mon Sep 17 00:00:00 2001 From: azhan Date: Mon, 13 May 2024 13:05:26 -0700 Subject: [PATCH 3/8] resolve comments --- .../snowpark/modin/plugin/PANDAS_CHANGELOG.md | 2 -- src/snowflake/snowpark/session.py | 3 ++- tests/integ/modin/test_session_write_pandas.py | 16 ++++++++++++++++ 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md b/src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md index 024da40f56d..e626d3f9366 100644 --- a/src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md +++ b/src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md @@ -7,8 +7,6 @@ ### Improvements - Improved performance of `pd.qcut` by removing joins in generated sql query. - -### Improvements - Allow `session.write_pandas` to write Snowpark pandas DataFrame or Series to a table. ## 1.15.0a1 (2024-07-05) diff --git a/src/snowflake/snowpark/session.py b/src/snowflake/snowpark/session.py index 5862563ee03..4eb808b6933 100644 --- a/src/snowflake/snowpark/session.py +++ b/src/snowflake/snowpark/session.py @@ -2172,7 +2172,8 @@ def quote_id(id: str) -> str: if not auto_create_table and not self._table_exists(name): raise SnowparkClientException( - f"Cannot write Snowpark pandas DataFrame to table {location} because it does not exist. Use auto_create_table = True to create table before writing a Snowpark pandas DataFrame" + f"Cannot write Snowpark pandas DataFrame or Series to table {location} because it does not exist. Use " + f"auto_create_table = True to create table before writing a Snowpark pandas DataFrame or Series" ) if_exists = "replace" if overwrite else "append" df.to_snowflake( diff --git a/tests/integ/modin/test_session_write_pandas.py b/tests/integ/modin/test_session_write_pandas.py index 1af4d26a686..e2f3302cd22 100644 --- a/tests/integ/modin/test_session_write_pandas.py +++ b/tests/integ/modin/test_session_write_pandas.py @@ -240,3 +240,19 @@ def test_write_to_different_schema(session): ) finally: Utils.drop_schema(session, test_schema_name) + + +def test_write_series(session): + s = pd.Series([1, 2, 3], name="s") + try: + table_name = random_name_for_temp_object(TempObjectType.TABLE) + with SqlCounter(query_count=5): + table = session.write_pandas( + s, + table_name, + quote_identifiers=False, + auto_create_table=True, + ) + assert_frame_equal(s.to_frame(), table.to_pandas(), check_dtype=False) + finally: + Utils.drop_table(session, table_name) From 25f60649d2bfd1f755068e284409da1e7859dbad Mon Sep 17 00:00:00 2001 From: azhan Date: Mon, 13 May 2024 13:52:15 -0700 Subject: [PATCH 4/8] Fix doc and tests --- src/snowflake/snowpark/session.py | 9 +++++---- tests/integ/modin/test_session_write_pandas.py | 8 ++++---- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/snowflake/snowpark/session.py b/src/snowflake/snowpark/session.py index 4eb808b6933..638c00f41c1 100644 --- a/src/snowflake/snowpark/session.py +++ b/src/snowflake/snowpark/session.py @@ -2277,10 +2277,11 @@ def write_pandas( your pandas DataFrame cannot be written to the specified table, an exception will be raised. - If the dataframe is :class:`~snowflake.snowpark.modin.pandas.DataFrame` or :class:`~snowflake.snowpark.modin.pandas.Series`, - it will call :func:`snowflake.snowpark.modin.pandas.DataFrame.to_snowflake ` or - :func:`snowflake.snowpark.modin.pandas.Series.to_snowflake ` internally - to write a Snowpark pandas DataFrame into a Snowflake table. + If the dataframe is Snowpark pandas :class:`~snowflake.snowpark.modin.pandas.DataFrame` + or :class:`~snowflake.snowpark.modin.pandas.Series`, it will call + :func:`modin.pandas.DataFrame.to_snowflake ` + or :func:`modin.pandas.Series.to_snowflake ` + internally to write a Snowpark pandas DataFrame into a Snowflake table. """ if create_temp_table: warning( diff --git a/tests/integ/modin/test_session_write_pandas.py b/tests/integ/modin/test_session_write_pandas.py index e2f3302cd22..5e194c93ae5 100644 --- a/tests/integ/modin/test_session_write_pandas.py +++ b/tests/integ/modin/test_session_write_pandas.py @@ -111,8 +111,8 @@ def test_write_pandas_with_overwrite( with pytest.raises(SnowparkClientException) as ex_info: session.write_pandas(pd1, "tmp_table") assert ( - 'Cannot write Snowpark pandas DataFrame to table "tmp_table" because it does not exist. ' - "Use auto_create_table = True to create table before writing a Snowpark pandas DataFrame" + 'Cannot write Snowpark pandas DataFrame or Series to table "tmp_table" because it does not exist. ' + "Use auto_create_table = True to create table before writing a Snowpark pandas DataFrame or Series" in str(ex_info) ) finally: @@ -171,8 +171,8 @@ def test_write_pandas(session, tmp_table_basic): with pytest.raises(SnowparkClientException) as ex_info: session.write_pandas(df, nonexistent_table, auto_create_table=False) assert ( - f'Cannot write Snowpark pandas DataFrame to table "{nonexistent_table}" because it does not exist. ' - "Use auto_create_table = True to create table before writing a Snowpark pandas DataFrame" + f'Cannot write Snowpark pandas DataFrame or Series to table "{nonexistent_table}" because it does not exist. ' + "Use auto_create_table = True to create table before writing a Snowpark pandas DataFrame or Series " in str(ex_info) ) finally: From 44636ab5a88272b60c7bbdc1d335d1e4c2ecc087 Mon Sep 17 00:00:00 2001 From: azhan Date: Tue, 14 May 2024 13:59:29 -0700 Subject: [PATCH 5/8] resolve comments --- src/snowflake/snowpark/_internal/utils.py | 25 ++++++++- src/snowflake/snowpark/session.py | 17 ++++-- .../integ/modin/test_session_write_pandas.py | 55 +++++++++++++++++++ 3 files changed, 92 insertions(+), 5 deletions(-) diff --git a/src/snowflake/snowpark/_internal/utils.py b/src/snowflake/snowpark/_internal/utils.py index daf953888b8..597f984cbf5 100644 --- a/src/snowflake/snowpark/_internal/utils.py +++ b/src/snowflake/snowpark/_internal/utils.py @@ -9,6 +9,7 @@ import decimal import functools import hashlib +import importlib import io import logging import os @@ -41,7 +42,11 @@ import snowflake.snowpark from snowflake.connector.cursor import ResultMetadata, SnowflakeCursor from snowflake.connector.description import OPERATING_SYSTEM, PLATFORM -from snowflake.connector.options import pandas +from snowflake.connector.options import ( + MissingOptionalDependency, + ModuleLikeObject, + pandas, +) from snowflake.connector.version import VERSION as connector_version from snowflake.snowpark._internal.error_message import SnowparkClientExceptionMessages from snowflake.snowpark.row import Row @@ -950,3 +955,21 @@ def prepare_pivot_arguments( ) return df, pc, pivot_values, default_on_null + + +class MissingModin(MissingOptionalDependency): + """The class is specifically for modin optional dependency.""" + + _dep_name = "modin" + + +def import_or_missing_modin_pandas() -> tuple[ModuleLikeObject, bool]: + """This function tries importing the following packages: modin.pandas + + If available it returns modin package with a flag of whether it was imported. + """ + try: + modin = importlib.import_module("modin.pandas") + return modin, True + except ImportError: + return MissingModin(), False diff --git a/src/snowflake/snowpark/session.py b/src/snowflake/snowpark/session.py index 638c00f41c1..51443b4d742 100644 --- a/src/snowflake/snowpark/session.py +++ b/src/snowflake/snowpark/session.py @@ -90,6 +90,7 @@ get_stage_file_prefix_length, get_temp_type_for_object, get_version, + import_or_missing_modin_pandas, is_in_stored_procedure, normalize_local_file, normalize_remote_file_or_dir, @@ -2140,8 +2141,8 @@ def _write_modin_pandas_helper( database: Database that the table is in. If not provided, the default one will be used. schema: Schema that the table is in. If not provided, the default one will be used. quote_identifiers: By default, identifiers, specifically database, schema, table and column names - (from :attr:`DataFrame.columns`) will be quoted. If set to ``False``, identifiers - are passed on to Snowflake without quoting, i.e. identifiers will be coerced to uppercase by Snowflake. + (from :attr:`DataFrame.columns`) will be quoted. `to_snowflake` always quote column names so + quote_identifiers = False is not supported here. auto_create_table: When true, automatically creates a table to store the passed in pandas DataFrame using the passed in ``database``, ``schema``, and ``table_name``. Note: there are usually multiple table configurations that would allow you to upload a particular pandas DataFrame successfully. If you don't like the auto created @@ -2160,9 +2161,13 @@ def _write_modin_pandas_helper( and ``transient``. An empty string means to create a permanent table. Learn more about table types `here `_. """ + if not quote_identifiers: + raise NotImplementedError( + "quote_identifiers = False is not supported by `to_snowflake`." + ) def quote_id(id: str) -> str: - return '"' + id + '"' if quote_identifiers else id + return '"' + id + '"' name = [table_name] if schema: @@ -2323,7 +2328,11 @@ def write_pandas( "snowflake-connector-python to 3.4.0 or above.", stacklevel=1, ) - if "modin" in str(type(df)): + + modin_pandas, modin_is_imported = import_or_missing_modin_pandas() + if modin_is_imported and isinstance( + df, (modin_pandas.DataFrame, modin_pandas.Series) + ): self._write_modin_pandas_helper( df, table_name, diff --git a/tests/integ/modin/test_session_write_pandas.py b/tests/integ/modin/test_session_write_pandas.py index 5e194c93ae5..b5c029e2459 100644 --- a/tests/integ/modin/test_session_write_pandas.py +++ b/tests/integ/modin/test_session_write_pandas.py @@ -256,3 +256,58 @@ def test_write_series(session): assert_frame_equal(s.to_frame(), table.to_pandas(), check_dtype=False) finally: Utils.drop_table(session, table_name) + + +@pytest.mark.parametrize("quote_identifiers", [True, False]) +@pytest.mark.parametrize("is_modin_dataframe", [True, False]) +def test_write_pandas_with_quote_identifiers( + session, + is_modin_dataframe: bool, + quote_identifiers: bool, +): + table_name = Utils.random_name_for_temp_object(TempObjectType.TABLE) + try: + pd1 = pd.DataFrame( + [ + (1, 4.5, "Nike"), + (2, 7.5, "Adidas"), + (3, 10.5, "Puma"), + ], + columns=["id", "foot_size", "shoe_make"], + ) + + if not is_modin_dataframe: + pd1 = pd1.to_pandas() + + if is_modin_dataframe and not quote_identifiers: + with pytest.raises(NotImplementedError): + table1 = session.write_pandas( + pd1, + table_name, + quote_identifiers=quote_identifiers, + auto_create_table=True, + ) + else: + with SqlCounter(query_count=3 if is_modin_dataframe else 0): + # Create initial table and insert 3 rows + table1 = session.write_pandas( + pd1, + table_name, + quote_identifiers=quote_identifiers, + auto_create_table=True, + ) + + def is_quoted(name: str) -> bool: + return name[0] == '"' and name[-1] == '"' + + if quote_identifiers: + assert is_quoted(table1.table_name) + for col in table1.columns: + assert is_quoted(col) + elif is_modin_dataframe: + assert not is_quoted(table1.table_name) + for col in table1.columns: + assert not is_quoted(col) + + finally: + Utils.drop_table(session, table_name) From 7620e92770ff3334091cfd916240755ef2a16428 Mon Sep 17 00:00:00 2001 From: azhan Date: Tue, 14 May 2024 14:32:17 -0700 Subject: [PATCH 6/8] fix 3.8 error --- src/snowflake/snowpark/_internal/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/snowflake/snowpark/_internal/utils.py b/src/snowflake/snowpark/_internal/utils.py index 597f984cbf5..5f0b4296a43 100644 --- a/src/snowflake/snowpark/_internal/utils.py +++ b/src/snowflake/snowpark/_internal/utils.py @@ -963,7 +963,7 @@ class MissingModin(MissingOptionalDependency): _dep_name = "modin" -def import_or_missing_modin_pandas() -> tuple[ModuleLikeObject, bool]: +def import_or_missing_modin_pandas() -> Tuple[ModuleLikeObject, bool]: """This function tries importing the following packages: modin.pandas If available it returns modin package with a flag of whether it was imported. From 26fa8cd19e4842c6cbf9b0441c03e9f6c6e1c676 Mon Sep 17 00:00:00 2001 From: azhan Date: Wed, 15 May 2024 10:06:48 -0700 Subject: [PATCH 7/8] fix test --- tests/integ/modin/test_session_write_pandas.py | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/tests/integ/modin/test_session_write_pandas.py b/tests/integ/modin/test_session_write_pandas.py index b5c029e2459..465ac2e463d 100644 --- a/tests/integ/modin/test_session_write_pandas.py +++ b/tests/integ/modin/test_session_write_pandas.py @@ -24,12 +24,10 @@ from tests.utils import Utils -@pytest.mark.parametrize("quote_identifiers", [True, False]) @pytest.mark.parametrize("auto_create_table", [True, False]) @pytest.mark.parametrize("overwrite", [True, False]) def test_write_pandas_with_overwrite( session, - quote_identifiers: bool, auto_create_table: bool, overwrite: bool, ): @@ -59,7 +57,6 @@ def test_write_pandas_with_overwrite( table1 = session.write_pandas( pd1, table_name, - quote_identifiers=quote_identifiers, auto_create_table=True, ) @@ -70,7 +67,6 @@ def test_write_pandas_with_overwrite( table2 = session.write_pandas( pd2, table_name, - quote_identifiers=quote_identifiers, overwrite=overwrite, auto_create_table=auto_create_table, ) @@ -88,7 +84,6 @@ def test_write_pandas_with_overwrite( table3 = session.write_pandas( pd3, table_name, - quote_identifiers=quote_identifiers, overwrite=overwrite, auto_create_table=auto_create_table, ) @@ -101,7 +96,6 @@ def test_write_pandas_with_overwrite( session.write_pandas( pd3, table_name, - quote_identifiers=quote_identifiers, overwrite=overwrite, auto_create_table=auto_create_table, ) @@ -218,28 +212,27 @@ def test_write_to_different_schema(session): ) original_schema_name = session.get_current_schema() test_schema_name = Utils.random_temp_schema() - + quoted_test_schema_name = '"' + test_schema_name + '"' try: - Utils.create_schema(session, test_schema_name) + Utils.create_schema(session, quoted_test_schema_name) # For owner's rights stored proc test, current schema does not change after creating a new schema if not is_in_stored_procedure(): session.use_schema(original_schema_name) assert session.get_current_schema() == original_schema_name table_name = random_name_for_temp_object(TempObjectType.TABLE) with SqlCounter(query_count=4): - session.write_pandas( + table = session.write_pandas( pd_df, table_name, - quote_identifiers=False, schema=test_schema_name, auto_create_table=True, ) Utils.check_answer( - session.table(f"{test_schema_name}.{table_name}").sort("id"), + table.sort("id"), [Row(1, 4.5, "Nike"), Row(2, 7.5, "Adidas"), Row(3, 10.5, "Puma")], ) finally: - Utils.drop_schema(session, test_schema_name) + Utils.drop_schema(session, quoted_test_schema_name) def test_write_series(session): @@ -250,7 +243,6 @@ def test_write_series(session): table = session.write_pandas( s, table_name, - quote_identifiers=False, auto_create_table=True, ) assert_frame_equal(s.to_frame(), table.to_pandas(), check_dtype=False) From 9c5824392a7a4a93e71085968a4b1a2147e9ca59 Mon Sep 17 00:00:00 2001 From: azhan Date: Wed, 15 May 2024 10:44:52 -0700 Subject: [PATCH 8/8] fix sql counter --- tests/integ/modin/test_session_write_pandas.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tests/integ/modin/test_session_write_pandas.py b/tests/integ/modin/test_session_write_pandas.py index 465ac2e463d..c8c1e39ea80 100644 --- a/tests/integ/modin/test_session_write_pandas.py +++ b/tests/integ/modin/test_session_write_pandas.py @@ -272,13 +272,14 @@ def test_write_pandas_with_quote_identifiers( pd1 = pd1.to_pandas() if is_modin_dataframe and not quote_identifiers: - with pytest.raises(NotImplementedError): - table1 = session.write_pandas( - pd1, - table_name, - quote_identifiers=quote_identifiers, - auto_create_table=True, - ) + with SqlCounter(query_count=0): + with pytest.raises(NotImplementedError): + session.write_pandas( + pd1, + table_name, + quote_identifiers=quote_identifiers, + auto_create_table=True, + ) else: with SqlCounter(query_count=3 if is_modin_dataframe else 0): # Create initial table and insert 3 rows