
SNOW-1801330: Map numpy functions in apply to builtin snowflake functions
sfc-gh-nkumar committed Dec 13, 2024
1 parent 71843f3 commit 346bcd4
Showing 5 changed files with 43 additions and 57 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -17,6 +17,8 @@
- Added support for `Series.str.pad`.
- Added support for applying Snowpark Python function `snowflake_cortex_sentiment`.

#### Improvements
- Improved performance of `DataFrame.map`, `Series.apply` and `Series.map` methods by mapping numpy functions to Snowpark functions where possible.

## 1.26.0 (2024-12-05)

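As a usage sketch of the changelog entry above (assuming the standard Snowpark pandas setup with an active session; which specific numpy callables are mapped is an assumption here), a mapped numpy function no longer builds a UDF:

import modin.pandas as pd
import numpy as np
import snowflake.snowpark.modin.plugin  # noqa: F401  # registers the Snowpark pandas backend

s = pd.Series([1.0, 2.0, 3.0])
s.apply(np.sqrt)          # rewritten to the SQRT builtin: one query, no UDF
s.map(np.tan)             # same fast path via the numpy-to-Snowflake mapping
s.apply(lambda x: x + 1)  # unmapped callables still fall back to a UDF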
85 changes: 31 additions & 54 deletions src/snowflake/snowpark/modin/plugin/_internal/apply_utils.py
@@ -26,32 +26,7 @@
pandas_lit,
is_compatible_snowpark_types,
)
from snowflake.snowpark.functions import (
builtin,
col,
dense_rank,
ln,
log,
_log2,
_log10,
sin,
snowflake_cortex_sentiment,
snowflake_cortex_summarize,
udf,
to_variant,
when,
udtf,
exp,
cos,
tan,
sinh,
cosh,
tanh,
ceil,
floor,
trunc,
sqrt,
)
from snowflake.snowpark import functions as sp_func
from snowflake.snowpark.modin.plugin._internal.frame import InternalFrame
from snowflake.snowpark.modin.plugin._internal.ordered_dataframe import (
OrderedDataFrame,
@@ -96,23 +71,23 @@
cloudpickle.register_pickle_by_value(sys.modules[__name__])

SUPPORTED_SNOWPARK_PYTHON_FUNCTIONS_IN_APPLY = {
exp,
ln,
log,
_log2,
_log10,
sin,
cos,
tan,
sinh,
cosh,
tanh,
ceil,
floor,
trunc,
sqrt,
snowflake_cortex_sentiment,
snowflake_cortex_summarize,
sp_func.exp,
sp_func.ln,
sp_func.log,
sp_func._log2,
sp_func._log10,
sp_func.sin,
sp_func.cos,
sp_func.tan,
sp_func.sinh,
sp_func.cosh,
sp_func.tanh,
sp_func.ceil,
sp_func.floor,
sp_func.trunc,
sp_func.sqrt,
sp_func.snowflake_cortex_summarize,
sp_func.snowflake_cortex_sentiment,
}


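The NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION dictionary referenced later in this diff is defined in this file but falls outside the visible hunks. A plausible sketch of its shape; the entries below are illustrative assumptions, not the committed contents:

import numpy as np

from snowflake.snowpark import functions as sp_func

# Illustrative only: keys are numpy callables, values are Snowpark functions.
NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION = {
    np.exp: sp_func.exp,
    np.log: sp_func.ln,  # numpy's natural log maps to Snowflake's LN
    np.log2: sp_func._log2,
    np.log10: sp_func._log10,
    np.sin: sp_func.sin,
    np.cos: sp_func.cos,
    np.tan: sp_func.tan,
    np.sqrt: sp_func.sqrt,
    np.floor: sp_func.floor,
    np.ceil: sp_func.ceil,
    np.trunc: sp_func.trunc,
}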
@@ -287,7 +262,7 @@ def end_partition(self, df): # type: ignore[no-untyped-def] # pragma: no cover
ApplyFunc.end_partition._sf_vectorized_input = native_pd.DataFrame # type: ignore[attr-defined]

packages = list(session.get_packages().values()) + udf_packages
func_udtf = udtf(
func_udtf = sp_func.udtf(
ApplyFunc,
output_schema=PandasDataFrameType(
[LongType(), StringType(), VariantType()],
@@ -709,7 +684,7 @@ def end_partition(self, df: native_pd.DataFrame): # type: ignore[no-untyped-def]
excluded=existing_identifiers,
wrap_double_underscore=False,
)
return udtf(
return sp_func.udtf(
ApplyFunc,
output_schema=PandasDataFrameType(
[StringType(), IntegerType(), VariantType(), IntegerType(), IntegerType()],
@@ -783,7 +758,7 @@ def apply_func(x): # type: ignore[no-untyped-def] # pragma: no cover
def apply_func(x): # type: ignore[no-untyped-def] # pragma: no cover
return x.apply(func, args=args, **kwargs)

func_udf = udf(
func_udf = sp_func.udf(
apply_func,
return_type=PandasSeriesType(return_type),
input_types=[PandasSeriesType(input_type)],
@@ -1187,12 +1162,12 @@ def groupby_apply_pivot_result_to_final_ordered_dataframe(
# in GROUP_KEY_APPEARANCE_ORDER) and assign the
# label i to all rows that came from func(group_i).
[
col(original_row_position_snowflake_quoted_identifier).as_(
sp_func.col(original_row_position_snowflake_quoted_identifier).as_(
new_index_identifier
)
if sort_method is GroupbyApplySortMethod.ORIGINAL_ROW_ORDER
else (
dense_rank().over(
sp_func.dense_rank().over(
Window.order_by(
*(
SnowparkColumn(col).asc_nulls_last()
@@ -1213,9 +1188,11 @@
),
*[
(
col(old_quoted_identifier).as_(quoted_identifier)
sp_func.col(old_quoted_identifier).as_(quoted_identifier)
if return_variant
else col(old_quoted_identifier).cast(return_type).as_(quoted_identifier)
else sp_func.col(old_quoted_identifier)
.cast(return_type)
.as_(quoted_identifier)
)
for old_quoted_identifier, quoted_identifier in zip(
data_column_snowflake_quoted_identifiers
@@ -1400,7 +1377,7 @@ def groupby_apply_sort_method(
# Need to wrap column name in IDENTIFIER, or else bool agg function
# will treat the name as a string literal
is_transform: bool = not ordered_dataframe_before_sort.agg(
builtin("boolor_agg")(
sp_func.builtin("boolor_agg")(
SnowparkColumn(original_row_position_quoted_identifier) == -1
).as_("is_transform")
).collect()[0][0]
@@ -1475,7 +1452,7 @@ def make_condition(key: Any) -> SnowparkColumn:
# Cast one of the values in the comparison to variant so that
# we can compare types that are otherwise not comparable in
# Snowflake, like timestamp and int.
return col.equal_null(to_variant(pandas_lit(key)))
return col.equal_null(sp_func.to_variant(pandas_lit(key)))

# If any of the values we are mapping to have types that are
# incompatible with the current column's type, we have to cast the new
@@ -1498,7 +1475,7 @@ def make_condition(key: Any) -> SnowparkColumn:
def make_result(value: Any) -> SnowparkColumn:
value_expression = pandas_lit(value)
return (
to_variant(value_expression)
sp_func.to_variant(value_expression)
if should_cast_result_to_variant
else value_expression
)
@@ -1510,7 +1487,7 @@ def make_result(value: Any) -> SnowparkColumn:
make_condition(key_and_value[0]), make_result(key_and_value[1])
),
itertools.islice(map_items, 1, None),
when(make_condition(first_key), make_result(first_value)),
sp_func.when(make_condition(first_key), make_result(first_value)),
)
if isinstance(mapping, defaultdict):
case_expression = case_expression.otherwise(
7 changes: 7 additions & 0 deletions src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py
@@ -185,6 +185,7 @@
APPLY_LABEL_COLUMN_QUOTED_IDENTIFIER,
APPLY_VALUE_COLUMN_QUOTED_IDENTIFIER,
DEFAULT_UDTF_PARTITION_SIZE,
NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION,
GroupbyApplySortMethod,
check_return_variant_and_get_return_type,
create_udf_for_series_apply,
@@ -8755,6 +8756,12 @@ def applymap(
f"Snowpark pandas applymap API doesn't yet support Snowpark Python function `{func.__name__}` with args = '{args}'."
)
return self._apply_snowpark_python_function_to_columns(func, kwargs)

# Check if the function is a known numpy function that can be translated to a Snowflake function.
sf_func = NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION.get(func)
if sf_func is not None:
return self._apply_snowpark_python_function_to_columns(sf_func, kwargs)

# Currently, NULL values are always passed into the udtf even if strict=True,
# which is a bug on the server side SNOW-880105.
# The fix will not land soon, so we are going to raise not implemented error for now.
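For intuition on why this translation removes the UDF round trips, here is a simplified, hypothetical sketch of what applying a Snowpark function column-wise amounts to (apply_function_to_columns is an illustrative helper, not the real _apply_snowpark_python_function_to_columns implementation):

from snowflake.snowpark import functions as sp_func

def apply_function_to_columns(snowpark_df, sf_func):
    # Replace each column with sf_func(column): a single SELECT projection,
    # so the whole operation compiles to one query with no UDF involved.
    return snowpark_df.select(
        *(sf_func(sp_func.col(c)).as_(c) for c in snowpark_df.columns)
    )

This is why the test expectations below drop from seven queries with one UDF to a single query.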
2 changes: 1 addition & 1 deletion tests/integ/modin/frame/test_applymap.py
@@ -107,7 +107,7 @@ def test_applymap_numpy(func):
native_df = native_pd.DataFrame(data)
snow_df = pd.DataFrame(data)

with SqlCounter(query_count=7, udf_count=1):
with SqlCounter(query_count=1):
eval_snowpark_pandas_result(snow_df, native_df, lambda x: x.applymap(func))


4 changes: 2 additions & 2 deletions tests/integ/modin/series/test_apply_and_map.py
@@ -169,7 +169,7 @@ def create_func_with_return_type_hint(func: Callable, return_type: str) -> Callable:
return d["f"]


TEST_NUMPY_FUNCS = [np.min, np.sqrt, np.tan, np.sum, np.median]
TEST_NUMPY_FUNCS = [np.min, np.sqrt, np.tan, np.sum, np.square, np.log1p, np.exp2]


@pytest.mark.parametrize("method", ["apply", "map"])
@@ -412,7 +412,7 @@ def test_builtin_function(self, method, func):
)

@pytest.mark.parametrize("func", TEST_NUMPY_FUNCS)
@sql_count_checker(query_count=4, udf_count=1)
@sql_count_checker(query_count=1)
def test_apply_and_map_numpy(self, method, func):
data = [1.0, 2.0, 3.0]
native_series = native_pd.Series(data)
