diff --git a/CHANGELOG.md b/CHANGELOG.md index ff3e8aef4cf..0767d5d3a0a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -91,6 +91,9 @@ - When calling `DataFrame.set_index`, or setting `DataFrame.index` or `Series.index`, with a new index that does not match the current length of the `Series`/`DataFrame` object, a `ValueError` is no longer raised. When the `Series`/`DataFrame` object is longer than the new index, the `Series`/`DataFrame`'s new index is filled with `NaN` values for the "extra" elements. When the `Series`/`DataFrame` object is shorter than the new index, the extra values in the new index are ignored—`Series` and `DataFrame` stay the same length `n`, and use only the first `n` values of the new index. +#### Improvements + +- Improve concat, join performance when operations are performed on series coming from the same dataframe by avoiding unnecessary joins. ## 1.21.0 (2024-08-19) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py index 846f3c64079..457bd388f2b 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py @@ -320,12 +320,26 @@ def _create_internal_frame_with_join_or_align_result( ) index_column_types.extend(right.cached_index_column_snowpark_pandas_types) + # If the result ordering column has the same ordering columns as the original left ordering columns, + # that means the original left and right shares the same base, and no actual snowpark join is applied because + # the join is applied on the ordering column or align on the same column. + # This behavior is guaranteed by the align and join methods provided by the OrderingDataframe, when the + # snowpark join is actually applied, the result ordering column will be a combination of + # left.ordering_column and right.ordering_column, plus some assist column. For example, the ordering column + # of left join is left.ordering_column + right.ordering_column. + no_join_applied = ( + result_ordered_frame.ordering_columns == left.ordered_dataframe.ordering_columns + ) + if key_coalesce_config: coalesce_column_identifiers = [] coalesce_column_values = [] for origin_left_col, origin_right_col, coalesce_config in zip( left_on, right_on, key_coalesce_config ): + if coalesce_config == JoinKeyCoalesceConfig.NONE: + continue + coalesce_col_type = None origin_left_col_type = ( left.snowflake_quoted_identifier_to_snowpark_pandas_type[ @@ -337,44 +351,60 @@ def _create_internal_frame_with_join_or_align_result( origin_right_col ] ) - if coalesce_config == JoinKeyCoalesceConfig.NONE: - continue + left_col = result_helper.map_left_quoted_identifiers([origin_left_col])[0] right_col = result_helper.map_right_quoted_identifiers([origin_right_col])[ 0 ] - # Coalescing is only required for 'outer' or 'asof' joins or align. - # For 'inner' and 'left' join we use left join keys and for 'right' join we - # use right join keys. - # For 'left' and 'coalesce' align we use left join keys. - if how in ("asof", "outer"): - # Generate an expression equivalent of - # "COALESCE('left_col', 'right_col') as 'left_col'" - coalesce_column_identifier = ( - result_ordered_frame.generate_snowflake_quoted_identifiers( - pandas_labels=[ - extract_pandas_label_from_snowflake_quoted_identifier( - left_col - ) - ], - )[0] - ) - coalesce_column_identifiers.append(coalesce_column_identifier) - coalesce_column_values.append(coalesce(left_col, right_col)) - if origin_left_col_type == origin_right_col_type: - coalesce_col_type = origin_left_col_type - elif how == "right": - # No coalescing required for 'right' join. Simply use right join key - # as output column. - coalesce_column_identifier = right_col - coalesce_col_type = origin_right_col_type - elif how in ("inner", "left", "coalesce"): - # No coalescing required for 'left' or 'inner' join and for 'left' or - # 'coalesce' align. Simply use left join key as output column. + + if no_join_applied and origin_left_col == origin_right_col: + # if no join is applied, that means the result dataframe, left dataframe and right dataframe + # shares the same base dataframe. If the original left column and original right column are the + # same column, no coalesce is needed, and we always tries to keep the left column to stay align + # with the original dataframe as much as possible to increase the chance for optimization for + # later operations, especially when the later operations are applied with dfs coming from + # the ame dataframe. + # Keep left column can help stay aligned with the original dataframe is because when there are + # conflict between left and right, deduplication always happens at right. For example, when join + # or align left dataframe [col1, col2] and right dataframe [col1, col2], the result dataframe will + # have columns [col1, col2, col1_a12b, col2_de3b], where col1_a12b, col2_de3b are just alias of + # col1 and col2 in right dataframe. + coalesce_config = JoinKeyCoalesceConfig.LEFT coalesce_column_identifier = left_col coalesce_col_type = origin_left_col_type else: - raise AssertionError(f"Unsupported join/align type {how}") + # Coalescing is only required for 'outer' or 'asof' joins or align. + # For 'inner' and 'left' join we use left join keys and for 'right' join we + # use right join keys. + # For 'left' and 'coalesce' align we use left join keys. + if how in ("asof", "outer"): + # Generate an expression equivalent of + # "COALESCE('left_col', 'right_col') as 'left_col'" + coalesce_column_identifier = ( + result_ordered_frame.generate_snowflake_quoted_identifiers( + pandas_labels=[ + extract_pandas_label_from_snowflake_quoted_identifier( + left_col + ) + ], + )[0] + ) + coalesce_column_identifiers.append(coalesce_column_identifier) + coalesce_column_values.append(coalesce(left_col, right_col)) + if origin_left_col_type == origin_right_col_type: + coalesce_col_type = origin_left_col_type + elif how == "right": + # No coalescing required for 'right' join. Simply use right join key + # as output column. + coalesce_column_identifier = right_col + coalesce_col_type = origin_right_col_type + elif how in ("inner", "left", "coalesce"): + # No coalescing required for 'left' or 'inner' join and for 'left' or + # 'coalesce' align. Simply use left join key as output column. + coalesce_column_identifier = left_col + coalesce_col_type = origin_left_col_type + else: + raise AssertionError(f"Unsupported join/align type {how}") if coalesce_config == JoinKeyCoalesceConfig.RIGHT: # swap left_col and right_col @@ -1187,15 +1217,8 @@ def align( # NULL NULL 2 NULL 4 e 2 coalesce_key_config = None inherit_join_index = InheritJoinIndex.FROM_LEFT - # When it is `outer` align, we need to coalesce the align columns. However, if the - # ordering columns of aligned result is the same as the left frame, that means the - # join columns of left and right matches, then there is no need to coalesce the join - # keys, simply inherent from left gives the correct result. - # Retaining the original columns also helps avoid unnecessary join in later steps. - if ( - how == "outer" - and aligned_ordered_frame.ordering_columns != left.ordering_columns - ): + # When it is `outer` align, we need to coalesce the align columns. + if how == "outer": coalesce_key_config = [JoinKeyCoalesceConfig.LEFT] * len(left_on) inherit_join_index = InheritJoinIndex.FROM_BOTH ( diff --git a/tests/integ/modin/binary/test_binary_op.py b/tests/integ/modin/binary/test_binary_op.py index 9ae5db98369..cd036bcb045 100644 --- a/tests/integ/modin/binary/test_binary_op.py +++ b/tests/integ/modin/binary/test_binary_op.py @@ -2586,3 +2586,26 @@ def test_df_sub_series(): eval_snowpark_pandas_result( snow_df, native_df, lambda df: df.sub(df["two"], axis="index"), inplace=True ) + + +@sql_count_checker(query_count=2, join_count=0) +def test_binary_op_multi_series_from_same_df(): + native_df = native_pd.DataFrame( + { + "A": [1, 2, 3], + "B": [2, 3, 4], + "C": [4, 5, 6], + "D": [2, 2, 3], + }, + index=["a", "b", "c"], + ) + snow_df = pd.DataFrame(native_df) + # ensure performing more than one binary operation for series coming from same + # dataframe does not produce any join. + eval_snowpark_pandas_result( + snow_df, native_df, lambda df: df["A"] + df["B"] + df["C"] + ) + # perform binary operations in different orders + eval_snowpark_pandas_result( + snow_df, native_df, lambda df: (df["A"] + df["B"]) + (df["C"] + df["D"]) + ) diff --git a/tests/integ/modin/test_concat.py b/tests/integ/modin/test_concat.py index 7e11a3537af..c1366c22506 100644 --- a/tests/integ/modin/test_concat.py +++ b/tests/integ/modin/test_concat.py @@ -1063,3 +1063,40 @@ def test_concat_keys(): } snow_df = pd.concat(data.values(), axis=1, keys=data.keys()) assert_frame_equal(snow_df, native_df, check_dtype=False) + + +@sql_count_checker(query_count=4, join_count=0) +def test_concat_series_from_same_df(join): + num_cols = 4 + select_data = [f'{i} as "{i}"' for i in range(num_cols)] + query = f"select {', '.join(select_data)}" + + # concat today uses join_on_index to concat all series, we use + # read_snowflake here so that the default index is created and + # managed by snowpark pandas, which is the same as row position + # column. This creates a valid optimization scenario for join, where + # join performed on the same row_position column doesn't require + # actual join. + # This can not be done with pd.DataFrame constructor because the index + # and row position column is controlled by client side, which are + # different columns. + df = pd.read_snowflake(query) + + series = [df[col] for col in df.columns] + final_df = pd.concat(series, join=join, axis=1) + + assert_frame_equal(df, final_df) + + +@sql_count_checker(query_count=4, join_count=0) +def test_df_creation_from_series_from_same_df(): + num_cols = 6 + select_data = [f'{i} as "{i}"' for i in range(num_cols)] + query = f"select {', '.join(select_data)}" + + df = pd.read_snowflake(query) + + df_dict = {col: df[col] for col in df.columns} + final_df = pd.DataFrame(df_dict) + + assert_frame_equal(df, final_df) diff --git a/tests/unit/modin/test_join_utils.py b/tests/unit/modin/test_join_utils.py new file mode 100644 index 00000000000..031ab13bef9 --- /dev/null +++ b/tests/unit/modin/test_join_utils.py @@ -0,0 +1,97 @@ +# +# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. +# + +from collections.abc import Hashable +from unittest import mock + +import pytest + +import snowflake.snowpark.modin.plugin # noqa: F401 +from snowflake.snowpark.modin.plugin._internal.frame import ( + OrderedDataFrame, + OrderingColumn, +) +from snowflake.snowpark.modin.plugin._internal.join_utils import ( + InheritJoinIndex, + JoinKeyCoalesceConfig, + _create_internal_frame_with_join_or_align_result, +) +from snowflake.snowpark.modin.plugin.compiler.snowflake_query_compiler import ( + InternalFrame, +) + + +def mock_internal_frame( + data_column_pandas_labels: list[Hashable], + data_column_pandas_index_names: list[Hashable], + data_column_snowflake_quoted_identifiers: list[str], + index_column_pandas_labels: list[Hashable], + index_column_snowflake_quoted_identifiers: list[str], +) -> InternalFrame: + ordered_dataframe = mock.create_autospec(OrderedDataFrame) + ordered_dataframe.projected_column_snowflake_quoted_identifiers = ( + data_column_snowflake_quoted_identifiers + + index_column_snowflake_quoted_identifiers + ) + ordered_dataframe.ordering_columns = [ + OrderingColumn(col) + for col in ordered_dataframe.projected_column_snowflake_quoted_identifiers + ] + internal_frame = InternalFrame.create( + ordered_dataframe=ordered_dataframe, + data_column_pandas_labels=data_column_pandas_labels, + data_column_pandas_index_names=data_column_pandas_index_names, + data_column_snowflake_quoted_identifiers=data_column_snowflake_quoted_identifiers, + index_column_pandas_labels=index_column_pandas_labels, + index_column_snowflake_quoted_identifiers=index_column_snowflake_quoted_identifiers, + data_column_types=[None] * len(data_column_pandas_labels), + index_column_types=[None] * len(index_column_pandas_labels), + ) + + return internal_frame + + +def test_create_internal_frame_with_result_using_invalid_methods(): + left_frame = mock_internal_frame( + data_column_pandas_labels=["a1", "b1"], + data_column_pandas_index_names=[None], + data_column_snowflake_quoted_identifiers=['"A1"', '"B1"'], + index_column_pandas_labels=["i1"], + index_column_snowflake_quoted_identifiers=['"I1"'], + ) + + right_frame = mock_internal_frame( + data_column_pandas_labels=["a2", "b2"], + data_column_pandas_index_names=[None], + data_column_snowflake_quoted_identifiers=['"A2"', '"B2"'], + index_column_pandas_labels=["i2"], + index_column_snowflake_quoted_identifiers=['"I2"'], + ) + + result_ordered_frame = mock.create_autospec(OrderedDataFrame) + result_ordered_frame.projected_column_snowflake_quoted_identifiers = [ + '"I1"', + '"A1"', + '"B1"', + '"I2"', + '"A2"', + '"B2"', + ] + result_ordered_frame._ordering_columns_tuple = [ + OrderingColumn('"I1"'), + OrderingColumn('"I2"'), + ] + + with pytest.raises(AssertionError, match="Unsupported join/align type invalid"): + _create_internal_frame_with_join_or_align_result( + result_ordered_frame=result_ordered_frame, + left=left_frame, + right=right_frame, + how="invalid", + left_on=['"I1"'], + right_on=['"I2"'], + sort=False, + key_coalesce_config=[JoinKeyCoalesceConfig.LEFT], + inherit_index=InheritJoinIndex.FROM_LEFT, + )