[SNOW-1370566]: Add support for Snowpark pandas cache_result API (#…
sfc-gh-rdurrani authored May 23, 2024
1 parent 421723d commit ee931fe
Showing 16 changed files with 633 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@
#### New Features

- Added support for `if_not_exists` parameter during udf and sproc registration.
- Added `DataFrame.cache_result` and `Series.cache_result` methods, which let users persist DataFrames and Series to a temporary table lasting the duration of the session, improving the latency of subsequent operations.

#### Improvements

19 changes: 10 additions & 9 deletions docs/source/modin/dataframe.rst
@@ -29,6 +29,16 @@ DataFrame
DataFrame.shape
DataFrame.empty

.. rubric:: Snowflake Specific

.. autosummary::
:toctree: pandas_api/

DataFrame.to_pandas
DataFrame.to_snowflake
DataFrame.to_snowpark
DataFrame.cache_result

.. rubric:: Conversion

.. autosummary::
@@ -197,12 +207,3 @@ DataFrame
DataFrame.first_valid_index
DataFrame.last_valid_index
DataFrame.resample

.. rubric:: Serialization / IO / conversion

.. autosummary::
:toctree: pandas_api/

DataFrame.to_pandas
DataFrame.to_snowflake
DataFrame.to_snowpark
3 changes: 2 additions & 1 deletion docs/source/modin/index.rst
@@ -18,4 +18,5 @@ For your convenience, here is all the :doc:`Supported APIs <supported/index>`
groupby
resampling
numpy
All supported APIs <supported/index>
performance
58 changes: 58 additions & 0 deletions docs/source/modin/performance.rst
@@ -0,0 +1,58 @@
Performance Recommendations
===========================

This page contains recommendations to help improve performance when using the Snowpark pandas API.

Caching Intermediate Results
----------------------------
Snowpark pandas uses a lazy paradigm - when operations are called on a Snowpark pandas object,
a lazy operator graph is built up and executed only when an output operation is called (e.g. printing
the data, or persisting it to a table in Snowflake). This mirrors the Snowpark DataFrame paradigm
and enables larger queries to be optimized using Snowflake's SQL query optimizer. Certain workloads,
however, can generate large operator graphs that include repeated, computationally expensive subgraphs.
Take the following code snippet as an example:

.. code-block:: python

    import modin.pandas as pd
    import numpy as np
    import snowflake.snowpark.modin.plugin
    from snowflake.snowpark import Session

    # Session.builder.create() will create a default Snowflake connection.
    Session.builder.create()

    df = pd.concat([pd.DataFrame([range(i, i+5)]) for i in range(0, 150, 5)])
    print(df)
    df = df.reset_index(drop=True)
    print(df)

The above code snippet creates a 30x5 DataFrame by concatenating 30 smaller 1x5 DataFrames,
prints it, resets its index, and prints it again. The concatenation step is expensive and is
lazily recomputed every time the DataFrame is materialized - once per print. Instead, we recommend
using Snowpark pandas' ``cache_result`` API to materialize expensive computations that are reused,
decreasing the latency of longer pipelines:

.. code-block:: python

    import modin.pandas as pd
    import numpy as np
    import snowflake.snowpark.modin.plugin
    from snowflake.snowpark import Session

    # Session.builder.create() will create a default Snowflake connection.
    Session.builder.create()

    df = pd.concat([pd.DataFrame([range(i, i+5)]) for i in range(0, 150, 5)])
    df = df.cache_result(inplace=False)
    print(df)
    df = df.reset_index(drop=True)
    print(df)

Consider using the ``cache_result`` API whenever a DataFrame or Series that is expensive to compute is reused heavily.

Known Limitations
^^^^^^^^^^^^^^^^^
Using the ``cache_result`` API after an ``apply``, an ``applymap``, or a ``groupby.apply`` is unlikely to yield performance savings.
``apply(func, axis=1)`` when ``func`` has no return type annotation, and ``groupby.apply``, are implemented internally via UDTFs, which
already cache their intermediate results as part of their implementation. ``apply(func, axis=1)`` when ``func`` has a return type
annotation, and ``applymap``, internally use UDFs - any overhead observed when using these APIs is likely due to the set-up and
definition of the UDF, and is unlikely to be alleviated via the ``cache_result`` API.
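
For illustration, here is a sketch of a case where ``cache_result`` adds little (it reuses the ``df`` from the snippets above; ``row_sum`` is a hypothetical user function):

.. code-block:: python

    # ``row_sum`` has a return type annotation, so apply(func, axis=1) runs as a UDF.
    def row_sum(row) -> int:
        return int(row.sum())

    result = df.apply(row_sum, axis=1)
    # The dominant cost above is UDF set-up and execution, not recomputation of a
    # lazy operator graph, so caching the result is unlikely to help here.
    result = result.cache_result(inplace=False)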
10 changes: 8 additions & 2 deletions docs/source/modin/series.rst
@@ -33,6 +33,14 @@ Series
Series.values


.. rubric:: Snowflake Specific

.. autosummary::
:toctree: pandas_api/

Series.to_snowflake
Series.to_snowpark
Series.cache_result

.. rubric:: Conversion

@@ -46,8 +54,6 @@ Series
Series.to_list
Series.to_numpy
Series.to_pandas
Series.to_snowflake
Series.to_snowpark
Series.__array__


17 changes: 17 additions & 0 deletions src/snowflake/snowpark/modin/plugin/_internal/frame.py
@@ -27,6 +27,7 @@
ROW_POSITION_COLUMN_LABEL,
append_columns,
assert_duplicate_free,
cache_result,
count_rows,
extract_pandas_label_from_snowflake_quoted_identifier,
fill_missing_levels_for_pandas_label,
@@ -691,6 +692,22 @@ def ensure_row_count_column(self) -> "InternalFrame":
index_column_snowflake_quoted_identifiers=self.index_column_snowflake_quoted_identifiers,
)

def persist_to_temporary_table(self) -> "InternalFrame":
"""
Persists the OrderedDataFrame backing this InternalFrame to a temporary table for the duration of the session.
Returns:
A new InternalFrame with the backing OrderedDataFrame persisted to a temporary table.
"""
return InternalFrame.create(
ordered_dataframe=cache_result(self.ordered_dataframe),
data_column_pandas_labels=self.data_column_pandas_labels,
data_column_snowflake_quoted_identifiers=self.data_column_snowflake_quoted_identifiers,
data_column_pandas_index_names=self.data_column_pandas_index_names,
index_column_pandas_labels=self.index_column_pandas_labels,
index_column_snowflake_quoted_identifiers=self.index_column_snowflake_quoted_identifiers,
)

def append_column(
self, pandas_label: Hashable, value: SnowparkColumn
) -> "InternalFrame":
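
For context, the internal ``cache_result`` helper imported above parallels the public Snowpark ``DataFrame.cache_result`` API. A standalone sketch of that underlying behavior (not the internal implementation):

# Assumes default Snowflake connection parameters are configured.
from snowflake.snowpark import Session

session = Session.builder.create()
snowpark_df = session.sql("select seq4() as n from table(generator(rowcount => 10))")
# cache_result materializes the current result into a session-scoped temporary
# table and returns a new DataFrame that reads from that table.
cached = snowpark_df.cache_result()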
@@ -1134,6 +1134,12 @@ def to_snowpark(
index, index_label
)

def cache_result(self) -> "SnowflakeQueryCompiler":
"""
Returns a new SnowflakeQueryCompiler backed by this compiler's result materialized to a temporary table.
"""
return SnowflakeQueryCompiler(self._modin_frame.persist_to_temporary_table())

@property
def columns(self) -> native_pd.Index:
"""
@@ -19,6 +19,7 @@
from snowflake.snowpark.modin.plugin._internal.telemetry import (
snowpark_pandas_telemetry_method_decorator,
)
from snowflake.snowpark.modin.plugin.extensions.utils import add_cache_result_docstring


# Snowflake specific dataframe methods
@@ -239,3 +240,17 @@ def to_pandas(
Name: Animal, dtype: object
"""
return self._to_pandas(statement_params=statement_params, **kwargs)


@register_dataframe_accessor("cache_result")
@add_cache_result_docstring
@snowpark_pandas_telemetry_method_decorator
def cache_result(self, inplace: bool = False) -> Optional[pd.DataFrame]:
"""
Persists the current Snowpark pandas DataFrame to a temporary table that lasts the duration of the session.
"""
new_qc = self._query_compiler.cache_result()
if inplace:
self._update_inplace(new_qc)
else:
return pd.DataFrame(query_compiler=new_qc)
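
A minimal usage sketch for the accessor registered above (assumes an active Snowpark session):

import modin.pandas as pd
import snowflake.snowpark.modin.plugin  # registers the Snowpark pandas extensions

df = pd.DataFrame({"a": [1, 2, 3]})
cached = df.cache_result()     # default inplace=False: returns a new, cached DataFrame
df.cache_result(inplace=True)  # materializes in place and returns None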
@@ -19,6 +19,7 @@
from snowflake.snowpark.modin.plugin._internal.telemetry import (
snowpark_pandas_telemetry_method_decorator,
)
from snowflake.snowpark.modin.plugin.extensions.utils import add_cache_result_docstring


@register_series_accessor("to_snowflake")
@@ -203,3 +204,17 @@ def to_pandas(
Name: Animal, dtype: object
"""
return self._to_pandas(statement_params=statement_params, **kwargs)


@register_series_accessor("cache_result")
@add_cache_result_docstring
@snowpark_pandas_telemetry_method_decorator
def cache_result(self, inplace: bool = False) -> Optional[pd.Series]:
"""
Persists the Snowpark pandas Series to a temporary table for the duration of the session.
"""
new_qc = self._query_compiler.cache_result()
if inplace:
self._update_inplace(new_qc)
else:
return pd.Series(query_compiler=new_qc)
74 changes: 74 additions & 0 deletions src/snowflake/snowpark/modin/plugin/extensions/utils.py
@@ -0,0 +1,74 @@
#
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
#

"""
File containing utilities for the extensions API.
"""
from snowflake.snowpark.modin.utils import Fn

cache_result_docstring = """
Persists the current Snowpark pandas {object_name} to a temporary table to improve the latency of subsequent operations.

Args:
    inplace: bool, default False
        Whether to perform the materialization inplace.

Returns:
    Snowpark pandas {object_name} or None
        Cached Snowpark pandas {object_name} or None if ``inplace=True``.

Note:
    - The temporary table produced by this method lasts for the duration of the session.

Examples:
{examples}
"""

cache_result_examples = """
Let's make a {object_name} using a computationally expensive operation, e.g.:

>>> {object_var_name} = {object_creation_call}

Due to Snowpark pandas' lazy evaluation paradigm, every time this {object_name} is used, it will be recomputed -
causing every subsequent operation on this {object_name} to re-perform the 30 unions required to produce it.
This makes subsequent operations more expensive. The `cache_result` API can be used to persist the
{object_name} to a temporary table for the duration of the session - replacing the nested 30 unions with a single
read from a table.

>>> new_{object_var_name} = {object_var_name}.cache_result()
>>> import numpy as np
>>> np.all((new_{object_var_name} == {object_var_name}).values)
True
>>> {object_var_name}.reset_index(drop=True, inplace=True)  # Slower
>>> new_{object_var_name}.reset_index(drop=True, inplace=True)  # Faster
"""


def add_cache_result_docstring(func: Fn) -> Fn:
"""
Decorator to add docstring to cache_result method.
"""
# In this case, we are adding the docstring to Series.cache_result.
if "series" in func.__module__:
object_name = "Series"
examples_portion = cache_result_examples.format(
object_name=object_name,
object_var_name="series",
object_creation_call="pd.concat([pd.Series([i]) for i in range(30)])",
)
else:
object_name = "DataFrame"
examples_portion = cache_result_examples.format(
object_name=object_name,
object_var_name="df",
object_creation_call="pd.concat([pd.DataFrame([range(i, i+5)]) for i in range(0, 150, 5)])",
)
func.__doc__ = cache_result_docstring.format(
object_name=object_name, examples=examples_portion
)
return func
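
A quick sketch of how the decorator dispatches (the module name below is hypothetical; only the substring "series" in ``__module__`` matters):

def my_cache_result(self, inplace=False):
    ...

# Simulate a function defined in a Series extensions module.
my_cache_result.__module__ = "my_plugin.series_extensions"
decorated = add_cache_result_docstring(my_cache_result)
assert "Series" in decorated.__doc__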
11 changes: 11 additions & 0 deletions tests/integ/modin/frame/conftest.py
@@ -3,6 +3,8 @@
#


from string import ascii_lowercase

import modin.pandas as pd
import numpy as np
import pandas as native_pd
@@ -148,6 +150,15 @@ def time_index_native_df():
)


@pytest.fixture(scope="function")
def date_index_string_column_data():
kwargs = {
"index": date_columns_no_tz,
"columns": list(ascii_lowercase[: df_data.shape[1]]),
}
return df_data, kwargs


@pytest.fixture(scope="function")
def float_native_df() -> native_pd.DataFrame:
"""