diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7f519a4169a..5fca207f45f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,6 +17,8 @@
 - Added support for `Series.str.pad`.
 - Added support for applying Snowpark Python function `snowflake_cortex_sentiment`.
 
+#### Improvements
+- Improved performance of `DataFrame.map`, `Series.apply` and `Series.map` methods by mapping numpy functions to Snowpark functions where possible.
 
 ## 1.26.0 (2024-12-05)
 
diff --git a/src/snowflake/snowpark/modin/plugin/_internal/apply_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/apply_utils.py
index c78f1c1a734..bbb8c9ad020 100644
--- a/src/snowflake/snowpark/modin/plugin/_internal/apply_utils.py
+++ b/src/snowflake/snowpark/modin/plugin/_internal/apply_utils.py
@@ -26,32 +26,7 @@
     pandas_lit,
     is_compatible_snowpark_types,
 )
-from snowflake.snowpark.functions import (
-    builtin,
-    col,
-    dense_rank,
-    ln,
-    log,
-    _log2,
-    _log10,
-    sin,
-    snowflake_cortex_sentiment,
-    snowflake_cortex_summarize,
-    udf,
-    to_variant,
-    when,
-    udtf,
-    exp,
-    cos,
-    tan,
-    sinh,
-    cosh,
-    tanh,
-    ceil,
-    floor,
-    trunc,
-    sqrt,
-)
+from snowflake.snowpark import functions as sp_func
 from snowflake.snowpark.modin.plugin._internal.frame import InternalFrame
 from snowflake.snowpark.modin.plugin._internal.ordered_dataframe import (
     OrderedDataFrame,
@@ -96,23 +71,23 @@
 cloudpickle.register_pickle_by_value(sys.modules[__name__])
 
 SUPPORTED_SNOWPARK_PYTHON_FUNCTIONS_IN_APPLY = {
-    exp,
-    ln,
-    log,
-    _log2,
-    _log10,
-    sin,
-    cos,
-    tan,
-    sinh,
-    cosh,
-    tanh,
-    ceil,
-    floor,
-    trunc,
-    sqrt,
-    snowflake_cortex_sentiment,
-    snowflake_cortex_summarize,
+    sp_func.exp,
+    sp_func.ln,
+    sp_func.log,
+    sp_func._log2,
+    sp_func._log10,
+    sp_func.sin,
+    sp_func.cos,
+    sp_func.tan,
+    sp_func.sinh,
+    sp_func.cosh,
+    sp_func.tanh,
+    sp_func.ceil,
+    sp_func.floor,
+    sp_func.trunc,
+    sp_func.sqrt,
+    sp_func.snowflake_cortex_summarize,
+    sp_func.snowflake_cortex_sentiment,
 }
 
 
@@ -287,7 +262,7 @@ def end_partition(self, df):  # type: ignore[no-untyped-def] # pragma: no cover
         ApplyFunc.end_partition._sf_vectorized_input = native_pd.DataFrame  # type: ignore[attr-defined]
 
         packages = list(session.get_packages().values()) + udf_packages
-        func_udtf = udtf(
+        func_udtf = sp_func.udtf(
             ApplyFunc,
             output_schema=PandasDataFrameType(
                 [LongType(), StringType(), VariantType()],
@@ -709,7 +684,7 @@ def end_partition(self, df: native_pd.DataFrame):  # type: ignore[no-untyped-def
         excluded=existing_identifiers,
         wrap_double_underscore=False,
     )
-    return udtf(
+    return sp_func.udtf(
         ApplyFunc,
         output_schema=PandasDataFrameType(
             [StringType(), IntegerType(), VariantType(), IntegerType(), IntegerType()],
@@ -783,7 +758,7 @@ def apply_func(x):  # type: ignore[no-untyped-def] # pragma: no cover
         def apply_func(x):  # type: ignore[no-untyped-def] # pragma: no cover
             return x.apply(func, args=args, **kwargs)
 
-    func_udf = udf(
+    func_udf = sp_func.udf(
         apply_func,
         return_type=PandasSeriesType(return_type),
         input_types=[PandasSeriesType(input_type)],
@@ -1187,12 +1162,12 @@ def groupby_apply_pivot_result_to_final_ordered_dataframe(
             # in GROUP_KEY_APPEARANCE_ORDER) and assign the
             # label i to all rows that came from func(group_i).
             [
-                col(original_row_position_snowflake_quoted_identifier).as_(
+                sp_func.col(original_row_position_snowflake_quoted_identifier).as_(
                     new_index_identifier
                 )
                 if sort_method is GroupbyApplySortMethod.ORIGINAL_ROW_ORDER
                 else (
-                    dense_rank().over(
+                    sp_func.dense_rank().over(
                         Window.order_by(
                             *(
                                 SnowparkColumn(col).asc_nulls_last()
@@ -1213,9 +1188,11 @@ def groupby_apply_pivot_result_to_final_ordered_dataframe(
         ),
         *[
             (
-                col(old_quoted_identifier).as_(quoted_identifier)
+                sp_func.col(old_quoted_identifier).as_(quoted_identifier)
                 if return_variant
-                else col(old_quoted_identifier).cast(return_type).as_(quoted_identifier)
+                else sp_func.col(old_quoted_identifier)
+                .cast(return_type)
+                .as_(quoted_identifier)
             )
             for old_quoted_identifier, quoted_identifier in zip(
                 data_column_snowflake_quoted_identifiers
@@ -1400,7 +1377,7 @@ def groupby_apply_sort_method(
     # Need to wrap column name in IDENTIFIER, or else bool agg function
    # will treat the name as a string literal
     is_transform: bool = not ordered_dataframe_before_sort.agg(
-        builtin("boolor_agg")(
+        sp_func.builtin("boolor_agg")(
            SnowparkColumn(original_row_position_quoted_identifier) == -1
         ).as_("is_transform")
     ).collect()[0][0]
@@ -1475,7 +1452,7 @@ def make_condition(key: Any) -> SnowparkColumn:
             # Cast one of the values in the comparison to variant so that we
             # we can compare types that are otherwise not comparable in
             # Snowflake, like timestamp and int.
-            return col.equal_null(to_variant(pandas_lit(key)))
+            return col.equal_null(sp_func.to_variant(pandas_lit(key)))
 
     # If any of the values we are mapping to have types that are
     # incompatible with the current column's type, we have to cast the new
@@ -1498,7 +1475,7 @@ def make_condition(key: Any) -> SnowparkColumn:
     def make_result(value: Any) -> SnowparkColumn:
         value_expression = pandas_lit(value)
         return (
-            to_variant(value_expression)
+            sp_func.to_variant(value_expression)
             if should_cast_result_to_variant
             else value_expression
         )
@@ -1510,7 +1487,7 @@ def make_result(value: Any) -> SnowparkColumn:
             make_condition(key_and_value[0]), make_result(key_and_value[1])
         ),
         itertools.islice(map_items, 1, None),
-        when(make_condition(first_key), make_result(first_value)),
+        sp_func.when(make_condition(first_key), make_result(first_value)),
     )
     if isinstance(mapping, defaultdict):
         case_expression = case_expression.otherwise(
diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py
index 38ecd99b80a..5849dba0745 100644
--- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py
+++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py
@@ -372,6 +372,9 @@
 from snowflake.snowpark.modin.plugin.utils.error_message import ErrorMessage
 from snowflake.snowpark.modin.plugin.utils.warning_message import WarningMessage
 from snowflake.snowpark.modin.utils import MODIN_UNNAMED_SERIES_LABEL
+from snowflake.snowpark.modin.plugin.utils.numpy_to_pandas import (
+    NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION,
+)
 from snowflake.snowpark.session import Session
 from snowflake.snowpark.types import (
     ArrayType,
@@ -8755,6 +8758,15 @@ def applymap(
                 f"Snowpark pandas applymap API doesn't yet support Snowpark Python function `{func.__name__}` with args = '{args}'."
             )
             return self._apply_snowpark_python_function_to_columns(func, kwargs)
+
+        # Check if the function is a known numpy function that can be translated to a Snowflake function.
+        sf_func = NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION.get(func)
+        if sf_func is not None:
+            # TODO SNOW-1739034: remove pragma no cover when apply tests are enabled in CI
+            return self._apply_snowpark_python_function_to_columns(
+                sf_func, kwargs
+            )  # pragma: no cover
+
         # Currently, NULL values are always passed into the udtf even if strict=True,
         # which is a bug on the server side SNOW-880105.
         # The fix will not land soon, so we are going to raise not implemented error for now.
diff --git a/src/snowflake/snowpark/modin/plugin/utils/numpy_to_pandas.py b/src/snowflake/snowpark/modin/plugin/utils/numpy_to_pandas.py
index ce8c02b7692..8faa9cdb350 100644
--- a/src/snowflake/snowpark/modin/plugin/utils/numpy_to_pandas.py
+++ b/src/snowflake/snowpark/modin/plugin/utils/numpy_to_pandas.py
@@ -6,6 +6,7 @@
 import modin.pandas as pd
 from modin.pandas.base import BasePandasDataset
 from modin.pandas.utils import is_scalar
+import numpy as np
 
 from snowflake.snowpark import functions as sp_func
 from snowflake.snowpark.modin.plugin.utils.warning_message import WarningMessage
@@ -288,3 +289,48 @@
         sp_func.trunc
     ),  # df.truncate not supported in snowpandas yet
 }
+
+
+NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION = {
+    # Math operations
+    np.absolute: sp_func.abs,
+    np.sign: sp_func.sign,
+    np.negative: sp_func.negate,
+    np.positive: lambda col: col,
+    np.sqrt: sp_func.sqrt,
+    np.square: lambda col: sp_func.builtin("square")(col),
+    np.cbrt: lambda col: sp_func.builtin("cbrt")(col),
+    np.reciprocal: lambda col: 1 / col,
+    np.exp: sp_func.exp,
+    np.exp2: lambda col: sp_func.pow(2, col),
+    np.expm1: lambda col: sp_func.exp(col) - 1,
+    np.log: sp_func.ln,
+    np.log2: sp_func._log2,
+    np.log10: sp_func._log10,
+    np.log1p: lambda col: sp_func.ln(col + 1),
+    # Aggregate functions translate to identity functions when applied element-wise
+    np.sum: lambda col: col,
+    np.min: lambda col: col,
+    np.max: lambda col: col,
+    # Trigonometric functions
+    np.sin: sp_func.sin,
+    np.cos: sp_func.cos,
+    np.tan: sp_func.tan,
+    np.sinh: sp_func.sinh,
+    np.cosh: sp_func.cosh,
+    np.tanh: sp_func.tanh,
+    np.arcsin: lambda col: sp_func.builtin("asin")(col),
+    np.arccos: lambda col: sp_func.builtin("acos")(col),
+    np.arctan: lambda col: sp_func.builtin("atan")(col),
+    np.arctan2: lambda col: sp_func.builtin("atan2")(col),
+    np.arcsinh: lambda col: sp_func.builtin("asinh")(col),
+    np.arccosh: lambda col: sp_func.builtin("acosh")(col),
+    np.arctanh: lambda col: sp_func.builtin("atanh")(col),
+    np.degrees: lambda col: sp_func.builtin("degrees")(col),
+    np.radians: lambda col: sp_func.builtin("radians")(col),
+    # Floating functions
+    np.ceil: sp_func.ceil,
+    np.floor: sp_func.floor,
+    np.trunc: sp_func.trunc,
+    np.isnan: sp_func.is_null,
+}
diff --git a/tests/integ/modin/frame/test_applymap.py b/tests/integ/modin/frame/test_applymap.py
index 517b5ce12e8..3e7f06af70e 100644
--- a/tests/integ/modin/frame/test_applymap.py
+++ b/tests/integ/modin/frame/test_applymap.py
@@ -107,7 +107,7 @@ def test_applymap_numpy(func):
     native_df = native_pd.DataFrame(data)
     snow_df = pd.DataFrame(data)
 
-    with SqlCounter(query_count=7, udf_count=1):
+    with SqlCounter(query_count=1):
         eval_snowpark_pandas_result(snow_df, native_df, lambda x: x.applymap(func))
 
 
diff --git a/tests/integ/modin/series/test_apply_and_map.py b/tests/integ/modin/series/test_apply_and_map.py
index f776863fa6e..81a64bee99a 100644
--- a/tests/integ/modin/series/test_apply_and_map.py
+++ b/tests/integ/modin/series/test_apply_and_map.py
@@ -169,7 +169,7 @@ def create_func_with_return_type_hint(func: Callable, return_type: str) -> Calla
     return d["f"]
 
 
-TEST_NUMPY_FUNCS = [np.min, np.sqrt, np.tan, np.sum, np.median]
+TEST_NUMPY_FUNCS = [np.min, np.sqrt, np.tan, np.sum, np.square, np.log1p, np.exp2]
 
 
 @pytest.mark.parametrize("method", ["apply", "map"])
@@ -412,7 +412,7 @@ def test_builtin_function(self, method, func):
         )
 
     @pytest.mark.parametrize("func", TEST_NUMPY_FUNCS)
-    @sql_count_checker(query_count=4, udf_count=1)
+    @sql_count_checker(query_count=1)
     def test_apply_and_map_numpy(self, method, func):
         data = [1.0, 2.0, 3.0]
         native_series = native_pd.Series(data)
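
---
Reviewer note (not part of the patch): a minimal end-to-end sketch of the new fast path. It assumes a configured Snowflake connection and that `np.sqrt` remains in `NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION`:

    import numpy as np
    import modin.pandas as pd
    import snowflake.snowpark.modin.plugin  # noqa: F401 -- registers the Snowflake backend
    from snowflake.snowpark.session import Session

    Session.builder.create()  # assumes credentials in the default connection config

    s = pd.Series([1.0, 4.0, 9.0])
    # np.sqrt has an entry in NUMPY_FUNCTION_TO_SNOWFLAKE_FUNCTION, so apply()
    # now compiles to a single query using the SQL SQRT function instead of
    # creating and invoking a Python UDF (hence query_count drops from 7 to 1
    # and the udf_count expectation disappears in the updated tests).
    print(s.apply(np.sqrt))  # 1.0, 2.0, 3.0

Functions without an entry (e.g. `np.median`, which was dropped from TEST_NUMPY_FUNCS) still take the UDF fallback path.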