Skip to content

Commit

Permalink
Merge branch 'main' into nkumar-SNOW-1637945-attrs
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-nkumar authored Aug 30, 2024
2 parents 0c4f2a9 + 200c485 commit c6dfd13
Show file tree
Hide file tree
Showing 9 changed files with 457 additions and 56 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@
- Stopped ignoring nanoseconds in `pd.Timedelta` scalars.
- Fixed AssertionError in tree of binary operations.

#### Behavior Change

- When calling `DataFrame.set_index`, or setting `DataFrame.index` or `Series.index`, with a new index that does not match the current length of the `Series`/`DataFrame` object, a `ValueError` is no longer raised. When the `Series`/`DataFrame` object is longer than the new index, the `Series`/`DataFrame`'s new index is filled with `NaN` values for the "extra" elements. When the `Series`/`DataFrame` object is shorter than the new index, the extra values in the new index are ignored—`Series` and `DataFrame` stay the same length `n`, and use only the first `n` values of the new index.

#### Improvements

- Improved `concat` and `join` performance when operations are performed on series coming from the same dataframe by avoiding unnecessary joins.

## 1.21.0 (2024-08-19)

### Snowpark Python API Updates
Expand Down
16 changes: 0 additions & 16 deletions src/snowflake/snowpark/modin/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,14 +604,6 @@ def _to_series_list(self, index: pd.Index) -> list[pd.Series]:
return [pd.Series(index)]

