diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b3a37ca2db..a1f1cbfa659 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ #### New Features - Added support for `if_not_exists` parameter during udf and sproc registration. +- Added `DataFrame.cache_result` and `Series.cache_result` methods for users to persist DataFrames' and Series' to a temporary table lasting the duration of the session to improve latency of subsequent operations. #### Improvements diff --git a/docs/source/modin/dataframe.rst b/docs/source/modin/dataframe.rst index 098f8c814f0..25f4757f6b9 100644 --- a/docs/source/modin/dataframe.rst +++ b/docs/source/modin/dataframe.rst @@ -29,6 +29,16 @@ DataFrame DataFrame.shape DataFrame.empty +.. rubric:: Snowflake Specific + +.. autosummary:: + :toctree: pandas_api/ + + DataFrame.to_pandas + DataFrame.to_snowflake + DataFrame.to_snowpark + DataFrame.cache_result + .. rubric:: Conversion .. autosummary:: @@ -197,12 +207,3 @@ DataFrame DataFrame.first_valid_index DataFrame.last_valid_index DataFrame.resample - -.. rubric:: Serialization / IO / conversion - -.. autosummary:: - :toctree: pandas_api/ - - DataFrame.to_pandas - DataFrame.to_snowflake - DataFrame.to_snowpark diff --git a/docs/source/modin/index.rst b/docs/source/modin/index.rst index 1e17d3e1ace..64dc9d0aaf6 100644 --- a/docs/source/modin/index.rst +++ b/docs/source/modin/index.rst @@ -18,4 +18,5 @@ For your convenience, here is all the :doc:`Supported APIs ` groupby resampling numpy - All supported APIs \ No newline at end of file + All supported APIs + performance \ No newline at end of file diff --git a/docs/source/modin/performance.rst b/docs/source/modin/performance.rst new file mode 100644 index 00000000000..25e20a6e830 --- /dev/null +++ b/docs/source/modin/performance.rst @@ -0,0 +1,58 @@ +Performance Recommendations +=========================== + +This page contains recommendations to help improve performance when using the Snowpark pandas API. + +Caching Intermediate Results +---------------------------- +Snowpark pandas uses a lazy paradigm - when operations are called on a Snowpark pandas object, +a lazy operator graph is built up and executed only when an output operation is called (e.g. printing +the data, or persisting it to a table in Snowflake). This paradigm mirrors the Snowpark DataFrame paradigm, +and enables larger queries to be optimized using Snowflake's SQL Query Optimizer. Certain workloads, however, +can generate large operator graphs that include repeated, computationally expensive, subgraphs. +Take the following code snippet as an example: + +.. code-block:: python + + import modin.pandas as pd + import numpy as np + import snowflake.snowpark.modin.plugin + from snowflake.snowpark import Session + + # Session.builder.create() will create a default Snowflake connection. + Session.builder.create() + df = pd.concat([pd.DataFrame([range(i, i+5)]) for i in range(0, 150, 5)]) + print(df) + df = df.reset_index(drop=True) + print(df) + +The above code snippet creates a 30x5 DataFrame using concatenation of 30 smaller 1x5 DataFrames, +prints it, resets its index, and prints it again. The concatenation step can be expensive, and is +lazily recomputed every time the dataframe is materialized - once per print. Instead, we recommend using +Snowpark pandas' ``cache_result`` API in order to materialize expensive computations that are reused +in order to decrease the latency of longer pipelines. + +.. code-block:: python + + import modin.pandas as pd + import numpy as np + import snowflake.snowpark.modin.plugin + from snowflake.snowpark import Session + + # Session.builder.create() will create a default Snowflake connection. + Session.builder.create() + df = pd.concat([pd.DataFrame([range(i, i+5)]) for i in range(0, 150, 5)]) + df = df.cache_result(inplace=False) + print(df) + df = df.reset_index(drop=True) + print(df) + +Consider using the ``cache_result`` API whenever a DataFrame or Series that is expensive to compute sees high reuse. + +Known Limitations +^^^^^^^^^^^^^^^^^ +Using the ``cache_result`` API after an ``apply``, an ``applymap`` or a ``groupby.apply`` is unlikely to yield performance savings. +``apply(func, axis=1)`` when ``func`` has no return type annotation and ``groupby.apply`` are implemented internally via UDTFs, and feature +intermediate result caching as part of their implementation. ``apply(func, axis=1)`` when func has a return type annotation, and ``applymap`` +internally use UDFs - any overhead observed when using these APIs is likely due to the set-up and definition of the UDF, and is unlikely to be +alleviated via the ``cache_result`` API. \ No newline at end of file diff --git a/docs/source/modin/series.rst b/docs/source/modin/series.rst index d3a87183d9b..d5d9d00cef8 100644 --- a/docs/source/modin/series.rst +++ b/docs/source/modin/series.rst @@ -33,6 +33,14 @@ Series Series.values +.. rubric:: Snowflake Specific + +.. autosummary:: + :toctree: pandas_api/ + + Series.to_snowflake + Series.to_snowpark + Series.cache_result .. rubric:: Conversion @@ -46,8 +54,6 @@ Series Series.to_list Series.to_numpy Series.to_pandas - Series.to_snowflake - Series.to_snowpark Series.__array__ diff --git a/src/snowflake/snowpark/modin/plugin/_internal/frame.py b/src/snowflake/snowpark/modin/plugin/_internal/frame.py index 598839e01ce..9cf3fd9d4c1 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/frame.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/frame.py @@ -27,6 +27,7 @@ ROW_POSITION_COLUMN_LABEL, append_columns, assert_duplicate_free, + cache_result, count_rows, extract_pandas_label_from_snowflake_quoted_identifier, fill_missing_levels_for_pandas_label, @@ -691,6 +692,22 @@ def ensure_row_count_column(self) -> "InternalFrame": index_column_snowflake_quoted_identifiers=self.index_column_snowflake_quoted_identifiers, ) + def persist_to_temporary_table(self) -> "InternalFrame": + """ + Persists the OrderedDataFrame backing this InternalFrame to a temporary table for the duration of the session. + + Returns: + A new InternalFrame with the backing OrderedDataFrame persisted to a temporary table. + """ + return InternalFrame.create( + ordered_dataframe=cache_result(self.ordered_dataframe), + data_column_pandas_labels=self.data_column_pandas_labels, + data_column_snowflake_quoted_identifiers=self.data_column_snowflake_quoted_identifiers, + data_column_pandas_index_names=self.data_column_pandas_index_names, + index_column_pandas_labels=self.index_column_pandas_labels, + index_column_snowflake_quoted_identifiers=self.index_column_snowflake_quoted_identifiers, + ) + def append_column( self, pandas_label: Hashable, value: SnowparkColumn ) -> "InternalFrame": diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index 4572fa87f05..96d3885de14 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -1134,6 +1134,12 @@ def to_snowpark( index, index_label ) + def cache_result(self) -> "SnowflakeQueryCompiler": + """ + Returns a materialized view of this QueryCompiler. + """ + return SnowflakeQueryCompiler(self._modin_frame.persist_to_temporary_table()) + @property def columns(self) -> native_pd.Index: """ diff --git a/src/snowflake/snowpark/modin/plugin/extensions/dataframe_extensions.py b/src/snowflake/snowpark/modin/plugin/extensions/dataframe_extensions.py index 9feaad7f57a..a2d4710bf66 100644 --- a/src/snowflake/snowpark/modin/plugin/extensions/dataframe_extensions.py +++ b/src/snowflake/snowpark/modin/plugin/extensions/dataframe_extensions.py @@ -19,6 +19,7 @@ from snowflake.snowpark.modin.plugin._internal.telemetry import ( snowpark_pandas_telemetry_method_decorator, ) +from snowflake.snowpark.modin.plugin.extensions.utils import add_cache_result_docstring # Snowflake specific dataframe methods @@ -239,3 +240,17 @@ def to_pandas( Name: Animal, dtype: object """ return self._to_pandas(statement_params=statement_params, **kwargs) + + +@register_dataframe_accessor("cache_result") +@add_cache_result_docstring +@snowpark_pandas_telemetry_method_decorator +def cache_result(self, inplace: bool = False) -> Optional[pd.DataFrame]: + """ + Persists the current Snowpark pandas DataFrame to a temporary table that lasts the duration of the session. + """ + new_qc = self._query_compiler.cache_result() + if inplace: + self._update_inplace(new_qc) + else: + return pd.DataFrame(query_compiler=new_qc) diff --git a/src/snowflake/snowpark/modin/plugin/extensions/series_extensions.py b/src/snowflake/snowpark/modin/plugin/extensions/series_extensions.py index f2a8c2df196..f5e27a44e80 100644 --- a/src/snowflake/snowpark/modin/plugin/extensions/series_extensions.py +++ b/src/snowflake/snowpark/modin/plugin/extensions/series_extensions.py @@ -19,6 +19,7 @@ from snowflake.snowpark.modin.plugin._internal.telemetry import ( snowpark_pandas_telemetry_method_decorator, ) +from snowflake.snowpark.modin.plugin.extensions.utils import add_cache_result_docstring @register_series_accessor("to_snowflake") @@ -203,3 +204,17 @@ def to_pandas( Name: Animal, dtype: object """ return self._to_pandas(statement_params=statement_params, **kwargs) + + +@register_series_accessor("cache_result") +@add_cache_result_docstring +@snowpark_pandas_telemetry_method_decorator +def cache_result(self, inplace: bool = False) -> Optional[pd.Series]: + """ + Persists the Snowpark pandas Series to a temporary table for the duration of the session. + """ + new_qc = self._query_compiler.cache_result() + if inplace: + self._update_inplace(new_qc) + else: + return pd.Series(query_compiler=new_qc) diff --git a/src/snowflake/snowpark/modin/plugin/extensions/utils.py b/src/snowflake/snowpark/modin/plugin/extensions/utils.py new file mode 100644 index 00000000000..0139fe30587 --- /dev/null +++ b/src/snowflake/snowpark/modin/plugin/extensions/utils.py @@ -0,0 +1,74 @@ +# +# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. +# + +""" +File containing utilities for the extensions API. +""" +from snowflake.snowpark.modin.utils import Fn + +cache_result_docstring = """ +Persists the current Snowpark pandas {object_name} to a temporary table to improve the latency of subsequent operations. + +Args: + inplace: bool, default False + Whether to perform the materialization inplace. + +Returns: + Snowpark pandas {object_name} or None + Cached Snowpark pandas {object_name} or None if ``inplace=True``. + +Note: + - The temporary table produced by this method lasts for the duration of the session. + +Examples: +{examples} +""" + +cache_result_examples = """ +Let's make a {object_name} using a computationally expensive operation, e.g.: +>>> {object_var_name} = {object_creation_call} + +Due to Snowpark pandas lazy evaluation paradigm, every time this {object_name} is used, it will be recomputed - +causing every subsequent operation on this {object_name} to re-perform the 30 unions required to produce it. +This makes subsequent operations more expensive. The `cache_result` API can be used to persist the +{object_name} to a temporary table for the duration of the session - replacing the nested 30 unions with a single +read from a table. + +>>> new_{object_var_name} = {object_var_name}.cache_result() + +>>> import numpy as np + +>>> np.all((new_{object_var_name} == {object_var_name}).values) +True + +>>> {object_var_name}.reset_index(drop=True, inplace=True) # Slower + +>>> new_{object_var_name}.reset_index(drop=True, inplace=True) # Faster + +""" + + +def add_cache_result_docstring(func: Fn) -> Fn: + """ + Decorator to add docstring to cache_result method. + """ + # In this case, we are adding the docstring to Series.cache_result. + if "series" in func.__module__: + object_name = "Series" + examples_portion = cache_result_examples.format( + object_name=object_name, + object_var_name="series", + object_creation_call="pd.concat([pd.Series([i]) for i in range(30)])", + ) + else: + object_name = "DataFrame" + examples_portion = cache_result_examples.format( + object_name=object_name, + object_var_name="df", + object_creation_call="pd.concat([pd.DataFrame([range(i, i+5)]) for i in range(0, 150, 5)])", + ) + func.__doc__ = cache_result_docstring.format( + object_name=object_name, examples=examples_portion + ) + return func diff --git a/tests/integ/modin/frame/conftest.py b/tests/integ/modin/frame/conftest.py index 2c10177bbb9..c9b9a3b5825 100644 --- a/tests/integ/modin/frame/conftest.py +++ b/tests/integ/modin/frame/conftest.py @@ -3,6 +3,8 @@ # +from string import ascii_lowercase + import modin.pandas as pd import numpy as np import pandas as native_pd @@ -148,6 +150,15 @@ def time_index_native_df(): ) +@pytest.fixture(scope="function") +def date_index_string_column_data(): + kwargs = { + "index": date_columns_no_tz, + "columns": list(ascii_lowercase[: df_data.shape[1]]), + } + return df_data, kwargs + + @pytest.fixture(scope="function") def float_native_df() -> native_pd.DataFrame: """ diff --git a/tests/integ/modin/frame/test_cache_result.py b/tests/integ/modin/frame/test_cache_result.py new file mode 100644 index 00000000000..c78cefaa3a0 --- /dev/null +++ b/tests/integ/modin/frame/test_cache_result.py @@ -0,0 +1,242 @@ +# +# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. +# + + +import modin.pandas as pd +import numpy as np +import pandas as native_pd +import pytest +from pandas.testing import assert_frame_equal + +import snowflake.snowpark.modin.plugin # noqa: F401 +from tests.integ.modin.sql_counter import SqlCounter +from tests.integ.modin.utils import ( + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck, + create_test_dfs, +) + + +def assert_empty_snowpark_pandas_equals_to_pandas(snow_df, native_df): + native_snow_df = snow_df.to_pandas() + assert native_df.empty and native_snow_df.empty + # When columns or index are empty, we have an empty Index, but pandas has a RangeIndex with no elements. + if len(native_snow_df.columns) == 0: + assert len(native_df.columns) == 0 + else: + assert native_snow_df.columns.equals(native_df.columns) + if len(native_snow_df.index) == 0: + assert len(native_df.index) == 0 + else: + assert native_snow_df.index.equals(native_df.index) + + +@pytest.fixture(scope="function") +def simple_test_data(): + return { + "col0": ["foo", "bar", "baz", "foo", "bar", "baz", "foo", "bar", "baz"], + "col1": ["abc", "def", "ghi", "ghi", "abc", "def", "def", "ghi", "abc"], + "col2": list(range(9)), + } + + +def cache_and_return_df(snow_df, inplace): + """ + Helper method to cache and return a reference to cached DataFrame depending on inplace. + + Notes + ----- + If inplace=True, the returned df is a reference to the inputted df. + """ + if not inplace: + cached_snow_df = snow_df.cache_result() + else: + # If inplace=True, there is no return, so we set `cached_snow_df` equal to `snow_df`. + cached_snow_df = snow_df + cached_snow_df.cache_result(inplace=inplace) + return cached_snow_df + + +def perform_chained_operations(df, module): + """ + Helper method to simulate an expensive pipeline. + + This method is expensive because the concat generates unions, and the objects being unioned + may themselves contain unions, joins, or other expensive operations like pivots. + """ + df = df.reset_index(drop=True) + return module.concat([df] * 10) + + +@pytest.mark.parametrize( + "init_kwargs", + [ + {}, + {"columns": ["A", "B", "C"]}, + {"index": ["A", "B", "C"]}, + {"columns": ["A", "B", "C"], "index": [0, 1, 2]}, + ], + ids=["no_col_no_index", "only_col", "only_index", "col_and_index"], +) +@pytest.mark.parametrize("inplace", [True, False]) +def test_cache_result_empty_dataframe(init_kwargs, inplace): + snow_df, native_df = create_test_dfs(**init_kwargs) + snow_df_copy = snow_df.copy(deep=True) + with SqlCounter(query_count=1): + cached_snow_df = cache_and_return_df(snow_df, inplace) + with SqlCounter(query_count=2): + assert_frame_equal( + snow_df_copy.to_pandas(), cached_snow_df.to_pandas(), check_index_type=False + ) + with SqlCounter(query_count=1): + if native_df.empty: + assert_empty_snowpark_pandas_equals_to_pandas(cached_snow_df, native_df) + else: + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + cached_snow_df, native_df + ) + + +@pytest.mark.parametrize("inplace", [True, False]) +def test_cache_result_dataframe_complex_correctness( + date_index_string_column_data, + inplace, +): + df_data, kwargs = date_index_string_column_data + snow_df, native_df = create_test_dfs(df_data, **kwargs) + + snow_df = snow_df.resample("2H").mean() + snow_df_copy = snow_df.copy(deep=True) + with SqlCounter(query_count=1): + cached_snow_df = cache_and_return_df(snow_df, inplace) + with SqlCounter(query_count=2): + assert_frame_equal( + snow_df_copy.to_pandas(), cached_snow_df.to_pandas(), check_index_type=False + ) + native_df = native_df.resample("2H").mean() + + cached_snow_df = cached_snow_df.set_index("b", drop=False) + native_df = native_df.set_index("b", drop=False) + with SqlCounter(query_count=1): + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + cached_snow_df, native_df, check_freq=False + ) + + +@pytest.mark.parametrize("inplace", [True, False]) +class TestCacheResultReducesQueryCount: + def test_cache_result_simple(self, inplace): + snow_df = pd.concat([pd.DataFrame([range(i, i + 5)]) for i in range(0, 15, 5)]) + native_df = perform_chained_operations( + native_pd.DataFrame(np.arange(15).reshape((3, 5))), native_pd + ) + with SqlCounter(query_count=1, union_count=29): + snow_df = perform_chained_operations(snow_df, pd) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + snow_df, native_df + ) + + with SqlCounter(query_count=1, union_count=2): + snow_df = pd.concat( + [pd.DataFrame([range(i, i + 5)]) for i in range(0, 15, 5)] + ) + cached_snow_df = cache_and_return_df(snow_df, inplace) + + with SqlCounter(query_count=1, union_count=9): + cached_snow_df = perform_chained_operations(cached_snow_df, pd) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + cached_snow_df, native_df + ) + + def test_cache_result_post_pivot(self, inplace, simple_test_data): + pivot_kwargs = { + "index": "col1", + "columns": ["col0", "col1"], + "values": "col2", + "aggfunc": ["mean", "max"], + } + snow_df = pd.DataFrame(simple_test_data).pivot_table(**pivot_kwargs) + native_df = native_pd.DataFrame(simple_test_data) + native_df = perform_chained_operations( + native_df.pivot_table(**pivot_kwargs), native_pd + ) + with SqlCounter(query_count=1, join_count=10, union_count=9): + snow_df = perform_chained_operations(snow_df, pd) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + snow_df, native_df + ) + + with SqlCounter(query_count=1, join_count=1): + snow_df = pd.DataFrame(simple_test_data).pivot_table(**pivot_kwargs) + cached_snow_df = cache_and_return_df(snow_df, inplace) + + with SqlCounter(query_count=1, union_count=9): + cached_snow_df = perform_chained_operations(cached_snow_df, pd) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + cached_snow_df, native_df + ) + + def test_cache_result_post_apply(self, inplace, simple_test_data): + # In this test, the caching doesn't aid in the query counts since + # the implementation of apply(axis=1) itself contains intermediate + # result caching. + native_df = perform_chained_operations( + native_pd.DataFrame(simple_test_data).apply(lambda x: x + x, axis=1), + native_pd, + ) + with SqlCounter(query_count=6, union_count=9, udtf_count=1): + snow_df = pd.DataFrame(simple_test_data).apply(lambda x: x + x, axis=1) + repr(snow_df) + snow_df = perform_chained_operations(snow_df, pd) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + snow_df, native_df + ) + + with SqlCounter(query_count=5, udtf_count=1): + snow_df = pd.DataFrame(simple_test_data).apply(lambda x: x + x, axis=1) + cached_snow_df = cache_and_return_df(snow_df, inplace) + + with SqlCounter(query_count=2, union_count=9): + repr(cached_snow_df) + cached_snow_df = perform_chained_operations(cached_snow_df, pd) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + cached_snow_df, + native_df, + ) + + def test_cache_result_post_applymap(self, inplace, simple_test_data): + # The high query counts in this test case come from the setup and definition + # of the UDFs used. + native_df = perform_chained_operations( + native_pd.DataFrame(simple_test_data).applymap(lambda x: x + x), native_pd + ) + with SqlCounter( + query_count=11, + union_count=9, + udf_count=2, + high_count_expected=True, + high_count_reason="applymap requires additional queries to setup the UDF.", + ): + snow_df = pd.DataFrame(simple_test_data).applymap(lambda x: x + x) + repr(snow_df) + snow_df = perform_chained_operations(snow_df, pd) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + snow_df, native_df + ) + + with SqlCounter( + query_count=10, + high_count_expected=True, + high_count_reason="applymap requires additional queries to setup the UDF.", + ): + snow_df = pd.DataFrame(simple_test_data).applymap(lambda x: x + x) + cached_snow_df = cache_and_return_df(snow_df, inplace) + + with SqlCounter(query_count=1): + repr(cached_snow_df) + with SqlCounter(query_count=1, union_count=9, udf_count=0): + cached_snow_df = perform_chained_operations(cached_snow_df, pd) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + cached_snow_df, + native_df, + ) diff --git a/tests/integ/modin/series/conftest.py b/tests/integ/modin/series/conftest.py index cb4885a9fd1..44002bff174 100644 --- a/tests/integ/modin/series/conftest.py +++ b/tests/integ/modin/series/conftest.py @@ -111,10 +111,5 @@ def native_series_with_duplicate_boolean_index(): @pytest.fixture(scope="function") -def time_index_snowpark_pandas_series(): - return pd.Series(list(range(7)), index=date_columns_no_tz) - - -@pytest.fixture(scope="function") -def time_index_native_series(): - return native_pd.Series(list(range(7)), index=date_columns_no_tz) +def time_index_series_data(): + return list(range(7)), {"index": date_columns_no_tz} diff --git a/tests/integ/modin/series/test_add_prefix.py b/tests/integ/modin/series/test_add_prefix.py index 31465f28ccb..f9e5734647c 100644 --- a/tests/integ/modin/series/test_add_prefix.py +++ b/tests/integ/modin/series/test_add_prefix.py @@ -9,7 +9,7 @@ import snowflake.snowpark.modin.plugin # noqa: F401 from tests.integ.modin.sql_counter import sql_count_checker -from tests.integ.modin.utils import eval_snowpark_pandas_result +from tests.integ.modin.utils import create_test_series, eval_snowpark_pandas_result TEST_ADD_PREFIX_DATA = [ "prefix_", @@ -48,14 +48,14 @@ def test_series_add_prefix_multiindex(prefix, multiindex_native_int_series): @sql_count_checker(query_count=1) @pytest.mark.parametrize("prefix", TEST_ADD_PREFIX_DATA) -def test_series_add_prefix_time_column_df( - prefix, time_index_snowpark_pandas_series, time_index_native_series -): +def test_series_add_prefix_time_column_df(prefix, time_index_series_data): + series_data, kwargs = time_index_series_data + snow_series, native_series = create_test_series(series_data, **kwargs) # Native pandas time values are of the format `2023-01-01 00:00:00` while Snowflake is `2023-01-01 00:00:00.000`. # For easier comparison, add_suffix is called with suffix ".000" for the native pandas df. eval_snowpark_pandas_result( - time_index_snowpark_pandas_series, - time_index_native_series.add_suffix(".000"), + snow_series, + native_series.add_suffix(".000"), lambda ser: ser.add_prefix(prefix), ) diff --git a/tests/integ/modin/series/test_add_suffix.py b/tests/integ/modin/series/test_add_suffix.py index 805c5bc897e..16c5825caf9 100644 --- a/tests/integ/modin/series/test_add_suffix.py +++ b/tests/integ/modin/series/test_add_suffix.py @@ -9,7 +9,7 @@ import snowflake.snowpark.modin.plugin # noqa: F401 from tests.integ.modin.sql_counter import sql_count_checker -from tests.integ.modin.utils import eval_snowpark_pandas_result +from tests.integ.modin.utils import create_test_series, eval_snowpark_pandas_result TEST_ADD_SUFFIX_DATA = [ "_suffix", @@ -48,14 +48,15 @@ def test_add_suffix_multiindex(suffix, multiindex_native_int_series): @sql_count_checker(query_count=1) @pytest.mark.parametrize("suffix", TEST_ADD_SUFFIX_DATA) -def test_add_suffix_time_column_df( - suffix, time_index_snowpark_pandas_series, time_index_native_series -): +def test_add_suffix_time_column_df(suffix, time_index_series_data): + series_data, kwargs = time_index_series_data + snow_series, native_series = create_test_series(series_data, **kwargs) + # Native pandas time values are of the format `2023-01-01 00:00:00` while Snowflake is `2023-01-01 00:00:00.000`. # For easier comparison, add_suffix is called with suffix ".000" for the native pandas df. eval_snowpark_pandas_result( - time_index_snowpark_pandas_series, - time_index_native_series.add_suffix(".000"), + snow_series, + native_series.add_suffix(".000"), lambda df: df.add_suffix(suffix), ) diff --git a/tests/integ/modin/series/test_cache_result.py b/tests/integ/modin/series/test_cache_result.py new file mode 100644 index 00000000000..ac8e2413fd9 --- /dev/null +++ b/tests/integ/modin/series/test_cache_result.py @@ -0,0 +1,159 @@ +# +# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. +# + + +from itertools import chain + +import modin.pandas as pd +import numpy as np +import pandas as native_pd +import pytest +from pandas.testing import assert_series_equal + +import snowflake.snowpark.modin.plugin # noqa: F401 +from tests.integ.modin.sql_counter import SqlCounter +from tests.integ.modin.utils import ( + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck, + create_test_series, +) + + +def cache_and_return_series(snow_series, inplace): + """ + Helper method to cache and return a reference to cached Series depending on inplace. + + Notes + ----- + If inplace=True, the returned series is a reference to the inputted series. + """ + if not inplace: + cached_snow_series = snow_series.cache_result() + else: + # If inplace=True, there is no return, so we set `cached_snow_series` equal to `snow_series`. + cached_snow_series = snow_series + cached_snow_series.cache_result(inplace=inplace) + return cached_snow_series + + +def perform_chained_operations(series, module): + """ + Helper method to simulate an expensive pipeline. + + This method is expensive because the concat generates unions, and the objects being unioned + may themselves contain unions, joins, or other expensive operations like pivots. + """ + series = series.reset_index(drop=True) + return module.concat([series] * 10) + + +@pytest.fixture(scope="function") +def simple_test_data(): + int_data = list(range(9)) + str_data = ["foo", "bar", "baz", "foo", "bar", "baz", "foo", "bar", "baz"] + return list(chain.from_iterable(zip(int_data, str_data))) + + +@pytest.mark.parametrize( + "init_kwargs", [{}, {"index": ["A", "B", "C"]}], ids=["no_index", "index"] +) +@pytest.mark.parametrize("inplace", [True, False]) +def test_cache_result_empty_series(init_kwargs, inplace): + snow_series, native_series = create_test_series(**init_kwargs) + snow_series_copy = snow_series.copy(deep=True) + with SqlCounter(query_count=1): + cached_snow_series = cache_and_return_series(snow_series, inplace) + with SqlCounter(query_count=2): + assert_series_equal( + cached_snow_series.to_pandas(), snow_series_copy.to_pandas() + ) + with SqlCounter(query_count=1): + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + snow_series, native_series + ) + + +@pytest.mark.parametrize("inplace", [True, False]) +def test_cache_result_series_complex_correctness(time_index_series_data, inplace): + series_data, kwargs = time_index_series_data + snow_series, native_series = create_test_series(series_data, **kwargs) + snow_series = snow_series.resample("2H").mean() + snow_series_copy = snow_series.copy(deep=True) + with SqlCounter(query_count=1): + cached_snow_series = cache_and_return_series(snow_series, inplace) + with SqlCounter(query_count=2): + assert_series_equal( + cached_snow_series.to_pandas(), snow_series_copy.to_pandas() + ) + native_series = native_series.resample("2H").mean() + + with SqlCounter(query_count=1): + cached_snow_series = cached_snow_series.diff() + native_series = native_series.diff() + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + cached_snow_series, native_series, check_freq=False + ) + + +@pytest.mark.parametrize("inplace", [True, False]) +class TestCacheResultReducesQueryCount: + def test_cache_result_simple(self, inplace): + snow_series = pd.concat( + [pd.Series(list(range(i, i + 5))) for i in range(0, 50, 5)] + ) + native_series = perform_chained_operations( + native_pd.Series(np.arange(50)), native_pd + ) + # Fix for https://snowflakecomputing.atlassian.net/browse/SNOW-1442354 + # snow_series.reset_index names the returned series 0, so we must + # name the pandas Series as well so that they match. + native_series.name = 0 + with SqlCounter(query_count=1, union_count=99): + snow_series = perform_chained_operations(snow_series, pd) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + snow_series, native_series + ) + + with SqlCounter(query_count=1, union_count=9): + snow_series = pd.concat( + [pd.Series(list(range(i, i + 5))) for i in range(0, 50, 5)] + ) + cached_snow_series = cache_and_return_series(snow_series, inplace) + + with SqlCounter(query_count=1, union_count=9): + cached_snow_series = perform_chained_operations(cached_snow_series, pd) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + cached_snow_series, native_series + ) + + def test_cache_result_post_apply(self, inplace, simple_test_data): + # In this test, the caching doesn't aid in the query counts since + # the implementation of apply(axis=1) itself contains intermediate + # result caching. + native_series = perform_chained_operations( + native_pd.Series(simple_test_data).apply(lambda x: x + x), native_pd + ) + # Fix for https://snowflakecomputing.atlassian.net/browse/SNOW-1442354 + # snow_series.reset_index names the returned series 0, so we must + # name the pandas Series as well so that they match. + native_series.name = 0 + with SqlCounter(query_count=5, union_count=9): + snow_series = pd.Series(simple_test_data).apply(lambda x: x + x) + repr(snow_series) + snow_series = perform_chained_operations(snow_series, pd) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + snow_series, native_series + ) + + with SqlCounter(query_count=4): + snow_series = pd.Series(simple_test_data).apply(lambda x: x + x) + cached_snow_series = cache_and_return_series(snow_series, inplace) + + with SqlCounter(query_count=1): + repr(cached_snow_series) + with SqlCounter(query_count=1, union_count=9): + cached_snow_series = perform_chained_operations(cached_snow_series, pd) + assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( + cached_snow_series, + native_series, + )