diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7f519a4169a..5fca207f45f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,6 +17,8 @@
 - Added support for `Series.str.pad`.
 - Added support for applying Snowpark Python function `snowflake_cortex_sentiment`.
 
+#### Improvements
+- Improved performance of `DataFrame.map`, `Series.apply`, and `Series.map` methods by mapping NumPy functions to Snowpark functions where possible.
 
 ## 1.26.0 (2024-12-05)
 
diff --git a/src/snowflake/snowpark/modin/plugin/_internal/apply_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/apply_utils.py
index c78f1c1a734..bbb8c9ad020 100644
--- a/src/snowflake/snowpark/modin/plugin/_internal/apply_utils.py
+++ b/src/snowflake/snowpark/modin/plugin/_internal/apply_utils.py
@@ -26,32 +26,7 @@
     pandas_lit,
     is_compatible_snowpark_types,
 )
-from snowflake.snowpark.functions import (
-    builtin,
-    col,
-    dense_rank,
-    ln,
-    log,
-    _log2,
-    _log10,
-    sin,
-    snowflake_cortex_sentiment,
-    snowflake_cortex_summarize,
-    udf,
-    to_variant,
-    when,
-    udtf,
-    exp,
-    cos,
-    tan,
-    sinh,
-    cosh,
-    tanh,
-    ceil,
-    floor,
-    trunc,
-    sqrt,
-)
+from snowflake.snowpark import functions as sp_func
 from snowflake.snowpark.modin.plugin._internal.frame import InternalFrame
 from snowflake.snowpark.modin.plugin._internal.ordered_dataframe import (
     OrderedDataFrame,
@@ -96,23 +71,23 @@
 cloudpickle.register_pickle_by_value(sys.modules[__name__])
 
 SUPPORTED_SNOWPARK_PYTHON_FUNCTIONS_IN_APPLY = {
-    exp,
-    ln,
-    log,
-    _log2,
-    _log10,
-    sin,
-    cos,
-    tan,
-    sinh,
-    cosh,
-    tanh,
-    ceil,
-    floor,
-    trunc,
-    sqrt,
-    snowflake_cortex_sentiment,
-    snowflake_cortex_summarize,
+    sp_func.exp,
+    sp_func.ln,
+    sp_func.log,
+    sp_func._log2,
+    sp_func._log10,
+    sp_func.sin,
+    sp_func.cos,
+    sp_func.tan,
+    sp_func.sinh,
+    sp_func.cosh,
+    sp_func.tanh,
+    sp_func.ceil,
+    sp_func.floor,
+    sp_func.trunc,
+    sp_func.sqrt,
+    sp_func.snowflake_cortex_summarize,
+    sp_func.snowflake_cortex_sentiment,
 }
 
 
@@ -287,7 +262,7 @@ def end_partition(self, df):  # type: ignore[no-untyped-def]  # pragma: no cover
     ApplyFunc.end_partition._sf_vectorized_input = native_pd.DataFrame  # type: ignore[attr-defined]
 
     packages = list(session.get_packages().values()) + udf_packages
-    func_udtf = udtf(
+    func_udtf = sp_func.udtf(
         ApplyFunc,
         output_schema=PandasDataFrameType(
             [LongType(), StringType(), VariantType()],
@@ -709,7 +684,7 @@ def end_partition(self, df: native_pd.DataFrame):  # type: ignore[no-untyped-def
         excluded=existing_identifiers,
         wrap_double_underscore=False,
     )
-    return udtf(
+    return sp_func.udtf(
         ApplyFunc,
         output_schema=PandasDataFrameType(
             [StringType(), IntegerType(), VariantType(), IntegerType(), IntegerType()],
@@ -783,7 +758,7 @@ def apply_func(x):  # type: ignore[no-untyped-def]  # pragma: no cover
         def apply_func(x):  # type: ignore[no-untyped-def]  # pragma: no cover
             return x.apply(func, args=args, **kwargs)
 
-    func_udf = udf(
+    func_udf = sp_func.udf(
         apply_func,
         return_type=PandasSeriesType(return_type),
         input_types=[PandasSeriesType(input_type)],
@@ -1187,12 +1162,12 @@ def groupby_apply_pivot_result_to_final_ordered_dataframe(
             # in GROUP_KEY_APPEARANCE_ORDER) and assign the
             # label i to all rows that came from func(group_i).
             [
-                col(original_row_position_snowflake_quoted_identifier).as_(
+                sp_func.col(original_row_position_snowflake_quoted_identifier).as_(
                     new_index_identifier
                 )
                 if sort_method is GroupbyApplySortMethod.ORIGINAL_ROW_ORDER
                 else (
-                    dense_rank().over(
+                    sp_func.dense_rank().over(
                         Window.order_by(
                             *(
                                 SnowparkColumn(col).asc_nulls_last()
@@ -1213,9 +1188,11 @@ def groupby_apply_pivot_result_to_final_ordered_dataframe(
                 ),
                 *[
                     (
-                        col(old_quoted_identifier).as_(quoted_identifier)
+                        sp_func.col(old_quoted_identifier).as_(quoted_identifier)
                         if return_variant
-                        else col(old_quoted_identifier).cast(return_type).as_(quoted_identifier)
+                        else sp_func.col(old_quoted_identifier)
+                        .cast(return_type)
+                        .as_(quoted_identifier)
                     )
                     for old_quoted_identifier, quoted_identifier in zip(
                         data_column_snowflake_quoted_identifiers
@@ -1400,7 +1377,7 @@ def groupby_apply_sort_method(
     # Need to wrap column name in IDENTIFIER, or else bool agg function
     # will treat the name as a string literal
     is_transform: bool = not ordered_dataframe_before_sort.agg(
-        builtin("boolor_agg")(
+        sp_func.builtin("boolor_agg")(
             SnowparkColumn(original_row_position_quoted_identifier) == -1
         ).as_("is_transform")
     ).collect()[0][0]
@@ -1475,7 +1452,7 @@ def make_condition(key: Any) -> SnowparkColumn:
         # Cast one of the values in the comparison to variant so that
         # we can compare types that are otherwise not comparable in
         # Snowflake, like timestamp and int.
-        return col.equal_null(to_variant(pandas_lit(key)))
+        return col.equal_null(sp_func.to_variant(pandas_lit(key)))
 
     # If any of the values we are mapping to have types that are
     # incompatible with the current column's type, we have to cast the new
@@ -1498,7 +1475,7 @@ def make_condition(key: Any) -> SnowparkColumn:
     def make_result(value: Any) -> SnowparkColumn:
         value_expression = pandas_lit(value)
         return (
-            to_variant(value_expression)
+            sp_func.to_variant(value_expression)
             if should_cast_result_to_variant
             else value_expression
         )
@@ -1510,7 +1487,7 @@ def make_result(value: Any) -> SnowparkColumn:
             make_condition(key_and_value[0]), make_result(key_and_value[1])
         ),
         itertools.islice(map_items, 1, None),
-        when(make_condition(first_key), make_result(first_value)),
+        sp_func.when(make_condition(first_key), make_result(first_value)),
     )
     if isinstance(mapping, defaultdict):
         case_expression = case_expression.otherwise(
diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py
index 38ecd99b80a..389c73e0287 100644
--- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py
+++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py
@@ -185,6 +185,7 @@
     APPLY_LABEL_COLUMN_QUOTED_IDENTIFIER,
     APPLY_VALUE_COLUMN_QUOTED_IDENTIFIER,
     DEFAULT_UDTF_PARTITION_SIZE,
+    NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION,
     GroupbyApplySortMethod,
     check_return_variant_and_get_return_type,
     create_udf_for_series_apply,
@@ -8755,6 +8756,12 @@ def applymap(
                 f"Snowpark pandas applymap API doesn't yet support Snowpark Python function `{func.__name__}` with args = '{args}'."
             )
             return self._apply_snowpark_python_function_to_columns(func, kwargs)
+
+        # Check if the function is a known NumPy function that can be translated to a Snowflake function.
+        sf_func = NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION.get(func)
+        if sf_func is not None:
+            return self._apply_snowpark_python_function_to_columns(sf_func, kwargs)
+
         # Currently, NULL values are always passed into the udtf even if strict=True,
         # which is a bug on the server side SNOW-880105.
         # The fix will not land soon, so we are going to raise not implemented error for now.
diff --git a/tests/integ/modin/frame/test_applymap.py b/tests/integ/modin/frame/test_applymap.py
index 517b5ce12e8..3e7f06af70e 100644
--- a/tests/integ/modin/frame/test_applymap.py
+++ b/tests/integ/modin/frame/test_applymap.py
@@ -107,7 +107,7 @@ def test_applymap_numpy(func):
     native_df = native_pd.DataFrame(data)
     snow_df = pd.DataFrame(data)
 
-    with SqlCounter(query_count=7, udf_count=1):
+    with SqlCounter(query_count=1):
         eval_snowpark_pandas_result(snow_df, native_df, lambda x: x.applymap(func))
diff --git a/tests/integ/modin/series/test_apply_and_map.py b/tests/integ/modin/series/test_apply_and_map.py
index f776863fa6e..81a64bee99a 100644
--- a/tests/integ/modin/series/test_apply_and_map.py
+++ b/tests/integ/modin/series/test_apply_and_map.py
@@ -169,7 +169,7 @@ def create_func_with_return_type_hint(func: Callable, return_type: str) -> Calla
     return d["f"]
 
 
-TEST_NUMPY_FUNCS = [np.min, np.sqrt, np.tan, np.sum, np.median]
+TEST_NUMPY_FUNCS = [np.min, np.sqrt, np.tan, np.sum, np.square, np.log1p, np.exp2]
 
 
 @pytest.mark.parametrize("method", ["apply", "map"])
@@ -412,7 +412,7 @@ def test_builtin_function(self, method, func):
     )
 
     @pytest.mark.parametrize("func", TEST_NUMPY_FUNCS)
-    @sql_count_checker(query_count=4, udf_count=1)
+    @sql_count_checker(query_count=1)
     def test_apply_and_map_numpy(self, method, func):
         data = [1.0, 2.0, 3.0]
         native_series = native_pd.Series(data)
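
Note on the fast path: `applymap` now consults `NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION` before falling back to the UDF/UDTF machinery, which is why the expected counts in the tests drop from several queries plus a UDF to a single query. The table itself is defined in `apply_utils.py` but its contents are not part of this diff; the sketch below is a minimal guess at its shape, inferred from the functions exercised by `TEST_NUMPY_FUNCS`. The composed lambdas and the `try_map_numpy_to_snowflake` helper are illustrative assumptions, not the actual implementation.

```python
# Hedged sketch only -- NOT the real table from apply_utils.py. Keys are NumPy
# ufuncs; values are callables that build an equivalent Snowpark column
# expression. Assumes inputs are snowflake.snowpark.Column objects.
import numpy as np

from snowflake.snowpark import functions as sp_func

NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION = {
    np.sqrt: sp_func.sqrt,                      # SQRT(x)
    np.tan: sp_func.tan,                        # TAN(x)
    np.square: lambda c: sp_func.pow(c, 2),     # POW(x, 2) -- guessed composition
    np.log1p: lambda c: sp_func.ln(c + 1),      # LN(x + 1) -- guessed composition
    np.exp2: lambda c: sp_func.pow(2, c),       # POW(2, x) -- guessed composition
}


def try_map_numpy_to_snowflake(func):
    """Hypothetical helper mirroring the lookup added in applymap: return a
    column-expression builder when func is translatable, else None so the
    caller falls back to the UDF/UDTF path."""
    return NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION.get(func)
```

Because NumPy ufuncs such as `np.sqrt` are hashable module-level singletons, a plain `dict.get(func)` is a safe O(1) dispatch; any callable missing from the table simply takes the existing UDF path, so unsupported functions keep their previous behavior.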