SNOW-1801330: Map numpy functions in apply to builtin snowflake functions #2755

Merged: 2 commits, Dec 13, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -17,6 +17,8 @@
- Added support for `Series.str.pad`.
- Added support for applying Snowpark Python function `snowflake_cortex_sentiment`.

#### Improvements
- Improved performance of `DataFrame.map`, `Series.apply` and `Series.map` methods by mapping numpy functions to Snowpark functions where possible.

## 1.26.0 (2024-12-05)

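A minimal sketch of the user-facing effect of the changelog entry above, assuming an active Snowpark session and Snowpark pandas installed (the DataFrame contents are illustrative):

```python
import numpy as np
import modin.pandas as pd
import snowflake.snowpark.modin.plugin  # noqa: F401  # registers the Snowflake backend

df = pd.DataFrame({"a": [0.0, 1.0, 2.0]})

# Before this change, np.sin fell back to a Python UDF; with the new mapping it
# should be rewritten to the builtin SIN function and run as a single SQL query.
result = df.applymap(np.sin)
print(result)
```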
85 changes: 31 additions & 54 deletions src/snowflake/snowpark/modin/plugin/_internal/apply_utils.py
@@ -26,32 +26,7 @@
pandas_lit,
is_compatible_snowpark_types,
)
from snowflake.snowpark.functions import (
builtin,
col,
dense_rank,
ln,
log,
_log2,
_log10,
sin,
snowflake_cortex_sentiment,
snowflake_cortex_summarize,
udf,
to_variant,
when,
udtf,
exp,
cos,
tan,
sinh,
cosh,
tanh,
ceil,
floor,
trunc,
sqrt,
)
from snowflake.snowpark import functions as sp_func
from snowflake.snowpark.modin.plugin._internal.frame import InternalFrame
from snowflake.snowpark.modin.plugin._internal.ordered_dataframe import (
OrderedDataFrame,
@@ -96,23 +71,23 @@
cloudpickle.register_pickle_by_value(sys.modules[__name__])

SUPPORTED_SNOWPARK_PYTHON_FUNCTIONS_IN_APPLY = {
exp,
ln,
log,
_log2,
_log10,
sin,
cos,
tan,
sinh,
cosh,
tanh,
ceil,
floor,
trunc,
sqrt,
snowflake_cortex_sentiment,
snowflake_cortex_summarize,
sp_func.exp,
sp_func.ln,
sp_func.log,
sp_func._log2,
sp_func._log10,
sp_func.sin,
sp_func.cos,
sp_func.tan,
sp_func.sinh,
sp_func.cosh,
sp_func.tanh,
sp_func.ceil,
sp_func.floor,
sp_func.trunc,
sp_func.sqrt,
sp_func.snowflake_cortex_summarize,
sp_func.snowflake_cortex_sentiment,
}


@@ -287,7 +262,7 @@ def end_partition(self, df): # type: ignore[no-untyped-def] # pragma: no cover
ApplyFunc.end_partition._sf_vectorized_input = native_pd.DataFrame # type: ignore[attr-defined]

packages = list(session.get_packages().values()) + udf_packages
func_udtf = udtf(
func_udtf = sp_func.udtf(
ApplyFunc,
output_schema=PandasDataFrameType(
[LongType(), StringType(), VariantType()],
@@ -709,7 +684,7 @@ def end_partition(self, df: native_pd.DataFrame): # type: ignore[no-untyped-def]
excluded=existing_identifiers,
wrap_double_underscore=False,
)
return udtf(
return sp_func.udtf(
ApplyFunc,
output_schema=PandasDataFrameType(
[StringType(), IntegerType(), VariantType(), IntegerType(), IntegerType()],
@@ -783,7 +758,7 @@ def apply_func(x): # type: ignore[no-untyped-def] # pragma: no cover
def apply_func(x): # type: ignore[no-untyped-def] # pragma: no cover
return x.apply(func, args=args, **kwargs)

func_udf = udf(
func_udf = sp_func.udf(
apply_func,
return_type=PandasSeriesType(return_type),
input_types=[PandasSeriesType(input_type)],
@@ -1187,12 +1162,12 @@ def groupby_apply_pivot_result_to_final_ordered_dataframe(
# in GROUP_KEY_APPEARANCE_ORDER) and assign the
# label i to all rows that came from func(group_i).
[
col(original_row_position_snowflake_quoted_identifier).as_(
sp_func.col(original_row_position_snowflake_quoted_identifier).as_(
new_index_identifier
)
if sort_method is GroupbyApplySortMethod.ORIGINAL_ROW_ORDER
else (
dense_rank().over(
sp_func.dense_rank().over(
Window.order_by(
*(
SnowparkColumn(col).asc_nulls_last()
@@ -1213,9 +1188,11 @@
),
*[
(
col(old_quoted_identifier).as_(quoted_identifier)
sp_func.col(old_quoted_identifier).as_(quoted_identifier)
if return_variant
else col(old_quoted_identifier).cast(return_type).as_(quoted_identifier)
else sp_func.col(old_quoted_identifier)
.cast(return_type)
.as_(quoted_identifier)
)
for old_quoted_identifier, quoted_identifier in zip(
data_column_snowflake_quoted_identifiers
@@ -1400,7 +1377,7 @@ def groupby_apply_sort_method(
# Need to wrap column name in IDENTIFIER, or else bool agg function
# will treat the name as a string literal
is_transform: bool = not ordered_dataframe_before_sort.agg(
builtin("boolor_agg")(
sp_func.builtin("boolor_agg")(
SnowparkColumn(original_row_position_quoted_identifier) == -1
).as_("is_transform")
).collect()[0][0]
@@ -1475,7 +1452,7 @@ def make_condition(key: Any) -> SnowparkColumn:
# Cast one of the values in the comparison to variant so that we
# can compare types that are otherwise not comparable in
# Snowflake, like timestamp and int.
return col.equal_null(to_variant(pandas_lit(key)))
return col.equal_null(sp_func.to_variant(pandas_lit(key)))

# If any of the values we are mapping to have types that are
# incompatible with the current column's type, we have to cast the new
@@ -1498,7 +1475,7 @@ def make_condition(key: Any) -> SnowparkColumn:
def make_result(value: Any) -> SnowparkColumn:
value_expression = pandas_lit(value)
return (
to_variant(value_expression)
sp_func.to_variant(value_expression)
if should_cast_result_to_variant
else value_expression
)
Expand All @@ -1510,7 +1487,7 @@ def make_result(value: Any) -> SnowparkColumn:
make_condition(key_and_value[0]), make_result(key_and_value[1])
),
itertools.islice(map_items, 1, None),
when(make_condition(first_key), make_result(first_value)),
sp_func.when(make_condition(first_key), make_result(first_value)),
)
if isinstance(mapping, defaultdict):
case_expression = case_expression.otherwise(
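The `SUPPORTED_SNOWPARK_PYTHON_FUNCTIONS_IN_APPLY` set above (now referenced through the `sp_func` namespace) lists the Snowpark Python functions that `apply`/`map` execute directly as builtin SQL functions instead of packaging a UDF. A short sketch of that usage, assuming an active Snowpark session:

```python
import modin.pandas as pd
import snowflake.snowpark.modin.plugin  # noqa: F401
from snowflake.snowpark.functions import sqrt  # listed in SUPPORTED_SNOWPARK_PYTHON_FUNCTIONS_IN_APPLY

s = pd.Series([1.0, 4.0, 9.0])
# Passing the Snowpark function directly applies the builtin SQRT column-wise, with no UDF.
print(s.apply(sqrt))  # expected values: 1.0, 2.0, 3.0
```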
src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py
@@ -372,6 +372,9 @@
from snowflake.snowpark.modin.plugin.utils.error_message import ErrorMessage
from snowflake.snowpark.modin.plugin.utils.warning_message import WarningMessage
from snowflake.snowpark.modin.utils import MODIN_UNNAMED_SERIES_LABEL
from snowflake.snowpark.modin.plugin.utils.numpy_to_pandas import (
NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION,
)
from snowflake.snowpark.session import Session
from snowflake.snowpark.types import (
ArrayType,
@@ -8755,6 +8758,15 @@ def applymap(
f"Snowpark pandas applymap API doesn't yet support Snowpark Python function `{func.__name__}` with args = '{args}'."
)
return self._apply_snowpark_python_function_to_columns(func, kwargs)

# Check if the function is a known numpy function that can be translated to a Snowflake function.
sf_func = NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION.get(func)
if sf_func is not None:
# TODO SNOW-1739034: remove pragma no cover when apply tests are enabled in CI
return self._apply_snowpark_python_function_to_columns(
sf_func, kwargs
) # pragma: no cover

# Currently, NULL values are always passed into the udtf even if strict=True,
# which is a bug on the server side SNOW-880105.
# The fix will not land soon, so we are going to raise not implemented error for now.
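The new branch in `applymap` looks the callable up in `NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION` (defined in `numpy_to_pandas.py` below) and, on a hit, reuses the existing `_apply_snowpark_python_function_to_columns` path. A simplified, standalone sketch of that dispatch (the UDTF fallback and the helper itself are elided):

```python
import numpy as np
from snowflake.snowpark import functions as sp_func

# Trimmed-down copy of the mapping, for illustration only.
NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION = {np.sqrt: sp_func.sqrt, np.log: sp_func.ln}

def translate_applymap_func(func):
    """Return the Snowflake column function to apply, or None to fall back to a UDF/UDTF."""
    return NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION.get(func)

assert translate_applymap_func(np.sqrt) is sp_func.sqrt
assert translate_applymap_func(np.median) is None  # unmapped -> UDF/UDTF fallback
```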
46 changes: 46 additions & 0 deletions src/snowflake/snowpark/modin/plugin/utils/numpy_to_pandas.py
@@ -6,6 +6,7 @@
import modin.pandas as pd
from modin.pandas.base import BasePandasDataset
from modin.pandas.utils import is_scalar
import numpy as np

from snowflake.snowpark import functions as sp_func
from snowflake.snowpark.modin.plugin.utils.warning_message import WarningMessage
@@ -288,3 +289,48 @@ def map_to_bools(inputs: Any) -> Any:
sp_func.trunc
), # df.truncate not supported in snowpandas yet
}


NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION = {
# Math operations
np.absolute: sp_func.abs,
np.sign: sp_func.sign,
np.negative: sp_func.negate,
np.positive: lambda col: col,
np.sqrt: sp_func.sqrt,
np.square: lambda col: sp_func.builtin("square")(col),
np.cbrt: lambda col: sp_func.builtin("cbrt")(col),
np.reciprocal: lambda col: 1 / col,
np.exp: sp_func.exp,
np.exp2: lambda col: sp_func.pow(2, col),
np.expm1: lambda col: sp_func.exp(col) - 1,
np.log: sp_func.ln,
np.log2: sp_func._log2,
np.log10: sp_func._log10,
np.log1p: lambda col: sp_func.ln(col + 1),
# Aggregate functions translate to identity functions when applied element-wise
np.sum: lambda col: col,
np.min: lambda col: col,
np.max: lambda col: col,
# Trigonometric functions
np.sin: sp_func.sin,
np.cos: sp_func.cos,
np.tan: sp_func.tan,
np.sinh: sp_func.sinh,
np.cosh: sp_func.cosh,
np.tanh: sp_func.tanh,
np.arcsin: lambda col: sp_func.builtin("asin")(col),
np.arccos: lambda col: sp_func.builtin("acos")(col),
np.arctan: lambda col: sp_func.builtin("atan")(col),
np.arctan2: lambda col: sp_func.builtin("atan2")(col),
np.arcsinh: lambda col: sp_func.builtin("asinh")(col),
np.arccosh: lambda col: sp_func.builtin("acosh")(col),
np.arctanh: lambda col: sp_func.builtin("atanh")(col),
np.degrees: lambda col: sp_func.builtin("degrees")(col),
np.radians: lambda col: sp_func.builtin("radians")(col),
# Floating functions
np.ceil: sp_func.ceil,
np.floor: sp_func.floor,
np.trunc: sp_func.trunc,
np.isnan: sp_func.is_null,
}
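Each value in `NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION` takes a Snowpark `Column` and returns a `Column` expression; numpy functions without a one-to-one builtin are composed from existing ones (for example `np.exp2` becomes `POW(2, x)` and `np.log1p` becomes `LN(x + 1)`), while element-wise application of `np.sum`/`np.min`/`np.max` reduces to the identity. A small sketch of how one entry builds an expression lazily (the column name is illustrative):

```python
from snowflake.snowpark import functions as sp_func

def exp2(col):
    # Mirrors the np.exp2 entry above: 2 ** x expressed through the POW builtin.
    return sp_func.pow(2, col)

expr = exp2(sp_func.col("X"))  # builds a POW(2, "X") column expression; nothing runs yet
print(expr)  # evaluation happens only when the containing frame is executed
```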
2 changes: 1 addition & 1 deletion tests/integ/modin/frame/test_applymap.py
@@ -107,7 +107,7 @@ def test_applymap_numpy(func):
native_df = native_pd.DataFrame(data)
snow_df = pd.DataFrame(data)

with SqlCounter(query_count=7, udf_count=1):
with SqlCounter(query_count=1):
eval_snowpark_pandas_result(snow_df, native_df, lambda x: x.applymap(func))


4 changes: 2 additions & 2 deletions tests/integ/modin/series/test_apply_and_map.py
@@ -169,7 +169,7 @@ def create_func_with_return_type_hint(func: Callable, return_type: str) -> Callable:
return d["f"]


TEST_NUMPY_FUNCS = [np.min, np.sqrt, np.tan, np.sum, np.median]
TEST_NUMPY_FUNCS = [np.min, np.sqrt, np.tan, np.sum, np.square, np.log1p, np.exp2]


@pytest.mark.parametrize("method", ["apply", "map"])
@@ -412,7 +412,7 @@ def test_builtin_function(self, method, func):
)

@pytest.mark.parametrize("func", TEST_NUMPY_FUNCS)
@sql_count_checker(query_count=4, udf_count=1)
@sql_count_checker(query_count=1)
def test_apply_and_map_numpy(self, method, func):
data = [1.0, 2.0, 3.0]
native_series = native_pd.Series(data)
Expand Down