Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SNOW-1636729] Improve join/align performance by avoid unnecessary coalesce #2165

Merged
merged 15 commits into from
Aug 30, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@

- When calling `DataFrame.set_index`, or setting `DataFrame.index` or `Series.index`, with a new index that does not match the current length of the `Series`/`DataFrame` object, a `ValueError` is no longer raised. When the `Series`/`DataFrame` object is longer than the new index, the `Series`/`DataFrame`'s new index is filled with `NaN` values for the "extra" elements. When the `Series`/`DataFrame` object is shorter than the new index, the extra values in the new index are ignored—`Series` and `DataFrame` stay the same length `n`, and use only the first `n` values of the new index.

#### Improvements

- Improve concat, join performance when operations are performed on series coming from the same dataframe by avoiding unnecessary joins.

## 1.21.0 (2024-08-19)

Expand Down
103 changes: 63 additions & 40 deletions src/snowflake/snowpark/modin/plugin/_internal/join_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,26 @@ def _create_internal_frame_with_join_or_align_result(
)
index_column_types.extend(right.cached_index_column_snowpark_pandas_types)

# If the result ordering column has the same ordering columns as the original left ordering columns,
# that means the original left and right shares the same base, and no actual snowpark join is applied because
# the join is applied on the ordering column or align on the same column.
# This behavior is guaranteed by the align and join methods provided by the OrderingDataframe, when the
# snowpark join is actually applied, the result ordering column will be a combination of
# left.ordering_column and right.ordering_column, plus some assist column. For example, the ordering column
# of left join is left.ordering_column + right.ordering_column.
no_join_applied = (
result_ordered_frame.ordering_columns == left.ordered_dataframe.ordering_columns
)

if key_coalesce_config:
coalesce_column_identifiers = []
coalesce_column_values = []
for origin_left_col, origin_right_col, coalesce_config in zip(
left_on, right_on, key_coalesce_config
):
if coalesce_config == JoinKeyCoalesceConfig.NONE:
continue

