From febf1e153787367465af59a378cb1929d5f365ce Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Mon, 26 Aug 2024 11:13:08 -0700 Subject: [PATCH 01/15] improve join/align performance --- .../modin/plugin/_internal/join_utils.py | 74 +++++++++++-------- tests/integ/modin/test_concat.py | 28 +++++++ 2 files changed, 70 insertions(+), 32 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py index 846f3c64079..ab1066c3151 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py @@ -343,42 +343,52 @@ def _create_internal_frame_with_join_or_align_result( right_col = result_helper.map_right_quoted_identifiers([origin_right_col])[ 0 ] - # Coalescing is only required for 'outer' or 'asof' joins or align. - # For 'inner' and 'left' join we use left join keys and for 'right' join we - # use right join keys. - # For 'left' and 'coalesce' align we use left join keys. - if how in ("asof", "outer"): - # Generate an expression equivalent of - # "COALESCE('left_col', 'right_col') as 'left_col'" - coalesce_column_identifier = ( - result_ordered_frame.generate_snowflake_quoted_identifiers( - pandas_labels=[ - extract_pandas_label_from_snowflake_quoted_identifier( - left_col - ) - ], - )[0] - ) - coalesce_column_identifiers.append(coalesce_column_identifier) - coalesce_column_values.append(coalesce(left_col, right_col)) - if origin_left_col_type == origin_right_col_type: - coalesce_col_type = origin_left_col_type - elif how == "right": - # No coalescing required for 'right' join. Simply use right join key - # as output column. - coalesce_column_identifier = right_col - coalesce_col_type = origin_right_col_type - elif how in ("inner", "left", "coalesce"): - # No coalescing required for 'left' or 'inner' join and for 'left' or - # 'coalesce' align. Simply use left join key as output column. 
+ # if the result ordering column has the same ordering columns as the original left ordering columns, + # that means the original left and right shares the same base, and no actual join is applied. + no_join_applied = ( + result_ordered_frame.ordering_columns + == left.ordered_dataframe.ordering_columns + ) + if no_join_applied and origin_left_col == origin_right_col: coalesce_column_identifier = left_col coalesce_col_type = origin_left_col_type else: - raise AssertionError(f"Unsupported join/align type {how}") + # Coalescing is only required for 'outer' or 'asof' joins or align. + # For 'inner' and 'left' join we use left join keys and for 'right' join we + # use right join keys. + # For 'left' and 'coalesce' align we use left join keys. + if how in ("asof", "outer"): + # Generate an expression equivalent of + # "COALESCE('left_col', 'right_col') as 'left_col'" + coalesce_column_identifier = ( + result_ordered_frame.generate_snowflake_quoted_identifiers( + pandas_labels=[ + extract_pandas_label_from_snowflake_quoted_identifier( + left_col + ) + ], + )[0] + ) + coalesce_column_identifiers.append(coalesce_column_identifier) + coalesce_column_values.append(coalesce(left_col, right_col)) + if origin_left_col_type == origin_right_col_type: + coalesce_col_type = origin_left_col_type + elif how == "right": + # No coalescing required for 'right' join. Simply use right join key + # as output column. + coalesce_column_identifier = right_col + coalesce_col_type = origin_right_col_type + elif how in ("inner", "left", "coalesce"): + # No coalescing required for 'left' or 'inner' join and for 'left' or + # 'coalesce' align. Simply use left join key as output column. 
+ coalesce_column_identifier = left_col + coalesce_col_type = origin_left_col_type + else: + raise AssertionError(f"Unsupported join/align type {how}") - if coalesce_config == JoinKeyCoalesceConfig.RIGHT: - # swap left_col and right_col - left_col, right_col = right_col, left_col + if coalesce_config == JoinKeyCoalesceConfig.RIGHT: + # swap left_col and right_col + left_col, right_col = right_col, left_col # To provide same behavior as native pandas, remove duplicate join column. if right_col in data_column_snowflake_quoted_identifiers: diff --git a/tests/integ/modin/test_concat.py b/tests/integ/modin/test_concat.py index 7e11a3537af..cb5d224d186 100644 --- a/tests/integ/modin/test_concat.py +++ b/tests/integ/modin/test_concat.py @@ -1063,3 +1063,31 @@ def test_concat_keys(): } snow_df = pd.concat(data.values(), axis=1, keys=data.keys()) assert_frame_equal(snow_df, native_df, check_dtype=False) + + +@sql_count_checker(query_count=4, join_count=0) +def test_concat_series_from_same_df(join): + num_cols = 4 + select_data = [f'{i} as "{i}"' for i in range(num_cols)] + query = f"select {', '.join(select_data)}" + + df = pd.read_snowflake(query) + + series = [df[col] for col in df.columns] + final_df = pd.concat(series, join=join, axis=1) + + assert_frame_equal(df, final_df) + + +@sql_count_checker(query_count=4, join_count=0) +def test_df_creation_from_series_from_same_df(): + num_cols = 6 + select_data = [f'{i} as "{i}"' for i in range(num_cols)] + query = f"select {', '.join(select_data)}" + + df = pd.read_snowflake(query) + + df_dict = {col: df[col] for col in df.columns} + final_df = pd.DataFrame(df_dict) + + assert_frame_equal(df, final_df) From ac002c187eaabc0a1157b3bcc0b1c9f7177849cc Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Mon, 26 Aug 2024 11:22:07 -0700 Subject: [PATCH 02/15] add comment --- .../snowpark/modin/plugin/_internal/join_utils.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git 
a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py index ab1066c3151..20f2e678239 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py @@ -350,6 +350,16 @@ def _create_internal_frame_with_join_or_align_result( == left.ordered_dataframe.ordering_columns ) if no_join_applied and origin_left_col == origin_right_col: + # if no join is applied, that means the result dataframe, left dataframe and right dataframe + # shares the same base dataframe. If the original left column and original right column are the + # same column, we always tries to keep the left column to stay align with the original dataframe + # as much as possible to increase the chance for optimization for later operations, especially + # when the later operations are applied with dfs coming from the ame dataframe. + # Keep left column can help stay aligned with the original dataframe is because when there are + # conflict between left and right, deduplication always happens at right. For example, when join + # or align left dataframe [col1, col2] and right dataframe [col1, col2], the result dataframe will + # have columns [col1, col2, col1_a12b, col2_de3b], where col1_a12b, col2_de3b are just alias of + # col1 and col2 in right dataframe. 
coalesce_column_identifier = left_col coalesce_col_type = origin_left_col_type else: From 7a1191cb0d4e71d5e5c72c1f6c979a8e3cd15d90 Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Tue, 27 Aug 2024 11:38:39 -0700 Subject: [PATCH 03/15] fix error --- tests/integ/modin/binary/test_binary_op.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/integ/modin/binary/test_binary_op.py b/tests/integ/modin/binary/test_binary_op.py index 9ae5db98369..9617cffdd8e 100644 --- a/tests/integ/modin/binary/test_binary_op.py +++ b/tests/integ/modin/binary/test_binary_op.py @@ -2586,3 +2586,6 @@ def test_df_sub_series(): eval_snowpark_pandas_result( snow_df, native_df, lambda df: df.sub(df["two"], axis="index"), inplace=True ) + + +# def test_binary_op_series_from_same_df(): \ No newline at end of file From a7f7dd24292fc317a323d444f142802cd0ff97cc Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Tue, 27 Aug 2024 13:09:18 -0700 Subject: [PATCH 04/15] add test --- tests/integ/modin/binary/test_binary_op.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/tests/integ/modin/binary/test_binary_op.py b/tests/integ/modin/binary/test_binary_op.py index 9617cffdd8e..25e6b84844c 100644 --- a/tests/integ/modin/binary/test_binary_op.py +++ b/tests/integ/modin/binary/test_binary_op.py @@ -2588,4 +2588,17 @@ def test_df_sub_series(): ) -# def test_binary_op_series_from_same_df(): \ No newline at end of file +@sql_count_checker(query_count=1, join_count=0) +def test_binary_op_series_from_same_df(): + native_df = native_pd.DataFrame( + { + "A": [1, 2, 3], + "B": [2, 3, 4], + "C": [4, 5, 6], + }, + index=["a", "b", "c"], + ) + snow_df = pd.DataFrame(native_df) + eval_snowpark_pandas_result( + snow_df, native_df, lambda df: df["A"] + df["B"] + df["C"] + ) From a31fbc862ef989a8fee8571f3e7750c344f1db75 Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Tue, 27 Aug 2024 13:13:18 -0700 Subject: [PATCH 05/15] add changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff 
--git a/CHANGELOG.md b/CHANGELOG.md index ff3e8aef4cf..4b64c74ced2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -143,6 +143,7 @@ #### Improvements - Fix pandas FutureWarning about integer indexing. +- Improve concat, join performance when operations are performed on series coming from the same dataframe. ### Snowpark pandas API Updates From 8ec52380228fa984ff62f5c3158ba189861f330f Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Thu, 29 Aug 2024 19:24:55 -0700 Subject: [PATCH 06/15] fix error --- CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b64c74ced2..0767d5d3a0a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -91,6 +91,9 @@ - When calling `DataFrame.set_index`, or setting `DataFrame.index` or `Series.index`, with a new index that does not match the current length of the `Series`/`DataFrame` object, a `ValueError` is no longer raised. When the `Series`/`DataFrame` object is longer than the new index, the `Series`/`DataFrame`'s new index is filled with `NaN` values for the "extra" elements. When the `Series`/`DataFrame` object is shorter than the new index, the extra values in the new index are ignored—`Series` and `DataFrame` stay the same length `n`, and use only the first `n` values of the new index. +#### Improvements + +- Improve concat, join performance when operations are performed on series coming from the same dataframe by avoiding unnecessary joins. ## 1.21.0 (2024-08-19) @@ -143,7 +146,6 @@ #### Improvements - Fix pandas FutureWarning about integer indexing. -- Improve concat, join performance when operations are performed on series coming from the same dataframe. 
### Snowpark pandas API Updates From 8b2287100a61092bc5b99d1a3fa4ce700c60fc52 Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Wed, 28 Aug 2024 16:43:41 -0700 Subject: [PATCH 07/15] revert change --- src/snowflake/snowpark/modin/plugin/_internal/join_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py index 20f2e678239..aa13eefe7c4 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py @@ -1214,7 +1214,7 @@ def align( # Retaining the original columns also helps avoid unnecessary join in later steps. if ( how == "outer" - and aligned_ordered_frame.ordering_columns != left.ordering_columns + # and aligned_ordered_frame.ordering_columns != left.ordering_columns ): coalesce_key_config = [JoinKeyCoalesceConfig.LEFT] * len(left_on) inherit_join_index = InheritJoinIndex.FROM_BOTH From 530b6dce3a65928e35bc153d85e54b2cba38557a Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Wed, 28 Aug 2024 18:25:25 -0700 Subject: [PATCH 08/15] testing --- src/snowflake/snowpark/modin/plugin/_internal/join_utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py index aa13eefe7c4..d52b4036772 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py @@ -1213,7 +1213,8 @@ def align( # keys, simply inherent from left gives the correct result. # Retaining the original columns also helps avoid unnecessary join in later steps. 
if ( - how == "outer" + how + == "outer" # and aligned_ordered_frame.ordering_columns != left.ordering_columns ): coalesce_key_config = [JoinKeyCoalesceConfig.LEFT] * len(left_on) From ec78e7f4ca2f333966f725ec9cf4aeaba23b436f Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Wed, 28 Aug 2024 18:27:03 -0700 Subject: [PATCH 09/15] fix error --- .../snowpark/modin/plugin/_internal/join_utils.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py index d52b4036772..aca89d4c0f7 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py @@ -1212,11 +1212,11 @@ def align( # join columns of left and right matches, then there is no need to coalesce the join # keys, simply inherent from left gives the correct result. # Retaining the original columns also helps avoid unnecessary join in later steps. - if ( - how - == "outer" + # if ( + # how == "outer" # and aligned_ordered_frame.ordering_columns != left.ordering_columns - ): + #): + if how == "outer": coalesce_key_config = [JoinKeyCoalesceConfig.LEFT] * len(left_on) inherit_join_index = InheritJoinIndex.FROM_BOTH ( From ba1610194226d9e7c959126ecf77860e26c5067f Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Wed, 28 Aug 2024 18:28:02 -0700 Subject: [PATCH 10/15] fix error --- src/snowflake/snowpark/modin/plugin/_internal/join_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py index aca89d4c0f7..2e76dc60fe5 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py @@ -1214,8 +1214,8 @@ def align( # Retaining the original columns also helps avoid unnecessary join in later steps. 
# if ( # how == "outer" - # and aligned_ordered_frame.ordering_columns != left.ordering_columns - #): + # and aligned_ordered_frame.ordering_columns != left.ordering_columns + # ): if how == "outer": coalesce_key_config = [JoinKeyCoalesceConfig.LEFT] * len(left_on) inherit_join_index = InheritJoinIndex.FROM_BOTH From 063f4b7bbd3ac7ac997207b2632440fd764cc485 Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Thu, 29 Aug 2024 10:27:42 -0700 Subject: [PATCH 11/15] address feedback --- .../modin/plugin/_internal/join_utils.py | 130 +++++++++--------- 1 file changed, 64 insertions(+), 66 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py index 2e76dc60fe5..4ba6bdd7653 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py @@ -320,85 +320,91 @@ def _create_internal_frame_with_join_or_align_result( ) index_column_types.extend(right.cached_index_column_snowpark_pandas_types) + # If the result ordering column has the same ordering columns as the original left ordering columns, + # that means the original left and right shares the same base, and no actual snowpark join is applied because + # the join is applied on the ordering column or align on the same column. + # This behavior is guaranteed by the align and join methods provided by the OrderingDataframe, when the + # snowpark join is actually applied, the result ordering column will be a combination of + # left.ordering_column and right.ordering_column, plus some assist column. For example, the ordering column + # of left join is left.ordering_column + right.ordering_column. 
+ no_join_applied = ( + result_ordered_frame.ordering_columns == left.ordered_dataframe.ordering_columns + ) + if key_coalesce_config: coalesce_column_identifiers = [] coalesce_column_values = [] for origin_left_col, origin_right_col, coalesce_config in zip( left_on, right_on, key_coalesce_config ): - coalesce_col_type = None - origin_left_col_type = ( - left.snowflake_quoted_identifier_to_snowpark_pandas_type[ - origin_left_col - ] - ) - origin_right_col_type = ( - right.snowflake_quoted_identifier_to_snowpark_pandas_type[ - origin_right_col - ] - ) - if coalesce_config == JoinKeyCoalesceConfig.NONE: - continue left_col = result_helper.map_left_quoted_identifiers([origin_left_col])[0] right_col = result_helper.map_right_quoted_identifiers([origin_right_col])[ 0 ] - # if the result ordering column has the same ordering columns as the original left ordering columns, - # that means the original left and right shares the same base, and no actual join is applied. - no_join_applied = ( - result_ordered_frame.ordering_columns - == left.ordered_dataframe.ordering_columns - ) if no_join_applied and origin_left_col == origin_right_col: # if no join is applied, that means the result dataframe, left dataframe and right dataframe # shares the same base dataframe. If the original left column and original right column are the - # same column, we always tries to keep the left column to stay align with the original dataframe - # as much as possible to increase the chance for optimization for later operations, especially - # when the later operations are applied with dfs coming from the ame dataframe. + # same column, no coalesce is needed, and we always try to keep the left column to stay aligned + # with the original dataframe as much as possible to increase the chance for optimization for + # later operations, especially when the later operations are applied with dfs coming from + # the same dataframe. 
# Keep left column can help stay aligned with the original dataframe is because when there are # conflict between left and right, deduplication always happens at right. For example, when join # or align left dataframe [col1, col2] and right dataframe [col1, col2], the result dataframe will # have columns [col1, col2, col1_a12b, col2_de3b], where col1_a12b, col2_de3b are just alias of # col1 and col2 in right dataframe. + coalesce_config = JoinKeyCoalesceConfig.NONE + if coalesce_config == JoinKeyCoalesceConfig.NONE: + continue + + coalesce_col_type = None + origin_left_col_type = ( + left.snowflake_quoted_identifier_to_snowpark_pandas_type[ + origin_left_col + ] + ) + origin_right_col_type = ( + right.snowflake_quoted_identifier_to_snowpark_pandas_type[ + origin_right_col + ] + ) + + # Coalescing is only required for 'outer' or 'asof' joins or align. + # For 'inner' and 'left' join we use left join keys and for 'right' join we + # use right join keys. + # For 'left' and 'coalesce' align we use left join keys. + if how in ("asof", "outer"): + # Generate an expression equivalent of + # "COALESCE('left_col', 'right_col') as 'left_col'" + coalesce_column_identifier = ( + result_ordered_frame.generate_snowflake_quoted_identifiers( + pandas_labels=[ + extract_pandas_label_from_snowflake_quoted_identifier( + left_col + ) + ], + )[0] + ) + coalesce_column_identifiers.append(coalesce_column_identifier) + coalesce_column_values.append(coalesce(left_col, right_col)) + if origin_left_col_type == origin_right_col_type: + coalesce_col_type = origin_left_col_type + elif how == "right": + # No coalescing required for 'right' join. Simply use right join key + # as output column. + coalesce_column_identifier = right_col + coalesce_col_type = origin_right_col_type + elif how in ("inner", "left", "coalesce"): + # No coalescing required for 'left' or 'inner' join and for 'left' or + # 'coalesce' align. Simply use left join key as output column. 
coalesce_column_identifier = left_col coalesce_col_type = origin_left_col_type else: - # Coalescing is only required for 'outer' or 'asof' joins or align. - # For 'inner' and 'left' join we use left join keys and for 'right' join we - # use right join keys. - # For 'left' and 'coalesce' align we use left join keys. - if how in ("asof", "outer"): - # Generate an expression equivalent of - # "COALESCE('left_col', 'right_col') as 'left_col'" - coalesce_column_identifier = ( - result_ordered_frame.generate_snowflake_quoted_identifiers( - pandas_labels=[ - extract_pandas_label_from_snowflake_quoted_identifier( - left_col - ) - ], - )[0] - ) - coalesce_column_identifiers.append(coalesce_column_identifier) - coalesce_column_values.append(coalesce(left_col, right_col)) - if origin_left_col_type == origin_right_col_type: - coalesce_col_type = origin_left_col_type - elif how == "right": - # No coalescing required for 'right' join. Simply use right join key - # as output column. - coalesce_column_identifier = right_col - coalesce_col_type = origin_right_col_type - elif how in ("inner", "left", "coalesce"): - # No coalescing required for 'left' or 'inner' join and for 'left' or - # 'coalesce' align. Simply use left join key as output column. - coalesce_column_identifier = left_col - coalesce_col_type = origin_left_col_type - else: - raise AssertionError(f"Unsupported join/align type {how}") + raise AssertionError(f"Unsupported join/align type {how}") - if coalesce_config == JoinKeyCoalesceConfig.RIGHT: - # swap left_col and right_col - left_col, right_col = right_col, left_col + if coalesce_config == JoinKeyCoalesceConfig.RIGHT: + # swap left_col and right_col + left_col, right_col = right_col, left_col # To provide same behavior as native pandas, remove duplicate join column. 
if right_col in data_column_snowflake_quoted_identifiers: @@ -1207,15 +1213,7 @@ def align( # NULL NULL 2 NULL 4 e 2 coalesce_key_config = None inherit_join_index = InheritJoinIndex.FROM_LEFT - # When it is `outer` align, we need to coalesce the align columns. However, if the - # ordering columns of aligned result is the same as the left frame, that means the - # join columns of left and right matches, then there is no need to coalesce the join - # keys, simply inherent from left gives the correct result. - # Retaining the original columns also helps avoid unnecessary join in later steps. - # if ( - # how == "outer" - # and aligned_ordered_frame.ordering_columns != left.ordering_columns - # ): + # When it is `outer` align, we need to coalesce the align columns. if how == "outer": coalesce_key_config = [JoinKeyCoalesceConfig.LEFT] * len(left_on) inherit_join_index = InheritJoinIndex.FROM_BOTH From ee57e4c8e080ee799f831ef87b0cd7e0b04e17fa Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Thu, 29 Aug 2024 10:43:41 -0700 Subject: [PATCH 12/15] fix error --- .../modin/plugin/_internal/join_utils.py | 94 ++++++++++--------- tests/integ/modin/binary/test_binary_op.py | 11 ++- 2 files changed, 58 insertions(+), 47 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py index 4ba6bdd7653..457bd388f2b 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py @@ -337,10 +337,26 @@ def _create_internal_frame_with_join_or_align_result( for origin_left_col, origin_right_col, coalesce_config in zip( left_on, right_on, key_coalesce_config ): + if coalesce_config == JoinKeyCoalesceConfig.NONE: + continue + + coalesce_col_type = None + origin_left_col_type = ( + left.snowflake_quoted_identifier_to_snowpark_pandas_type[ + origin_left_col + ] + ) + origin_right_col_type = ( + 
right.snowflake_quoted_identifier_to_snowpark_pandas_type[ + origin_right_col + ] + ) + left_col = result_helper.map_left_quoted_identifiers([origin_left_col])[0] right_col = result_helper.map_right_quoted_identifiers([origin_right_col])[ 0 ] + if no_join_applied and origin_left_col == origin_right_col: # if no join is applied, that means the result dataframe, left dataframe and right dataframe # shares the same base dataframe. If the original left column and original right column are the @@ -353,54 +369,42 @@ def _create_internal_frame_with_join_or_align_result( # or align left dataframe [col1, col2] and right dataframe [col1, col2], the result dataframe will # have columns [col1, col2, col1_a12b, col2_de3b], where col1_a12b, col2_de3b are just alias of # col1 and col2 in right dataframe. - coalesce_config = JoinKeyCoalesceConfig.NONE - if coalesce_config == JoinKeyCoalesceConfig.NONE: - continue - - coalesce_col_type = None - origin_left_col_type = ( - left.snowflake_quoted_identifier_to_snowpark_pandas_type[ - origin_left_col - ] - ) - origin_right_col_type = ( - right.snowflake_quoted_identifier_to_snowpark_pandas_type[ - origin_right_col - ] - ) - - # Coalescing is only required for 'outer' or 'asof' joins or align. - # For 'inner' and 'left' join we use left join keys and for 'right' join we - # use right join keys. - # For 'left' and 'coalesce' align we use left join keys. 
- if how in ("asof", "outer"): - # Generate an expression equivalent of - # "COALESCE('left_col', 'right_col') as 'left_col'" - coalesce_column_identifier = ( - result_ordered_frame.generate_snowflake_quoted_identifiers( - pandas_labels=[ - extract_pandas_label_from_snowflake_quoted_identifier( - left_col - ) - ], - )[0] - ) - coalesce_column_identifiers.append(coalesce_column_identifier) - coalesce_column_values.append(coalesce(left_col, right_col)) - if origin_left_col_type == origin_right_col_type: - coalesce_col_type = origin_left_col_type - elif how == "right": - # No coalescing required for 'right' join. Simply use right join key - # as output column. - coalesce_column_identifier = right_col - coalesce_col_type = origin_right_col_type - elif how in ("inner", "left", "coalesce"): - # No coalescing required for 'left' or 'inner' join and for 'left' or - # 'coalesce' align. Simply use left join key as output column. + coalesce_config = JoinKeyCoalesceConfig.LEFT coalesce_column_identifier = left_col coalesce_col_type = origin_left_col_type else: - raise AssertionError(f"Unsupported join/align type {how}") + # Coalescing is only required for 'outer' or 'asof' joins or align. + # For 'inner' and 'left' join we use left join keys and for 'right' join we + # use right join keys. + # For 'left' and 'coalesce' align we use left join keys. + if how in ("asof", "outer"): + # Generate an expression equivalent of + # "COALESCE('left_col', 'right_col') as 'left_col'" + coalesce_column_identifier = ( + result_ordered_frame.generate_snowflake_quoted_identifiers( + pandas_labels=[ + extract_pandas_label_from_snowflake_quoted_identifier( + left_col + ) + ], + )[0] + ) + coalesce_column_identifiers.append(coalesce_column_identifier) + coalesce_column_values.append(coalesce(left_col, right_col)) + if origin_left_col_type == origin_right_col_type: + coalesce_col_type = origin_left_col_type + elif how == "right": + # No coalescing required for 'right' join. 
Simply use right join key + # as output column. + coalesce_column_identifier = right_col + coalesce_col_type = origin_right_col_type + elif how in ("inner", "left", "coalesce"): + # No coalescing required for 'left' or 'inner' join and for 'left' or + # 'coalesce' align. Simply use left join key as output column. + coalesce_column_identifier = left_col + coalesce_col_type = origin_left_col_type + else: + raise AssertionError(f"Unsupported join/align type {how}") if coalesce_config == JoinKeyCoalesceConfig.RIGHT: # swap left_col and right_col diff --git a/tests/integ/modin/binary/test_binary_op.py b/tests/integ/modin/binary/test_binary_op.py index 25e6b84844c..cd036bcb045 100644 --- a/tests/integ/modin/binary/test_binary_op.py +++ b/tests/integ/modin/binary/test_binary_op.py @@ -2588,17 +2588,24 @@ def test_df_sub_series(): ) -@sql_count_checker(query_count=1, join_count=0) -def test_binary_op_series_from_same_df(): +@sql_count_checker(query_count=2, join_count=0) +def test_binary_op_multi_series_from_same_df(): native_df = native_pd.DataFrame( { "A": [1, 2, 3], "B": [2, 3, 4], "C": [4, 5, 6], + "D": [2, 2, 3], }, index=["a", "b", "c"], ) snow_df = pd.DataFrame(native_df) + # ensure performing more than one binary operation for series coming from same + # dataframe does not produce any join. 
eval_snowpark_pandas_result( snow_df, native_df, lambda df: df["A"] + df["B"] + df["C"] ) + # perform binary operations in different orders + eval_snowpark_pandas_result( + snow_df, native_df, lambda df: (df["A"] + df["B"]) + (df["C"] + df["D"]) + ) From a999afefc366e3b5dc02837c301266b7c4749278 Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Thu, 29 Aug 2024 11:32:39 -0700 Subject: [PATCH 13/15] fix error --- tests/integ/modin/test_concat.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/integ/modin/test_concat.py b/tests/integ/modin/test_concat.py index cb5d224d186..c1366c22506 100644 --- a/tests/integ/modin/test_concat.py +++ b/tests/integ/modin/test_concat.py @@ -1071,6 +1071,15 @@ def test_concat_series_from_same_df(join): select_data = [f'{i} as "{i}"' for i in range(num_cols)] query = f"select {', '.join(select_data)}" + # concat today uses join_on_index to concat all series, we use + # read_snowflake here so that the default index is created and + # managed by snowpark pandas, which is the same as row position + # column. This creates a valid optimization scenario for join, where + # join performed on the same row_position column doesn't require + # actual join. + # This can not be done with pd.DataFrame constructor because the index + # and row position column is controlled by client side, which are + # different columns. 
df = pd.read_snowflake(query) series = [df[col] for col in df.columns] From 9b8d72cd7471b9abc991f4eb97abdb3acee30012 Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Thu, 29 Aug 2024 19:15:47 -0700 Subject: [PATCH 14/15] fix error --- tests/unit/modin/test_join_utils.py | 78 +++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 tests/unit/modin/test_join_utils.py diff --git a/tests/unit/modin/test_join_utils.py b/tests/unit/modin/test_join_utils.py new file mode 100644 index 00000000000..a34f15d665e --- /dev/null +++ b/tests/unit/modin/test_join_utils.py @@ -0,0 +1,78 @@ +# +# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. +# + +import pytest +from unittest import mock + +from collections.abc import Hashable + +import snowflake.snowpark.modin.plugin # noqa: F401 + +from snowflake.snowpark.modin.plugin.compiler.snowflake_query_compiler import InternalFrame +from snowflake.snowpark.modin.plugin._internal.frame import OrderedDataFrame, OrderingColumn +from snowflake.snowpark.modin.plugin._internal.join_utils import _create_internal_frame_with_join_or_align_result, JoinKeyCoalesceConfig, InheritJoinIndex + + +def mock_internal_frame( + data_column_pandas_labels: list[Hashable], + data_column_pandas_index_names: list[Hashable], + data_column_snowflake_quoted_identifiers: list[str], + index_column_pandas_labels: list[Hashable], + index_column_snowflake_quoted_identifiers: list[str], +) -> InternalFrame: + ordered_dataframe = mock.create_autospec(OrderedDataFrame) + ordered_dataframe.projected_column_snowflake_quoted_identifiers = data_column_snowflake_quoted_identifiers + index_column_snowflake_quoted_identifiers + ordered_dataframe.ordering_columns = [OrderingColumn(col) for col in ordered_dataframe.projected_column_snowflake_quoted_identifiers] + internal_frame = InternalFrame.create( + ordered_dataframe=ordered_dataframe, + data_column_pandas_labels=data_column_pandas_labels, + 
data_column_pandas_index_names=data_column_pandas_index_names, + data_column_snowflake_quoted_identifiers=data_column_snowflake_quoted_identifiers, + index_column_pandas_labels=index_column_pandas_labels, + index_column_snowflake_quoted_identifiers=index_column_snowflake_quoted_identifiers, + data_column_types=[None] * len(data_column_pandas_labels), + index_column_types=[None] * len(index_column_pandas_labels), + ) + + return internal_frame + + +def test_create_internal_frame_with_result_using_invalid_methods( +): + left_frame = mock_internal_frame( + data_column_pandas_labels=['a1', 'b1'], + data_column_pandas_index_names=[None], + data_column_snowflake_quoted_identifiers=['"A1"', '"B1"'], + index_column_pandas_labels=['i1'], + index_column_snowflake_quoted_identifiers=['"I1"'], + ) + + right_frame = mock_internal_frame( + data_column_pandas_labels=['a2', 'b2'], + data_column_pandas_index_names=[None], + data_column_snowflake_quoted_identifiers=['"A2"', '"B2"'], + index_column_pandas_labels=['i2'], + index_column_snowflake_quoted_identifiers=['"I2"'], + ) + + result_ordered_frame = mock.create_autospec(OrderedDataFrame) + result_ordered_frame.projected_column_snowflake_quoted_identifiers = ['"I1"', '"A1"', '"B1"', '"I2"', '"A2"', '"B2"'] + result_ordered_frame._ordering_columns_tuple = [ + OrderingColumn('"I1"'), + OrderingColumn('"I2"'), + ] + + with pytest.raises(AssertionError, match="Unsupported join/align type invalid"): + _create_internal_frame_with_join_or_align_result( + result_ordered_frame=result_ordered_frame, + left=left_frame, + right=right_frame, + how="invalid", + left_on=['"I1"'], + right_on=['"I2"'], + sort=False, + key_coalesce_config=[JoinKeyCoalesceConfig.LEFT], + inherit_index=InheritJoinIndex.FROM_LEFT, + ) + From 2a6e3a49938ee19494ca451b8ada69d67c7886a9 Mon Sep 17 00:00:00 2001 From: Yun Zou Date: Fri, 30 Aug 2024 08:51:14 -0700 Subject: [PATCH 15/15] fix lint problem --- tests/unit/modin/test_join_utils.py | 51 
++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/tests/unit/modin/test_join_utils.py b/tests/unit/modin/test_join_utils.py index a34f15d665e..031ab13bef9 100644 --- a/tests/unit/modin/test_join_utils.py +++ b/tests/unit/modin/test_join_utils.py @@ -2,16 +2,24 @@ # Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. # -import pytest +from collections.abc import Hashable from unittest import mock -from collections.abc import Hashable +import pytest import snowflake.snowpark.modin.plugin # noqa: F401 - -from snowflake.snowpark.modin.plugin.compiler.snowflake_query_compiler import InternalFrame -from snowflake.snowpark.modin.plugin._internal.frame import OrderedDataFrame, OrderingColumn -from snowflake.snowpark.modin.plugin._internal.join_utils import _create_internal_frame_with_join_or_align_result, JoinKeyCoalesceConfig, InheritJoinIndex +from snowflake.snowpark.modin.plugin._internal.frame import ( + OrderedDataFrame, + OrderingColumn, +) +from snowflake.snowpark.modin.plugin._internal.join_utils import ( + InheritJoinIndex, + JoinKeyCoalesceConfig, + _create_internal_frame_with_join_or_align_result, +) +from snowflake.snowpark.modin.plugin.compiler.snowflake_query_compiler import ( + InternalFrame, +) def mock_internal_frame( @@ -22,8 +30,14 @@ def mock_internal_frame( index_column_snowflake_quoted_identifiers: list[str], ) -> InternalFrame: ordered_dataframe = mock.create_autospec(OrderedDataFrame) - ordered_dataframe.projected_column_snowflake_quoted_identifiers = data_column_snowflake_quoted_identifiers + index_column_snowflake_quoted_identifiers - ordered_dataframe.ordering_columns = [OrderingColumn(col) for col in ordered_dataframe.projected_column_snowflake_quoted_identifiers] + ordered_dataframe.projected_column_snowflake_quoted_identifiers = ( + data_column_snowflake_quoted_identifiers + + index_column_snowflake_quoted_identifiers + ) + ordered_dataframe.ordering_columns = [ + OrderingColumn(col) 
+ for col in ordered_dataframe.projected_column_snowflake_quoted_identifiers + ] internal_frame = InternalFrame.create( ordered_dataframe=ordered_dataframe, data_column_pandas_labels=data_column_pandas_labels, @@ -38,26 +52,32 @@ def mock_internal_frame( return internal_frame -def test_create_internal_frame_with_result_using_invalid_methods( -): +def test_create_internal_frame_with_result_using_invalid_methods(): left_frame = mock_internal_frame( - data_column_pandas_labels=['a1', 'b1'], + data_column_pandas_labels=["a1", "b1"], data_column_pandas_index_names=[None], data_column_snowflake_quoted_identifiers=['"A1"', '"B1"'], - index_column_pandas_labels=['i1'], + index_column_pandas_labels=["i1"], index_column_snowflake_quoted_identifiers=['"I1"'], ) right_frame = mock_internal_frame( - data_column_pandas_labels=['a2', 'b2'], + data_column_pandas_labels=["a2", "b2"], data_column_pandas_index_names=[None], data_column_snowflake_quoted_identifiers=['"A2"', '"B2"'], - index_column_pandas_labels=['i2'], + index_column_pandas_labels=["i2"], index_column_snowflake_quoted_identifiers=['"I2"'], ) result_ordered_frame = mock.create_autospec(OrderedDataFrame) - result_ordered_frame.projected_column_snowflake_quoted_identifiers = ['"I1"', '"A1"', '"B1"', '"I2"', '"A2"', '"B2"'] + result_ordered_frame.projected_column_snowflake_quoted_identifiers = [ + '"I1"', + '"A1"', + '"B1"', + '"I2"', + '"A2"', + '"B2"', + ] result_ordered_frame._ordering_columns_tuple = [ OrderingColumn('"I1"'), OrderingColumn('"I2"'), @@ -75,4 +95,3 @@ def test_create_internal_frame_with_result_using_invalid_methods( key_coalesce_config=[JoinKeyCoalesceConfig.LEFT], inherit_index=InheritJoinIndex.FROM_LEFT, ) -