Skip to content

Commit

Permalink
SNOW-1321662: Update merge behavior to pandas 2.x (#1577)
Browse files Browse the repository at this point in the history
<!---
Please answer these questions before creating your pull request. Thanks!
--->

1. Which Jira issue is this PR addressing? Make sure that there is an
accompanying issue to your PR.

   <!---
   In this section, please add a Snowflake Jira issue number.
   
Note that if a corresponding GitHub issue exists, you should still
include
   the Snowflake Jira issue number. For example, for GitHub issue
#1400, you should
   add "SNOW-1335071" here.
    --->

   Fixes SNOW-1321662

2. Fill out the following pre-review checklist:

- [ ] I am adding a new automated test(s) to verify correctness of my
new code
   - [ ] I am adding new logging messages
   - [ ] I am adding a new telemetry message
   - [ ] I am adding new credentials
   - [ ] I am adding a new dependency

3. Please describe how your code solves the related issue.

 Update merge behavior to match pandas 2.x.
 change 1: merge/join with how='outer' always sorts the result.
change 2: for merge/join between two multi-index frames, the order of index
columns is now:
   right index columns + remaining left index columns (if how == right)
   left index columns + remaining right index columns otherwise
   
Native pandas has fixed a lot of ordering-related bugs in 2.x:
https://github.com/pandas-dev/pandas/pull/54611/files
So the custom ordering logic in the tests, which is no longer required,
has also been removed.

Also updated tests and removed xfails.
  • Loading branch information
sfc-gh-nkumar authored May 15, 2024
1 parent fe8ba22 commit e7dffea
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 120 deletions.
1 change: 1 addition & 0 deletions src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- Fixed incorrect return type in `qcut` when given `Series` input and improved error checking logic.
- Fixed bug when performing multiple DataFrameGroupBy apply/transform operations on the same DataFrame.
- Fixed type hints for property methods, e.g. Series.empty.
- Fixed `pd.merge` and `DataFrame.merge` outer join behavior according to pandas 2.x.

### Behavior Changes
- Given an input of type `Series`, `pd.qcut` always returns a `Series`.
Expand Down
22 changes: 14 additions & 8 deletions src/snowflake/snowpark/modin/plugin/_internal/join_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,7 @@ class IndexJoinInfo(NamedTuple):
def _get_index_columns_to_join(
left: InternalFrame,
right: InternalFrame,
how: Union[JoinTypeLit, AlignTypeLit],
) -> IndexJoinInfo:
"""
Decide the index columns that need to participate in join. Depends on single or multiindex situation
Expand Down Expand Up @@ -744,6 +745,7 @@ def _get_index_columns_to_join(
Args:
left: Dataframe on left side of join.
right: Dataframe on right side of join.
how: Join or align type.
Returns:
Tuple contains:
Expand Down Expand Up @@ -805,9 +807,8 @@ def _get_index_columns_to_join(
if is_left_multiindex and is_right_multiindex:
# Case 3
# Order of index columns in output frame =
# common index columns (value are coalesced)
# + remaining left index columns
# + remaining right index columns
# right index columns + remaining left index columns (if how == right)
# left index columns + remaining right index columns (otherwise)
left_remaining_labels = [
label
for label in left.index_column_pandas_labels
Expand All @@ -818,9 +819,14 @@ def _get_index_columns_to_join(
for label in right.index_column_pandas_labels
if label not in common_labels
]
expected_index_labels = (
common_labels + left_remaining_labels + right_remaining_labels
)
if how == "right":
expected_index_labels = (
right.index_column_pandas_labels + left_remaining_labels
)
else:
expected_index_labels = (
left.index_column_pandas_labels + right_remaining_labels
)
else:
# Case 4
expected_index_labels = (
Expand Down Expand Up @@ -908,7 +914,7 @@ def join_on_index_columns(
include mapping for index + data columns, ordering columns and row position column
if exists.
"""
index_join_info = _get_index_columns_to_join(left, right)
index_join_info = _get_index_columns_to_join(left, right, how)

joined_frame, result_column_mapper = join(
left,
Expand Down Expand Up @@ -1102,7 +1108,7 @@ def align_on_index(
if exists.
"""

index_join_info = _get_index_columns_to_join(left, right)
index_join_info = _get_index_columns_to_join(left, right, how)
# Re-project the active columns to make sure all active columns of the internal frame participate in
# the align operation, and unnecessary columns are dropped from the projection.
left = left.select_active_columns()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5365,6 +5365,10 @@ def merge(

left = self
join_index_on_index = left_index and right_index
# As per this bug fix in pandas 2.2.x, an outer join always produces sorted results.
# https://github.com/pandas-dev/pandas/pull/54611/files
if how == "outer":
sort = True

# Labels of indicator columns in input frames. We use these columns to generate
# final indicator column in merged frame.
Expand Down
135 changes: 37 additions & 98 deletions tests/integ/modin/frame/test_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,8 @@ def _merge_native_pandas_frames_on_index_on_both_sides(
is_left_mi = left.index.nlevels > 1
is_right_mi = right.index.nlevels > 1
if sort:
# When joining single index frame with multi-index or multi-index frame with
# single index, native pandas doesn't respect 'sort' argument. It always
# behaves as sort=False.
# When a multi-index is involved on either side (or both sides) of the join, native
# pandas doesn't respect the 'sort' argument. It always behaves as sort=False.
# We need to sort explicitly to compare results with Snowpark pandas result.
if (is_left_mi and not is_right_mi) or (not is_left_mi and is_right_mi):
join_key = left.index.name if is_right_mi else right.index.name
Expand All @@ -103,17 +102,16 @@ def _merge_native_pandas_frames_on_index_on_both_sides(
.sort_values(join_key, kind="stable")
.set_index([join_key], append=True)
)
# When joining multi-index frame with another multi-index frame where index
# labels are same only order is different, native pandas doesn't respect 'sort'
# argument. It always behaves as sort=False.
# We need to sort explicitly to compare results with Snowpark pandas result.
if (
is_left_mi
and is_right_mi
and left.index.nlevels == right.index.nlevels
and not set(left.index.names).difference(set(right.index.names))
and set(left.index.names).intersection(set(right.index.names))
):
native_res = native_res.sort_index()
# sort on common index columns.
levels = [name for name in left.index.names if name in right.index.names]
native_res = native_res.sort_index(
level=levels, sort_remaining=False, kind="stable"
)

# The index column name in the merged frame is pretty inconsistent in native pandas.
# In some cases it is inherited from the left frame and in some cases it is set to None.
Expand Down Expand Up @@ -185,32 +183,6 @@ def _merge_native_pandas_frames_on_index_on_one_side(
return native_res.set_index(left_index_names)


def _add_row_position_columns(
left: native_pd.DataFrame, right: Union[native_pd.DataFrame, native_pd.Series]
) -> tuple[native_pd.DataFrame, native_pd.DataFrame]:
# Test helper: tag every row of both inputs with its positional number
# (0..len-1) in a new column, and return the two tagged frames as
# (left, right). The inputs are rebound, not modified in place.
# A Series on the right side is promoted to a DataFrame first so that both
# returned objects are DataFrames.
if isinstance(right, native_pd.Series):
right = right.to_frame()
# Add row position columns to both frames: 'left_pos' on the left frame and
# 'right_pos' on the right frame.
left = left.assign(left_pos=range(len(left)))
right = right.assign(right_pos=range(len(right)))
return left, right


def _sort_and_remove_row_position_columns(
df: native_pd.DataFrame, how: str, ignore_index: bool
) -> native_pd.DataFrame:
# Test helper: order a merged frame by the 'left_pos' / 'right_pos' columns
# and then drop those columns from the returned frame.
#
# Sort by row position columns.
# To match native pandas behavior, reset index if left_index and right_index
# both are false (the caller expresses this through `ignore_index`).
row_position_columns = ["left_pos", "right_pos"]
if how == "right":
# For a right join the right frame's position is the primary sort key.
row_position_columns.reverse()
df = df.sort_values(row_position_columns, ignore_index=ignore_index)

# Drop row position columns so only the original columns remain.
return df.drop(row_position_columns, axis=1)


def _verify_merge(
left: pd.DataFrame,
right: Union[pd.DataFrame, pd.Series],
Expand All @@ -226,27 +198,13 @@ def _verify_merge(
indicator: Optional[Union[bool, str]] = False,
) -> None:
"""
For inner and outer join order of joined frame in Snowpark pandas is different from
native pandas. In Snowpark pandas we order by [left.row_position, right.row_position]
while in Native pandas output of joined frame is: first rows are grouped by the
keys, and order among keys is inherited from the left dataframe, and then the right
data frame.
In addition to above there are bugs in Native pandas where even left and right join
do not preserve order from left frame or right frame respectively.
https://github.com/pandas-dev/pandas/issues/40608
join/merge of DataFrame does not keep order of index
To avoid comparison failure due to bugs in Native pandas we perform some custom
operation after merge.
Some example bugs:
https://github.com/pandas-dev/pandas/issues/46225
outer join out of order when joining multiple DataFrames
To compare join/merge results we perform following additional operations on
native dataframes to simulate ordering behavior of Snowpark pandas.
1. Add an extra column with row position to left and right dataframes.
2. Join both frames.
3. Sort joined frame on row position columns added in step 1.
4. Drop row position columns and compare.
Args:
left: Left DataFrame to join
right: Right DataFrame or Series to join
Expand All @@ -264,11 +222,7 @@ def _verify_merge(
"""
left_native = left.to_pandas()
right_native = right.to_pandas()
# Step 1: Add row position columns
if not sort:
left_native, right_native = _add_row_position_columns(left_native, right_native)

# Step 2: Join frames.
if left_index and right_index:
native_res = _merge_native_pandas_frames_on_index_on_both_sides(
left_native, right_native, how, sort=sort
Expand Down Expand Up @@ -297,13 +251,6 @@ def _verify_merge(
indicator=indicator,
)

if not sort:
# Step 3 & 4
ignore_index = not (left_index or right_index)
native_res = _sort_and_remove_row_position_columns(
native_res, how, ignore_index
)

if force_output_column_order:
native_res = native_res.reindex(columns=force_output_column_order)

Expand Down Expand Up @@ -339,10 +286,6 @@ def test_merge_on(left_df, right_df, on, how, sort):
@pytest.mark.parametrize("on", ["left_i", "right_i"])
@sql_count_checker(query_count=3, join_count=1)
def test_merge_on_index_columns(left_df, right_df, how, on, sort):
if how == "outer" and sort is False:
pytest.xfail(
"SNOW-1321662 - pandas 2.2.1 update fails when merge is outer and sort is False"
)
# Change left_df to: columns=["right_i", "B", "left_c", "left_d"] index=["left_i"]
left_df = left_df.rename(columns={"A": "right_i"})
# Change right_df to: columns=["left_i", "B", "right_c", "right_d"] index=["right_i"]
Expand Down Expand Up @@ -394,8 +337,8 @@ def test_join_type_mismatch_negative(index1, index2):
[3, 4],
[True, False],
native_pd.DataFrame(
{"A": [1.0, 2.0, np.NaN], "B": [3, 3, 4]},
index=native_pd.Index([True, True, False]),
{"A": [np.NaN, 1.0, 2.0], "B": [4, 3, 3]},
index=native_pd.Index([False, True, True]),
),
),
# string and bool, Snowflake converts bool to string, and then performs the join. However, native pandas
Expand All @@ -404,8 +347,8 @@ def test_join_type_mismatch_negative(index1, index2):
["a", "b"],
[True, False],
native_pd.DataFrame(
{"A": [1.0, 2.0, np.NaN, np.NaN], "B": [np.NaN, np.NaN, 3.0, 4.0]},
index=native_pd.Index(["a", "b", "true", "false"]),
{"A": [1.0, 2.0, np.NaN, np.NaN], "B": [np.NaN, np.NaN, 4.0, 3.0]},
index=native_pd.Index(["a", "b", "false", "true"]),
),
),
],
Expand Down Expand Up @@ -473,7 +416,6 @@ def test_merge_on_index_single_index(left_df, right_df, how, sort):
_verify_merge(left_df, right_df, how, left_index=True, right_index=True, sort=sort)


@pytest.mark.xfail(reason="SNOW-1321662 - pandas 2.2.1 update failure", strict=True)
@sql_count_checker(query_count=3, join_count=1)
def test_merge_on_index_multiindex_common_labels(left_df, right_df, how, sort):
left_df = left_df.set_index("A", append=True) # index columns ['left_i', 'A']
Expand All @@ -483,7 +425,10 @@ def test_merge_on_index_multiindex_common_labels(left_df, right_df, how, sort):
)


@pytest.mark.xfail(reason="SNOW-1321662 - pandas 2.2.1 update failure", strict=True)
@pytest.mark.xfail(
reason="pandas bug: https://github.com/pandas-dev/pandas/issues/58721",
strict=True,
)
def test_merge_on_index_multiindex_common_labels_with_none(
left_df, right_df, how, sort
):
Expand Down Expand Up @@ -513,18 +458,22 @@ def test_merge_on_index_multiindex_equal_labels(left_df, right_df, how, sort):


def test_merge_left_index_right_index_single_to_multi(left_df, right_df, how, sort):
if how == "outer" and sort is False:
pytest.xfail(
"SNOW-1321662 - pandas 2.2.1 update fails when merge is outer and sort is False"
)
right_df = right_df.rename(columns={"A": "left_i"}).set_index(
"left_i", append=True
) # index columns ['right_i', 'left_i']
if how in ("inner", "right"):
with SqlCounter(query_count=3, join_count=1):
_verify_merge(
left_df, right_df, how=how, left_index=True, right_index=True, sort=sort
)
if how == "inner" and sort is False:
pytest.skip("pandas bug: https://github.com/pandas-dev/pandas/issues/55774")
else:
with SqlCounter(query_count=3, join_count=1):
_verify_merge(
left_df,
right_df,
how=how,
left_index=True,
right_index=True,
sort=sort,
)
else: # left and outer join
# When joining single index with multi index, in native pandas 'left' join
# behaves as 'inner' join and 'outer' join behaves as 'right' join.
Expand All @@ -549,10 +498,6 @@ def test_merge_left_index_right_index_single_to_multi(left_df, right_df, how, so


def test_merge_left_index_right_index_multi_to_single(left_df, right_df, how, sort):
if how == "outer" and sort is False:
pytest.xfail(
"SNOW-1321662 - pandas 2.2.1 update fails when merge is outer and sort is False"
)
left_df = left_df.rename(columns={"A": "right_i"}).set_index(
"right_i", append=True
) # index columns ['left_i', 'right_i']
Expand Down Expand Up @@ -897,19 +842,13 @@ def test_merge_duplicate_join_keys_negative(left_df, right_df):
)


@sql_count_checker(query_count=2)
@sql_count_checker(query_count=0)
def test_merge_invalid_how_negative(left_df, right_df):
pytest.xfail("SNOW-1321662 - pandas 2.2.1 update error message different in pandas")
eval_snowpark_pandas_result(
left_df,
left_df.to_pandas(),
lambda df: df.merge(
right_df if isinstance(df, pd.DataFrame) else right_df.to_pandas(),
on="A",
how="full_outer_join",
),
expect_exception=True,
)
# Native pandas raises UnboundLocalError: local variable 'lidx' referenced before assignment.
# In Snowpark pandas we raise a more meaningful error.
msg = "do not recognize join method full_outer_join"
with pytest.raises(ValueError, match=msg):
left_df.merge(right_df, on="A", how="full_outer_join")


@sql_count_checker(query_count=2, join_count=1)
Expand Down
28 changes: 14 additions & 14 deletions tests/integ/modin/series/test_bitwise_operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,14 @@ def check_op(native_lhs, native_rhs, snow_lhs, snow_rhs):
[True, True, True, True, None, None],
index=native_pd.MultiIndex.from_tuples(
[
(0, 2.0, 0),
(0, 0.0, 0),
(0, 1.0, 0),
(1, 2.0, 0),
(2, np.nan, 0),
(2, np.nan, 1),
(2.0, 0, 0),
(0.0, 0, 0),
(1.0, 0, 0),
(2.0, 1, 0),
(np.nan, 2, 0),
(np.nan, 2, 1),
],
names=["x", "y", "z"],
names=["y", "x", "z"],
),
),
),
Expand Down Expand Up @@ -302,14 +302,14 @@ def test_bitwise_binary_between_series_with_deviating_behavior_or(
[True, False, False, True, False, False],
index=native_pd.MultiIndex.from_tuples(
[
(0, 2.0, 0),
(0, 0.0, 0),
(0, 1.0, 0),
(1, 2.0, 0),
(2, np.nan, 0),
(2, np.nan, 1),
(2.0, 0, 0),
(0.0, 0, 0),
(1.0, 0, 0),
(2.0, 1, 0),
(np.nan, 2, 0),
(np.nan, 2, 1),
],
names=["x", "y", "z"],
names=["y", "x", "z"],
),
),
),
Expand Down

0 comments on commit e7dffea

Please sign in to comment.