Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1662105, SNOW-1662657: Support by, left_by, right_by for pd.merge_asof #2284

Merged
merged 7 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/snowflake/snowpark/modin/plugin/_internal/cut_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,6 @@ def compute_bin_indices(
values_frame,
cuts_frame,
how="asof",
left_on=[],
right_on=[],
left_match_col=values_frame.data_column_snowflake_quoted_identifiers[0],
right_match_col=cuts_frame.data_column_snowflake_quoted_identifiers[0],
match_comparator=MatchComparator.LESS_THAN_OR_EQUAL_TO
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,8 +584,6 @@ def _get_adjusted_key_frame_by_row_pos_int_frame(
key,
count_frame,
"cross",
left_on=[],
right_on=[],
inherit_join_index=InheritJoinIndex.FROM_LEFT,
)

Expand Down
132 changes: 86 additions & 46 deletions src/snowflake/snowpark/modin/plugin/_internal/join_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,55 @@ class JoinOrAlignInternalFrameResult(NamedTuple):
result_column_mapper: JoinOrAlignResultColumnMapper


def assert_snowpark_pandas_types_match(
    left: InternalFrame,
    right: InternalFrame,
    left_identifiers: list[str],
    right_identifiers: list[str],
) -> None:
    """
    Raise a ValueError if Snowpark pandas types differ for any pair of identifiers.

    Identifiers are compared pairwise, in order: the Snowpark pandas type recorded
    for ``left_identifiers[i]`` in ``left`` must equal the one recorded for
    ``right_identifiers[i]`` in ``right``. An identifier with no recorded Snowpark
    pandas type is treated as ``None`` for the comparison, so two such columns
    always match.

    Args:
        left: An internal frame to use on left side of join.
        right: An internal frame to use on right side of join.
        left_identifiers: List of snowflake identifiers to check types from 'left' frame.
        right_identifiers: List of snowflake identifiers to check types from 'right' frame.
            left_identifiers and right_identifiers must be lists of equal length.

    Returns: None

    Raises:
        ValueError: on the first pair whose Snowpark pandas types differ. The
            message names both type classes and the pandas label of the left
            column (or the quoted identifier itself when the left identifier is
            not one of the left frame's data columns).
    """
    left_type_map = left.snowflake_quoted_identifier_to_snowpark_pandas_type
    right_type_map = right.snowflake_quoted_identifier_to_snowpark_pandas_type

    for left_id, right_id in zip(left_identifiers, right_identifiers):
        lt = left_type_map.get(left_id, None)
        rt = right_type_map.get(right_id, None)
        if lt != rt:
            # Resolve a human-readable key for the error message. The identifier
            # may not be a data column (NOTE(review): presumably an index column
            # when joining on index -- confirm); in that case report the quoted
            # identifier rather than letting list.index() raise an unrelated
            # "is not in list" ValueError that hides the real problem.
            data_ids = left.data_column_snowflake_quoted_identifiers
            if left_id in data_ids:
                key = left.data_column_pandas_labels[data_ids.index(left_id)]
            else:
                key = left_id

            # When a side has no Snowpark pandas type, report its Snowflake type.
            lt = lt if lt is not None else left.get_snowflake_type(left_id)
            rt = rt if rt is not None else right.get_snowflake_type(right_id)
            raise ValueError(
                f"You are trying to merge on {type(lt).__name__} and {type(rt).__name__} columns for key '{key}'. "
                f"If you wish to proceed you should use pd.concat"
            )


def join(
left: InternalFrame,
right: InternalFrame,
how: JoinTypeLit,
left_on: list[str],
right_on: list[str],
left_on: Optional[list[str]] = None,
right_on: Optional[list[str]] = None,
left_match_col: Optional[str] = None,
right_match_col: Optional[str] = None,
match_comparator: Optional[MatchComparator] = None,
Expand Down Expand Up @@ -161,40 +204,31 @@ def join(
include mapping for index + data columns, ordering columns and row position column
if exists.
"""
assert len(left_on) == len(
right_on
), "left_on and right_on must be of same length or both be None"
if join_key_coalesce_config is not None:
assert len(join_key_coalesce_config) == len(
left_on
), "join_key_coalesce_config must be of same length as left_on and right_on"
assert how in get_args(
JoinTypeLit
), f"Invalid join type: {how}. Allowed values are {get_args(JoinTypeLit)}"

def assert_snowpark_pandas_types_match() -> None:
"""If Snowpark pandas types do not match, then a ValueError will be raised."""
left_types = [
left.snowflake_quoted_identifier_to_snowpark_pandas_type.get(id, None)
for id in left_on
]
right_types = [
right.snowflake_quoted_identifier_to_snowpark_pandas_type.get(id, None)
for id in right_on
]
for i, (lt, rt) in enumerate(zip(left_types, right_types)):
if lt != rt:
left_on_id = left_on[i]
idx = left.data_column_snowflake_quoted_identifiers.index(left_on_id)
key = left.data_column_pandas_labels[idx]
lt = lt if lt is not None else left.get_snowflake_type(left_on_id)
rt = rt if rt is not None else right.get_snowflake_type(right_on[i])
raise ValueError(
f"You are trying to merge on {type(lt).__name__} and {type(rt).__name__} columns for key '{key}'. "
f"If you wish to proceed you should use pd.concat"
)

assert_snowpark_pandas_types_match()
if how == "asof":
assert (
left_match_col
sfc-gh-nkrishna marked this conversation as resolved.
Show resolved Hide resolved
), "ASOF join was not provided a column identifier to match on for the left table"
assert (
right_match_col
), "ASOF join was not provided a column identifier to match on for the right table"
assert (
match_comparator
), "ASOF join was not provided a comparator for the match condition"
assert_snowpark_pandas_types_match(
left, right, [left_match_col], [right_match_col]
)
if left_on and right_on:
sfc-gh-nkrishna marked this conversation as resolved.
Show resolved Hide resolved
sfc-gh-nkrishna marked this conversation as resolved.
Show resolved Hide resolved
assert len(left_on) == len(
right_on
), "left_on and right_on must be of same length or both be None"
if join_key_coalesce_config is not None:
sfc-gh-nkrishna marked this conversation as resolved.
Show resolved Hide resolved
assert len(join_key_coalesce_config) == len(
left_on
), "join_key_coalesce_config must be of same length as left_on and right_on"
sfc-gh-nkrishna marked this conversation as resolved.
Show resolved Hide resolved
assert_snowpark_pandas_types_match(left, right, left_on, right_on)
sfc-gh-nkrishna marked this conversation as resolved.
Show resolved Hide resolved

# Re-project the active columns to make sure all active columns of the internal frame participate
# in the join operation, and unnecessary columns are dropped from the projected columns.
Expand All @@ -212,15 +246,19 @@ def assert_snowpark_pandas_types_match() -> None:
)

return _create_internal_frame_with_join_or_align_result(
joined_ordered_dataframe,
left,
right,
how,
left_on,
right_on,
sort,
join_key_coalesce_config,
inherit_join_index,
result_ordered_frame=joined_ordered_dataframe,
left=left,
right=right,
how=how,
left_on=[left_match_col] # type: ignore
if how == "asof" and join_key_coalesce_config and not left_on
else left_on,
right_on=[right_match_col] # type: ignore
if how == "asof" and join_key_coalesce_config and not right_on
else right_on,
sort=sort,
key_coalesce_config=join_key_coalesce_config,
inherit_index=inherit_join_index,
)


Expand All @@ -229,8 +267,8 @@ def _create_internal_frame_with_join_or_align_result(
left: InternalFrame,
right: InternalFrame,
how: Union[JoinTypeLit, AlignTypeLit],
left_on: list[str],
right_on: list[str],
left_on: Optional[list[str]] = None,
right_on: Optional[list[str]] = None,
sort: Optional[bool] = False,
key_coalesce_config: Optional[list[JoinKeyCoalesceConfig]] = None,
inherit_index: InheritJoinIndex = InheritJoinIndex.FROM_LEFT,
Expand All @@ -244,8 +282,8 @@ def _create_internal_frame_with_join_or_align_result(
result_ordered_frame: OrderedDataFrame. The ordered dataframe result for the join/align operation.
left: InternalFrame. The original left internal frame used for the join/align.
right: InternalFrame. The original right internal frame used for the join/align.
left_on: List[str]. The columns in original left internal frame used for join/align.
right_on: List[str]. The columns in original right internal frame used for join/align.
left_on: Optional[List[str]]. The columns in original left internal frame used for join/align.
right_on: Optional[List[str]]. The columns in original right internal frame used for join/align.
how: Union[JoinTypeLit, AlignTypeLit] join or align type.
sort: Optional[bool] = False. Whether to sort the result lexicographically on the join/align keys.
key_coalesce_config: Optional[List[JoinKeyCoalesceConfig]]. Optional list of coalesce config to
Expand All @@ -259,6 +297,8 @@ def _create_internal_frame_with_join_or_align_result(
Returns:
InternalFrame for the join/aligned result with all fields set accordingly.
"""
left_on = left_on or []
right_on = right_on or []

result_helper = JoinOrAlignOrderedDataframeResultHelper(
left.ordered_dataframe,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,7 @@ def join(
assert match_comparator, "match_comparator was not provided to ASOF Join"
snowpark_dataframe = left_snowpark_dataframe_ref.snowpark_dataframe.join(
right=right_snowpark_dataframe_ref.snowpark_dataframe,
on=on,
sfc-gh-nkrishna marked this conversation as resolved.
Show resolved Hide resolved
how=how,
match_condition=getattr(left_match_col, match_comparator.value)(
right_match_col
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,8 +649,6 @@ def perform_asof_join_on_frame(
left=preserving_frame,
right=referenced_frame,
how="asof",
left_on=[],
right_on=[],
left_match_col=left_timecol_snowflake_quoted_identifier,
right_match_col=right_timecol_snowflake_quoted_identifier,
match_comparator=(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7457,8 +7457,6 @@ def merge_asof(
left=left_frame,
right=right_frame,
how="asof",
left_on=[left_match_col],
right_on=[right_match_col],
left_match_col=left_match_col,
right_match_col=right_match_col,
match_comparator=match_comparator,
Expand Down
Loading