def _set_index(self, new_index: Axes) -> None:
"""
Set the index for this DataFrame.
Parameters
----------
new_index : pandas.Index
The new index to set this.
"""
# TODO: SNOW-1119855: Modin upgrade - modin.pandas.base.BasePandasDataset
self._update_inplace(
new_query_compiler=self._query_compiler.set_index(
Expand Down Expand Up @@ -655,14 +647,6 @@ def set_axis(
return obj

def _get_index(self):
"""
Get the index for this DataFrame.
Returns
-------
pandas.Index
The union of all indexes across the partitions.
"""
# TODO: SNOW-1119855: Modin upgrade - modin.pandas.base.BasePandasDataset
from snowflake.snowpark.modin.plugin.extensions.index import Index

Expand Down
103 changes: 63 additions & 40 deletions src/snowflake/snowpark/modin/plugin/_internal/join_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,26 @@ def _create_internal_frame_with_join_or_align_result(
)
index_column_types.extend(right.cached_index_column_snowpark_pandas_types)

# If the result ordering columns are the same as the original left ordering columns,
# that means the original left and right share the same base, and no actual snowpark join is applied because
# the join is applied on the ordering column or the align is on the same column.
# This behavior is guaranteed by the align and join methods provided by the OrderingDataframe: when the
# snowpark join is actually applied, the result ordering columns will be a combination of
# left.ordering_column and right.ordering_column, plus some assist columns. For example, the ordering column
# of a left join is left.ordering_column + right.ordering_column.
no_join_applied = (
result_ordered_frame.ordering_columns == left.ordered_dataframe.ordering_columns
)

if key_coalesce_config:
coalesce_column_identifiers = []
coalesce_column_values = []
for origin_left_col, origin_right_col, coalesce_config in zip(
left_on, right_on, key_coalesce_config
):
if coalesce_config == JoinKeyCoalesceConfig.NONE:
continue

coalesce_col_type = None
origin_left_col_type = (
left.snowflake_quoted_identifier_to_snowpark_pandas_type[
Expand All @@ -337,44 +351,60 @@ def _create_internal_frame_with_join_or_align_result(
origin_right_col
]
)
if coalesce_config == JoinKeyCoalesceConfig.NONE:
continue

left_col = result_helper.map_left_quoted_identifiers([origin_left_col])[0]
right_col = result_helper.map_right_quoted_identifiers([origin_right_col])[
0
]
# Coalescing is only required for 'outer' or 'asof' joins or align.
# For 'inner' and 'left' join we use left join keys and for 'right' join we
# use right join keys.
# For 'left' and 'coalesce' align we use left join keys.
if how in ("asof", "outer"):
# Generate an expression equivalent of
# "COALESCE('left_col', 'right_col') as 'left_col'"
coalesce_column_identifier = (
result_ordered_frame.generate_snowflake_quoted_identifiers(
pandas_labels=[
extract_pandas_label_from_snowflake_quoted_identifier(
left_col
)
],
)[0]
)
coalesce_column_identifiers.append(coalesce_column_identifier)
coalesce_column_values.append(coalesce(left_col, right_col))
if origin_left_col_type == origin_right_col_type:
coalesce_col_type = origin_left_col_type
elif how == "right":
# No coalescing required for 'right' join. Simply use right join key
# as output column.
coalesce_column_identifier = right_col
coalesce_col_type = origin_right_col_type
elif how in ("inner", "left", "coalesce"):
# No coalescing required for 'left' or 'inner' join and for 'left' or
# 'coalesce' align. Simply use left join key as output column.

if no_join_applied and origin_left_col == origin_right_col:
# If no join is applied, that means the result dataframe, left dataframe, and right dataframe
# share the same base dataframe. If the original left column and original right column are the
# same column, no coalesce is needed, and we always try to keep the left column so the result
# stays aligned with the original dataframe as much as possible, to increase the chance of
# optimization for later operations, especially when those operations are applied with dfs
# coming from the same dataframe.
# Keeping the left column helps stay aligned with the original dataframe because, when there is
# a conflict between left and right, deduplication always happens on the right. For example, when
# joining or aligning left dataframe [col1, col2] and right dataframe [col1, col2], the result
# dataframe will have columns [col1, col2, col1_a12b, col2_de3b], where col1_a12b and col2_de3b
# are just aliases of col1 and col2 in the right dataframe.
coalesce_config = JoinKeyCoalesceConfig.LEFT
coalesce_column_identifier = left_col
coalesce_col_type = origin_left_col_type
else:
raise AssertionError(f"Unsupported join/align type {how}")
# Coalescing is only required for 'outer' or 'asof' joins or align.
# For 'inner' and 'left' join we use left join keys and for 'right' join we
# use right join keys.
# For 'left' and 'coalesce' align we use left join keys.
if how in ("asof", "outer"):
# Generate an expression equivalent of
# "COALESCE('left_col', 'right_col') as 'left_col'"
coalesce_column_identifier = (
result_ordered_frame.generate_snowflake_quoted_identifiers(
pandas_labels=[
extract_pandas_label_from_snowflake_quoted_identifier(
left_col
)
],
)[0]
)
coalesce_column_identifiers.append(coalesce_column_identifier)
coalesce_column_values.append(coalesce(left_col, right_col))
if origin_left_col_type == origin_right_col_type:
coalesce_col_type = origin_left_col_type
elif how == "right":
# No coalescing required for 'right' join. Simply use right join key
# as output column.
coalesce_column_identifier = right_col
coalesce_col_type = origin_right_col_type
elif how in ("inner", "left", "coalesce"):
# No coalescing required for 'left' or 'inner' join and for 'left' or
# 'coalesce' align. Simply use left join key as output column.
coalesce_column_identifier = left_col
coalesce_col_type = origin_left_col_type
else:
raise AssertionError(f"Unsupported join/align type {how}")

if coalesce_config == JoinKeyCoalesceConfig.RIGHT:
# swap left_col and right_col
Expand Down Expand Up @@ -1187,15 +1217,8 @@ def align(
# NULL NULL 2 NULL 4 e 2
coalesce_key_config = None
inherit_join_index = InheritJoinIndex.FROM_LEFT
# When it is `outer` align, we need to coalesce the align columns. However, if the
# ordering columns of aligned result is the same as the left frame, that means the
# join columns of left and right matches, then there is no need to coalesce the join
# keys, simply inherent from left gives the correct result.
# Retaining the original columns also helps avoid unnecessary join in later steps.
if (
how == "outer"
and aligned_ordered_frame.ordering_columns != left.ordering_columns
):
# When it is `outer` align, we need to coalesce the align columns.
if how == "outer":
coalesce_key_config = [JoinKeyCoalesceConfig.LEFT] * len(left_on)
inherit_join_index = InheritJoinIndex.FROM_BOTH
(
Expand Down
37 changes: 37 additions & 0 deletions src/snowflake/snowpark/modin/plugin/docstrings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,25 @@
Series([], dtype: bool)
"""

_get_set_index_doc = """
{desc}
{parameters_or_returns}
Note
----
When setting `DataFrame.index` or `Series.index` where the length of the
`Series`/`DataFrame` object does not match with the new index's length,
pandas raises a ValueError. Snowpark pandas does not raise this error;
this operation is valid.
When the `Series`/`DataFrame` object is longer than the new index,
the `Series`/`DataFrame`'s new index is filled with `NaN` values for
the "extra" elements. When the `Series`/`DataFrame` object is shorter than
the new index, the extra values in the new index are ignored—`Series` and
`DataFrame` stay the same length `n`, and use only the first `n` values of
the new index.
"""


class BasePandasDataset:
"""
Expand Down Expand Up @@ -3594,3 +3613,21 @@ def __array_function__():
BasePandasDataset
The result of the ufunc applied to the `BasePandasDataset`.
"""

# Docstring-only stubs: `@doc` renders the shared `_get_set_index_doc` template
# (with per-stub `desc` and `parameters_or_returns` substitutions) into each
# function's docstring. The bodies are `pass` — these are never called for real
# work; they exist so the `index` property below carries getter/setter docs.
# NOTE(review): presumably the executable implementations are provided
# elsewhere (modin's BasePandasDataset) — confirm against the plugin wiring.
@doc(
    _get_set_index_doc,
    desc="Get the index for this `Series`/`DataFrame`.",
    parameters_or_returns="Returns\n-------\nIndex\n The index for this `Series`/`DataFrame`.",
)
def _get_index():
    # Documentation stub; body intentionally empty.
    pass

@doc(
    _get_set_index_doc,
    desc="Set the index for this `Series`/`DataFrame`.",
    parameters_or_returns="Parameters\n----------\nnew_index : Index\n The new index to set.",
)
def _set_index():
    # Documentation stub; body intentionally empty.
    pass

# Expose the documented getter/setter pair as the `index` property.
index = property(_get_index, _set_index)
12 changes: 12 additions & 0 deletions src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3832,6 +3832,18 @@ def set_index():
DataFrame or None
Changed row labels or None if ``inplace=True``.
Note
----
When performing ``DataFrame.set_index`` where the length of the
:class:`DataFrame` object does not match with the new index's length,
a ``ValueError`` is not raised. When the :class:`DataFrame` object is
longer than the new index, the :class:`DataFrame`'s new index is filled
with ``NaN`` values for the "extra" elements. When the :class:`DataFrame`
object is shorter than the new index, the extra values in the new index
are ignored—the :class:`DataFrame` stays the same length ``n``,
and uses only the first ``n`` values of the new index.
See Also
--------
DataFrame.reset_index : Opposite of set_index.
Expand Down
23 changes: 23 additions & 0 deletions tests/integ/modin/binary/test_binary_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -2586,3 +2586,26 @@ def test_df_sub_series():
eval_snowpark_pandas_result(
snow_df, native_df, lambda df: df.sub(df["two"], axis="index"), inplace=True
)


@sql_count_checker(query_count=2, join_count=0)
def test_binary_op_multi_series_from_same_df():
    """
    Chained binary operations on series extracted from the same DataFrame
    must not trigger any Snowpark join (join_count=0 is asserted by the
    ``sql_count_checker`` decorator).
    """
    native_df = native_pd.DataFrame(
        {
            "A": [1, 2, 3],
            "B": [2, 3, 4],
            "C": [4, 5, 6],
            "D": [2, 2, 3],
        },
        index=["a", "b", "c"],
    )
    snow_df = pd.DataFrame(native_df)
    # ensure performing more than one binary operation for series coming from same
    # dataframe does not produce any join.
    eval_snowpark_pandas_result(
        snow_df, native_df, lambda df: df["A"] + df["B"] + df["C"]
    )
    # perform binary operations in a different grouping (parenthesized pairs)
    # to exercise a different shape of the binary-operation tree.
    eval_snowpark_pandas_result(
        snow_df, native_df, lambda df: (df["A"] + df["B"]) + (df["C"] + df["D"])
    )
37 changes: 37 additions & 0 deletions tests/integ/modin/test_concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -1063,3 +1063,40 @@ def test_concat_keys():
}
snow_df = pd.concat(data.values(), axis=1, keys=data.keys())
assert_frame_equal(snow_df, native_df, check_dtype=False)


@sql_count_checker(query_count=4, join_count=0)
def test_concat_series_from_same_df(join):
    """
    Concatenating series that all come from the same DataFrame must not
    produce any Snowpark join (join_count=0).

    ``join`` is presumably a pytest fixture supplying the concat join mode
    (e.g. "inner"/"outer") — TODO confirm against conftest.
    """
    num_cols = 4
    # Build a query of the form: select 0 as "0", 1 as "1", ... (num_cols columns).
    select_data = [f'{i} as "{i}"' for i in range(num_cols)]
    query = f"select {', '.join(select_data)}"

    # concat today uses join_on_index to concat all series, we use
    # read_snowflake here so that the default index is created and
    # managed by snowpark pandas, which is the same as row position
    # column. This creates a valid optimization scenario for join, where
    # join performed on the same row_position column doesn't require
    # actual join.
    # This can not be done with pd.DataFrame constructor because the index
    # and row position column is controlled by client side, which are
    # different columns.
    df = pd.read_snowflake(query)

    series = [df[col] for col in df.columns]
    final_df = pd.concat(series, join=join, axis=1)

    # Re-assembling the columns must reproduce the original frame exactly.
    assert_frame_equal(df, final_df)


@sql_count_checker(query_count=4, join_count=0)
def test_df_creation_from_series_from_same_df():
    """
    Building a DataFrame from a dict of series that all come from the same
    DataFrame must not produce any Snowpark join (join_count=0).
    """
    num_cols = 6
    # Build a query of the form: select 0 as "0", 1 as "1", ... (num_cols columns).
    select_data = [f'{i} as "{i}"' for i in range(num_cols)]
    query = f"select {', '.join(select_data)}"

    # read_snowflake (rather than the pd.DataFrame constructor) is used so the
    # default index is created and managed by snowpark pandas and matches the
    # row position column, enabling the no-join optimization under test.
    df = pd.read_snowflake(query)

    df_dict = {col: df[col] for col in df.columns}
    final_df = pd.DataFrame(df_dict)

    # Re-assembling the columns must reproduce the original frame exactly.
    assert_frame_equal(df, final_df)
Loading

0 comments on commit c6dfd13

Please sign in to comment.