coalesce_col_type = None
origin_left_col_type = (
left.snowflake_quoted_identifier_to_snowpark_pandas_type[
Expand All @@ -337,44 +351,60 @@ def _create_internal_frame_with_join_or_align_result(
origin_right_col
]
)
if coalesce_config == JoinKeyCoalesceConfig.NONE:
continue

left_col = result_helper.map_left_quoted_identifiers([origin_left_col])[0]
right_col = result_helper.map_right_quoted_identifiers([origin_right_col])[
0
]
# Coalescing is only required for 'outer' or 'asof' joins or align.
# For 'inner' and 'left' join we use left join keys and for 'right' join we
# use right join keys.
# For 'left' and 'coalesce' align we use left join keys.
if how in ("asof", "outer"):
# Generate an expression equivalent of
# "COALESCE('left_col', 'right_col') as 'left_col'"
coalesce_column_identifier = (
result_ordered_frame.generate_snowflake_quoted_identifiers(
pandas_labels=[
extract_pandas_label_from_snowflake_quoted_identifier(
left_col
)
],
)[0]
)
coalesce_column_identifiers.append(coalesce_column_identifier)
coalesce_column_values.append(coalesce(left_col, right_col))
if origin_left_col_type == origin_right_col_type:
coalesce_col_type = origin_left_col_type
elif how == "right":
# No coalescing required for 'right' join. Simply use right join key
# as output column.
coalesce_column_identifier = right_col
coalesce_col_type = origin_right_col_type
elif how in ("inner", "left", "coalesce"):
# No coalescing required for 'left' or 'inner' join and for 'left' or
# 'coalesce' align. Simply use left join key as output column.

if no_join_applied and origin_left_col == origin_right_col:
sfc-gh-mvashishtha marked this conversation as resolved.
Show resolved Hide resolved
# if no join is applied, that means the result dataframe, left dataframe and right dataframe
# shares the same base dataframe. If the original left column and original right column are the
# same column, no coalesce is needed, and we always tries to keep the left column to stay align
# with the original dataframe as much as possible to increase the chance for optimization for
# later operations, especially when the later operations are applied with dfs coming from
# the ame dataframe.
# Keep left column can help stay aligned with the original dataframe is because when there are
# conflict between left and right, deduplication always happens at right. For example, when join
# or align left dataframe [col1, col2] and right dataframe [col1, col2], the result dataframe will
# have columns [col1, col2, col1_a12b, col2_de3b], where col1_a12b, col2_de3b are just alias of
# col1 and col2 in right dataframe.
coalesce_config = JoinKeyCoalesceConfig.LEFT
coalesce_column_identifier = left_col
coalesce_col_type = origin_left_col_type
else:
raise AssertionError(f"Unsupported join/align type {how}")
# Coalescing is only required for 'outer' or 'asof' joins or align.
# For 'inner' and 'left' join we use left join keys and for 'right' join we
# use right join keys.
# For 'left' and 'coalesce' align we use left join keys.
if how in ("asof", "outer"):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sfc-gh-nkumar When i start looking at this part of code now, it feels little bit wired, it seems we are deciding how the column coalesce happens, we do not look into the coalesce configure, but the join type. It is good in the sense that it tries to reduce the extra logic caller need to check, but it is kind of confusing in the sense that we also have an coalesce configure parameter there

Copy link
Contributor

@sfc-gh-nkumar sfc-gh-nkumar Aug 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, IIRC current logic is to ignore the coalesce config for join types where coalesce is not required. We can probably assert if coalesce config is provided for join types which do not require assert. But I don't feel strongly either way.

# Generate an expression equivalent of
# "COALESCE('left_col', 'right_col') as 'left_col'"
coalesce_column_identifier = (
result_ordered_frame.generate_snowflake_quoted_identifiers(
pandas_labels=[
extract_pandas_label_from_snowflake_quoted_identifier(
left_col
)
],
)[0]
)
coalesce_column_identifiers.append(coalesce_column_identifier)
coalesce_column_values.append(coalesce(left_col, right_col))
if origin_left_col_type == origin_right_col_type:
coalesce_col_type = origin_left_col_type
elif how == "right":
# No coalescing required for 'right' join. Simply use right join key
# as output column.
coalesce_column_identifier = right_col
coalesce_col_type = origin_right_col_type
elif how in ("inner", "left", "coalesce"):
# No coalescing required for 'left' or 'inner' join and for 'left' or
# 'coalesce' align. Simply use left join key as output column.
coalesce_column_identifier = left_col
coalesce_col_type = origin_left_col_type
else:
raise AssertionError(f"Unsupported join/align type {how}")

if coalesce_config == JoinKeyCoalesceConfig.RIGHT:
# swap left_col and right_col
Expand Down Expand Up @@ -1187,15 +1217,8 @@ def align(
# NULL NULL 2 NULL 4 e 2
coalesce_key_config = None
inherit_join_index = InheritJoinIndex.FROM_LEFT
# When it is `outer` align, we need to coalesce the align columns. However, if the
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sfc-gh-nkumar this optimization is now performed in general for all join and align

# ordering columns of aligned result is the same as the left frame, that means the
# join columns of left and right matches, then there is no need to coalesce the join
# keys, simply inherent from left gives the correct result.
# Retaining the original columns also helps avoid unnecessary join in later steps.
if (
how == "outer"
and aligned_ordered_frame.ordering_columns != left.ordering_columns
):
# When it is `outer` align, we need to coalesce the align columns.
if how == "outer":
coalesce_key_config = [JoinKeyCoalesceConfig.LEFT] * len(left_on)
inherit_join_index = InheritJoinIndex.FROM_BOTH
(
Expand Down
23 changes: 23 additions & 0 deletions tests/integ/modin/binary/test_binary_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -2586,3 +2586,26 @@ def test_df_sub_series():
eval_snowpark_pandas_result(
snow_df, native_df, lambda df: df.sub(df["two"], axis="index"), inplace=True
)


@sql_count_checker(query_count=2, join_count=0)
def test_binary_op_multi_series_from_same_df():
native_df = native_pd.DataFrame(
{
"A": [1, 2, 3],
"B": [2, 3, 4],
"C": [4, 5, 6],
"D": [2, 2, 3],
},
index=["a", "b", "c"],
)
snow_df = pd.DataFrame(native_df)
# ensure performing more than one binary operation for series coming from same
# dataframe does not produce any join.
eval_snowpark_pandas_result(
snow_df, native_df, lambda df: df["A"] + df["B"] + df["C"]
)
# perform binary operations in different orders
eval_snowpark_pandas_result(
snow_df, native_df, lambda df: (df["A"] + df["B"]) + (df["C"] + df["D"])
)
37 changes: 37 additions & 0 deletions tests/integ/modin/test_concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -1063,3 +1063,40 @@ def test_concat_keys():
}
snow_df = pd.concat(data.values(), axis=1, keys=data.keys())
assert_frame_equal(snow_df, native_df, check_dtype=False)


@sql_count_checker(query_count=4, join_count=0)
def test_concat_series_from_same_df(join):
num_cols = 4
select_data = [f'{i} as "{i}"' for i in range(num_cols)]
query = f"select {', '.join(select_data)}"

# concat today uses join_on_index to concat all series, we use
# read_snowflake here so that the default index is created and
# managed by snowpark pandas, which is the same as row position
# column. This creates a valid optimization scenario for join, where
# join performed on the same row_position column doesn't require
# actual join.
# This can not be done with pd.DataFrame constructor because the index
# and row position column is controlled by client side, which are
# different columns.
df = pd.read_snowflake(query)
sfc-gh-mvashishtha marked this conversation as resolved.
Show resolved Hide resolved

series = [df[col] for col in df.columns]
final_df = pd.concat(series, join=join, axis=1)

assert_frame_equal(df, final_df)


@sql_count_checker(query_count=4, join_count=0)
def test_df_creation_from_series_from_same_df():
num_cols = 6
select_data = [f'{i} as "{i}"' for i in range(num_cols)]
query = f"select {', '.join(select_data)}"

df = pd.read_snowflake(query)

df_dict = {col: df[col] for col in df.columns}
final_df = pd.DataFrame(df_dict)

assert_frame_equal(df, final_df)
97 changes: 97 additions & 0 deletions tests/unit/modin/test_join_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
#

from collections.abc import Hashable
from unittest import mock

import pytest

import snowflake.snowpark.modin.plugin # noqa: F401
from snowflake.snowpark.modin.plugin._internal.frame import (
OrderedDataFrame,
OrderingColumn,
)
from snowflake.snowpark.modin.plugin._internal.join_utils import (
InheritJoinIndex,
JoinKeyCoalesceConfig,
_create_internal_frame_with_join_or_align_result,
)
from snowflake.snowpark.modin.plugin.compiler.snowflake_query_compiler import (
InternalFrame,
)


def mock_internal_frame(
data_column_pandas_labels: list[Hashable],
data_column_pandas_index_names: list[Hashable],
data_column_snowflake_quoted_identifiers: list[str],
index_column_pandas_labels: list[Hashable],
index_column_snowflake_quoted_identifiers: list[str],
) -> InternalFrame:
ordered_dataframe = mock.create_autospec(OrderedDataFrame)
ordered_dataframe.projected_column_snowflake_quoted_identifiers = (
data_column_snowflake_quoted_identifiers
+ index_column_snowflake_quoted_identifiers
)
ordered_dataframe.ordering_columns = [
OrderingColumn(col)
for col in ordered_dataframe.projected_column_snowflake_quoted_identifiers
]
internal_frame = InternalFrame.create(
ordered_dataframe=ordered_dataframe,
data_column_pandas_labels=data_column_pandas_labels,
data_column_pandas_index_names=data_column_pandas_index_names,
data_column_snowflake_quoted_identifiers=data_column_snowflake_quoted_identifiers,
index_column_pandas_labels=index_column_pandas_labels,
index_column_snowflake_quoted_identifiers=index_column_snowflake_quoted_identifiers,
data_column_types=[None] * len(data_column_pandas_labels),
index_column_types=[None] * len(index_column_pandas_labels),
)

return internal_frame


def test_create_internal_frame_with_result_using_invalid_methods():
left_frame = mock_internal_frame(
data_column_pandas_labels=["a1", "b1"],
data_column_pandas_index_names=[None],
data_column_snowflake_quoted_identifiers=['"A1"', '"B1"'],
index_column_pandas_labels=["i1"],
index_column_snowflake_quoted_identifiers=['"I1"'],
)

right_frame = mock_internal_frame(
data_column_pandas_labels=["a2", "b2"],
data_column_pandas_index_names=[None],
data_column_snowflake_quoted_identifiers=['"A2"', '"B2"'],
index_column_pandas_labels=["i2"],
index_column_snowflake_quoted_identifiers=['"I2"'],
)

result_ordered_frame = mock.create_autospec(OrderedDataFrame)
result_ordered_frame.projected_column_snowflake_quoted_identifiers = [
'"I1"',
'"A1"',
'"B1"',
'"I2"',
'"A2"',
'"B2"',
]
result_ordered_frame._ordering_columns_tuple = [
OrderingColumn('"I1"'),
OrderingColumn('"I2"'),
]

with pytest.raises(AssertionError, match="Unsupported join/align type invalid"):
_create_internal_frame_with_join_or_align_result(
result_ordered_frame=result_ordered_frame,
left=left_frame,
right=right_frame,
how="invalid",
left_on=['"I1"'],
right_on=['"I2"'],
sort=False,
key_coalesce_config=[JoinKeyCoalesceConfig.LEFT],
inherit_index=InheritJoinIndex.FROM_LEFT,
)
Loading