diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7f0f108fe18..7baea604bd9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -88,6 +88,14 @@
 - Stopped ignoring nanoseconds in `pd.Timedelta` scalars.
 - Fixed AssertionError in tree of binary operations.
 
+#### Behavior Change
+
+- When calling `DataFrame.set_index`, or setting `DataFrame.index` or `Series.index`, with a new index whose length does not match the length of the `Series`/`DataFrame` object, a `ValueError` is no longer raised. When the `Series`/`DataFrame` object is longer than the new index, the `Series`/`DataFrame`'s new index is filled with `NaN` values for the "extra" elements. When the `Series`/`DataFrame` object is shorter than the new index, the extra values in the new index are ignored; the `Series`/`DataFrame` stays the same length `n` and uses only the first `n` values of the new index.
+
+#### Improvements
+
+- Improved `concat` and `join` performance when operations are performed on Series coming from the same DataFrame, by avoiding unnecessary joins.
+
 ## 1.21.0 (2024-08-19)
 
 ### Snowpark Python API Updates
diff --git a/src/snowflake/snowpark/modin/pandas/base.py b/src/snowflake/snowpark/modin/pandas/base.py
index c08cdee1386..26071049237 100644
--- a/src/snowflake/snowpark/modin/pandas/base.py
+++ b/src/snowflake/snowpark/modin/pandas/base.py
@@ -604,14 +604,6 @@ def _to_series_list(self, index: pd.Index) -> list[pd.Series]:
         return [pd.Series(index)]
 
     def _set_index(self, new_index: Axes) -> None:
-        """
-        Set the index for this DataFrame.
-
-        Parameters
-        ----------
-        new_index : pandas.Index
-            The new index to set this.
-        """
         # TODO: SNOW-1119855: Modin upgrade - modin.pandas.base.BasePandasDataset
         self._update_inplace(
             new_query_compiler=self._query_compiler.set_index(
@@ -655,14 +647,6 @@ def set_axis(
         return obj
 
     def _get_index(self):
-        """
-        Get the index for this DataFrame.
-
-        Returns
-        -------
-        pandas.Index
-            The union of all indexes across the partitions.
-        """
         # TODO: SNOW-1119855: Modin upgrade - modin.pandas.base.BasePandasDataset
         from snowflake.snowpark.modin.plugin.extensions.index import Index
 
diff --git a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py
index 846f3c64079..457bd388f2b 100644
--- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py
+++ b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py
@@ -320,12 +320,26 @@ def _create_internal_frame_with_join_or_align_result(
         )
         index_column_types.extend(right.cached_index_column_snowpark_pandas_types)
 
+    # If the result ordering columns are the same as the original left ordering columns,
+    # the original left and right frames share the same base, and no actual Snowpark join was
+    # applied, because the join was performed on the ordering column, or the align was performed
+    # on the same column. This behavior is guaranteed by the align and join methods provided by
+    # OrderedDataFrame: when a Snowpark join is actually applied, the result ordering columns are
+    # a combination of left.ordering_columns and right.ordering_columns, plus possibly some
+    # helper columns. For example, the ordering columns of a left join are left.ordering_columns + right.ordering_columns.
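+    # Concretely (illustrative, with a hypothetical identifier): if both frames order by
+    # '"__row_position__"' and the join/align key is that same column, the result keeps
+    # exactly the left frame's ordering columns, so no_join_applied below evaluates to True.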
+    no_join_applied = (
+        result_ordered_frame.ordering_columns == left.ordered_dataframe.ordering_columns
+    )
+
     if key_coalesce_config:
         coalesce_column_identifiers = []
         coalesce_column_values = []
         for origin_left_col, origin_right_col, coalesce_config in zip(
             left_on, right_on, key_coalesce_config
         ):
+            if coalesce_config == JoinKeyCoalesceConfig.NONE:
+                continue
+
             coalesce_col_type = None
             origin_left_col_type = (
                 left.snowflake_quoted_identifier_to_snowpark_pandas_type[
                     origin_left_col
                 ]
             )
@@ -337,44 +351,60 @@
             origin_right_col_type = (
                 right.snowflake_quoted_identifier_to_snowpark_pandas_type[
                     origin_right_col
                 ]
             )
-            if coalesce_config == JoinKeyCoalesceConfig.NONE:
-                continue
+
             left_col = result_helper.map_left_quoted_identifiers([origin_left_col])[0]
             right_col = result_helper.map_right_quoted_identifiers([origin_right_col])[
                 0
             ]
-            # Coalescing is only required for 'outer' or 'asof' joins or align.
-            # For 'inner' and 'left' join we use left join keys and for 'right' join we
-            # use right join keys.
-            # For 'left' and 'coalesce' align we use left join keys.
-            if how in ("asof", "outer"):
-                # Generate an expression equivalent of
-                # "COALESCE('left_col', 'right_col') as 'left_col'"
-                coalesce_column_identifier = (
-                    result_ordered_frame.generate_snowflake_quoted_identifiers(
-                        pandas_labels=[
-                            extract_pandas_label_from_snowflake_quoted_identifier(
-                                left_col
-                            )
-                        ],
-                    )[0]
-                )
-                coalesce_column_identifiers.append(coalesce_column_identifier)
-                coalesce_column_values.append(coalesce(left_col, right_col))
-                if origin_left_col_type == origin_right_col_type:
-                    coalesce_col_type = origin_left_col_type
-            elif how == "right":
-                # No coalescing required for 'right' join. Simply use right join key
-                # as output column.
-                coalesce_column_identifier = right_col
-                coalesce_col_type = origin_right_col_type
-            elif how in ("inner", "left", "coalesce"):
-                # No coalescing required for 'left' or 'inner' join and for 'left' or
-                # 'coalesce' align. Simply use left join key as output column.
+
+            if no_join_applied and origin_left_col == origin_right_col:
+                # If no join is applied, the result dataframe, left dataframe, and right
+                # dataframe share the same base dataframe. If the original left column and
+                # original right column are the same column, no coalesce is needed, and we
+                # always try to keep the left column, to stay aligned with the original
+                # dataframe as much as possible and increase the chance of optimization for
+                # later operations, especially when those operations are applied to
+                # dataframes coming from the same dataframe.
+                # Keeping the left column helps stay aligned with the original dataframe
+                # because, when there is a conflict between left and right, deduplication
+                # always happens on the right. For example, when joining or aligning left
+                # dataframe [col1, col2] and right dataframe [col1, col2], the result
+                # dataframe will have columns [col1, col2, col1_a12b, col2_de3b], where
+                # col1_a12b and col2_de3b are just aliases of col1 and col2 from the right
+                # dataframe.
+                coalesce_config = JoinKeyCoalesceConfig.LEFT
                 coalesce_column_identifier = left_col
                 coalesce_col_type = origin_left_col_type
             else:
-                raise AssertionError(f"Unsupported join/align type {how}")
+                # Coalescing is only required for 'outer' or 'asof' joins or align.
+                # For 'inner' and 'left' join we use left join keys and for 'right' join we
+                # use right join keys.
+                # For 'left' and 'coalesce' align we use left join keys.
+ if how in ("asof", "outer"): + # Generate an expression equivalent of + # "COALESCE('left_col', 'right_col') as 'left_col'" + coalesce_column_identifier = ( + result_ordered_frame.generate_snowflake_quoted_identifiers( + pandas_labels=[ + extract_pandas_label_from_snowflake_quoted_identifier( + left_col + ) + ], + )[0] + ) + coalesce_column_identifiers.append(coalesce_column_identifier) + coalesce_column_values.append(coalesce(left_col, right_col)) + if origin_left_col_type == origin_right_col_type: + coalesce_col_type = origin_left_col_type + elif how == "right": + # No coalescing required for 'right' join. Simply use right join key + # as output column. + coalesce_column_identifier = right_col + coalesce_col_type = origin_right_col_type + elif how in ("inner", "left", "coalesce"): + # No coalescing required for 'left' or 'inner' join and for 'left' or + # 'coalesce' align. Simply use left join key as output column. + coalesce_column_identifier = left_col + coalesce_col_type = origin_left_col_type + else: + raise AssertionError(f"Unsupported join/align type {how}") if coalesce_config == JoinKeyCoalesceConfig.RIGHT: # swap left_col and right_col @@ -1187,15 +1217,8 @@ def align( # NULL NULL 2 NULL 4 e 2 coalesce_key_config = None inherit_join_index = InheritJoinIndex.FROM_LEFT - # When it is `outer` align, we need to coalesce the align columns. However, if the - # ordering columns of aligned result is the same as the left frame, that means the - # join columns of left and right matches, then there is no need to coalesce the join - # keys, simply inherent from left gives the correct result. - # Retaining the original columns also helps avoid unnecessary join in later steps. - if ( - how == "outer" - and aligned_ordered_frame.ordering_columns != left.ordering_columns - ): + # When it is `outer` align, we need to coalesce the align columns. + if how == "outer": coalesce_key_config = [JoinKeyCoalesceConfig.LEFT] * len(left_on) inherit_join_index = InheritJoinIndex.FROM_BOTH ( diff --git a/src/snowflake/snowpark/modin/plugin/docstrings/base.py b/src/snowflake/snowpark/modin/plugin/docstrings/base.py index a6a0aff1af4..3ba4f2f2dab 100644 --- a/src/snowflake/snowpark/modin/plugin/docstrings/base.py +++ b/src/snowflake/snowpark/modin/plugin/docstrings/base.py @@ -386,6 +386,25 @@ Series([], dtype: bool) """ +_get_set_index_doc = """ +{desc} + +{parameters_or_returns} + +Note +---- +When setting `DataFrame.index` or `Series.index` where the length of the +`Series`/`DataFrame` object does not match with the new index's length, +pandas raises a ValueError. Snowpark pandas does not raise this error; +this operation is valid. +When the `Series`/`DataFrame` object is longer than the new index, +the `Series`/`DataFrame`'s new index is filled with `NaN` values for +the "extra" elements. When the `Series`/`DataFrame` object is shorter than +the new index, the extra values in the new index are ignored—`Series` and +`DataFrame` stay the same length `n`, and use only the first `n` values of +the new index. +""" + class BasePandasDataset: """ @@ -3594,3 +3613,21 @@ def __array_function__(): BasePandasDataset The result of the ufunc applied to the `BasePandasDataset`. 
""" + + @doc( + _get_set_index_doc, + desc="Get the index for this `Series`/`DataFrame`.", + parameters_or_returns="Returns\n-------\nIndex\n The index for this `Series`/`DataFrame`.", + ) + def _get_index(): + pass + + @doc( + _get_set_index_doc, + desc="Set the index for this `Series`/`DataFrame`.", + parameters_or_returns="Parameters\n----------\nnew_index : Index\n The new index to set.", + ) + def _set_index(): + pass + + index = property(_get_index, _set_index) diff --git a/src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py b/src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py index a42ef48eb94..047b4592068 100644 --- a/src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py +++ b/src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py @@ -3832,6 +3832,18 @@ def set_index(): DataFrame or None Changed row labels or None if ``inplace=True``. + Note + ---- + When performing ``DataFrame.set_index`` where the length of the + :class:`DataFrame` object does not match with the new index's length, + a ``ValueError`` is not raised. When the :class:`DataFrame` object is + longer than the new index, the :class:`DataFrame`'s new index is filled + with ``NaN`` values for the "extra" elements. When the :class:`DataFrame` + object is shorter than the new index, the extra values in the new index + are ignored—the :class:`DataFrame` stays the same length ``n``, + and uses only the first ``n`` values of the new index. + + See Also -------- DataFrame.reset_index : Opposite of set_index. diff --git a/tests/integ/modin/binary/test_binary_op.py b/tests/integ/modin/binary/test_binary_op.py index 9ae5db98369..cd036bcb045 100644 --- a/tests/integ/modin/binary/test_binary_op.py +++ b/tests/integ/modin/binary/test_binary_op.py @@ -2586,3 +2586,26 @@ def test_df_sub_series(): eval_snowpark_pandas_result( snow_df, native_df, lambda df: df.sub(df["two"], axis="index"), inplace=True ) + + +@sql_count_checker(query_count=2, join_count=0) +def test_binary_op_multi_series_from_same_df(): + native_df = native_pd.DataFrame( + { + "A": [1, 2, 3], + "B": [2, 3, 4], + "C": [4, 5, 6], + "D": [2, 2, 3], + }, + index=["a", "b", "c"], + ) + snow_df = pd.DataFrame(native_df) + # ensure performing more than one binary operation for series coming from same + # dataframe does not produce any join. + eval_snowpark_pandas_result( + snow_df, native_df, lambda df: df["A"] + df["B"] + df["C"] + ) + # perform binary operations in different orders + eval_snowpark_pandas_result( + snow_df, native_df, lambda df: (df["A"] + df["B"]) + (df["C"] + df["D"]) + ) diff --git a/tests/integ/modin/test_concat.py b/tests/integ/modin/test_concat.py index 7e11a3537af..c1366c22506 100644 --- a/tests/integ/modin/test_concat.py +++ b/tests/integ/modin/test_concat.py @@ -1063,3 +1063,40 @@ def test_concat_keys(): } snow_df = pd.concat(data.values(), axis=1, keys=data.keys()) assert_frame_equal(snow_df, native_df, check_dtype=False) + + +@sql_count_checker(query_count=4, join_count=0) +def test_concat_series_from_same_df(join): + num_cols = 4 + select_data = [f'{i} as "{i}"' for i in range(num_cols)] + query = f"select {', '.join(select_data)}" + + # concat today uses join_on_index to concat all series, we use + # read_snowflake here so that the default index is created and + # managed by snowpark pandas, which is the same as row position + # column. This creates a valid optimization scenario for join, where + # join performed on the same row_position column doesn't require + # actual join. 
+    # This cannot be done with the pd.DataFrame constructor because the index
+    # and row position columns are controlled by the client side and are
+    # different columns.
+    df = pd.read_snowflake(query)
+
+    series = [df[col] for col in df.columns]
+    final_df = pd.concat(series, join=join, axis=1)
+
+    assert_frame_equal(df, final_df)
+
+
+@sql_count_checker(query_count=4, join_count=0)
+def test_df_creation_from_series_from_same_df():
+    num_cols = 6
+    select_data = [f'{i} as "{i}"' for i in range(num_cols)]
+    query = f"select {', '.join(select_data)}"
+
+    df = pd.read_snowflake(query)
+
+    df_dict = {col: df[col] for col in df.columns}
+    final_df = pd.DataFrame(df_dict)
+
+    assert_frame_equal(df, final_df)
diff --git a/tests/integ/modin/types/test_timedelta_indexing.py b/tests/integ/modin/types/test_timedelta_indexing.py
index 7c8bbcb8a10..af36b319a26 100644
--- a/tests/integ/modin/types/test_timedelta_indexing.py
+++ b/tests/integ/modin/types/test_timedelta_indexing.py
@@ -389,3 +389,183 @@ def loc_enlargement(key, item, df):
             loc_enlargement(key, item, snow_td.copy()).to_pandas().dtypes,
             snow_td.dtypes,
         )
+
+
+@pytest.mark.parametrize(
+    "key, join_count",
+    [(2, 2), ([2, 1], 2), (slice(1, None), 0), ([True, False, False, True], 1)],
+)
+def test_index_get_timedelta(key, join_count):
+    td_idx = native_pd.TimedeltaIndex(
+        [
+            native_pd.Timedelta("1 days 1 hour"),
+            native_pd.Timedelta("2 days 1 minute"),
+            native_pd.Timedelta("3 days 1 nanoseconds"),
+            native_pd.Timedelta("100 nanoseconds"),
+        ]
+    )
+    snow_td_idx = pd.TimedeltaIndex(td_idx)
+
+    with SqlCounter(query_count=1, join_count=join_count):
+        if is_scalar(key):
+            assert snow_td_idx[key] == td_idx[key]
+        else:
+            eval_snowpark_pandas_result(snow_td_idx, td_idx, lambda idx: idx[key])
+
+
+@pytest.mark.parametrize(
+    "key, api, query_count, join_count",
+    [
+        [2, "iat", 1, 2],
+        [native_pd.Timedelta("1 days 1 hour"), "at", 2, 2],
+        [[2, 1], "iloc", 1, 2],
+        [
+            [
+                native_pd.Timedelta("1 days 1 hour"),
+                native_pd.Timedelta("1 days 1 hour"),
+            ],
+            "loc",
+            1,
+            1,
+        ],
+        [slice(1, None), "iloc", 1, 0],
+        [[True, False, False, True], "iloc", 1, 1],
+        [[True, False, False, True], "loc", 1, 1],
+    ],
+)
+def test_series_with_timedelta_index(key, api, query_count, join_count):
+    td_idx = native_pd.TimedeltaIndex(
+        [
+            native_pd.Timedelta("1 days 1 hour"),
+            native_pd.Timedelta("2 days 1 minute"),
+            native_pd.Timedelta("3 days 1 nanoseconds"),
+            native_pd.Timedelta("100 nanoseconds"),
+        ]
+    )
+    snow_td_idx = pd.TimedeltaIndex(td_idx)
+
+    data = [1, 2, 3, 4]
+    native_series = native_pd.Series(data, index=td_idx)
+    snow_series = pd.Series(data, index=snow_td_idx)
+
+    with SqlCounter(query_count=query_count, join_count=join_count):
+        if is_scalar(key):
+            assert getattr(snow_series, api)[key] == getattr(native_series, api)[key]
+        else:
+            eval_snowpark_pandas_result(
+                snow_series, native_series, lambda s: getattr(s, api)[key]
+            )
+
+
+@pytest.mark.parametrize(
+    "key, api, query_count, join_count",
+    [
+        [2, "iat", 1, 2],
+        [native_pd.Timedelta("1 days 1 hour"), "at", 2, 2],
+        [[2, 1], "iloc", 1, 2],
+        [
+            [
+                native_pd.Timedelta("1 days 1 hour"),
+                native_pd.Timedelta("1 days 1 hour"),
+            ],
+            "loc",
+            1,
+            1,
+        ],
+        [slice(1, None), "iloc", 1, 0],
+        [[True, False, False, True], "iloc", 1, 1],
+        [[True, False, False, True], "loc", 1, 1],
+    ],
+)
+def test_df_with_timedelta_index(key, api, query_count, join_count):
+    td_idx = native_pd.TimedeltaIndex(
+        [
+            native_pd.Timedelta("1 days 1 hour"),
+            native_pd.Timedelta("2 days 1 minute"),
native_pd.Timedelta("3 days 1 nanoseconds"), + native_pd.Timedelta("100 nanoseconds"), + ] + ) + snow_td_idx = pd.TimedeltaIndex(td_idx) + + data = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]] + native_df = native_pd.DataFrame(data, index=td_idx) + snow_df = pd.DataFrame(data, index=snow_td_idx) + + with SqlCounter(query_count=query_count, join_count=join_count): + if is_scalar(key): + assert getattr(snow_df, api)[key, 0] == getattr(native_df, api)[key, 0] + else: + eval_snowpark_pandas_result( + snow_df, native_df, lambda s: getattr(s, api)[key] + ) + + +def test_df_with_timedelta_index_enlargement_during_indexing(): + td_idx = native_pd.TimedeltaIndex( + [ + native_pd.Timedelta("1 days 1 hour"), + native_pd.Timedelta("2 days 1 minute"), + native_pd.Timedelta("3 days 1 nanoseconds"), + native_pd.Timedelta("100 nanoseconds"), + ] + ) + snow_td_idx = pd.TimedeltaIndex(td_idx) + + data = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]] + cols = ["a", "b", "c", "d"] + native_df = native_pd.DataFrame(data, index=td_idx, columns=cols) + snow_df = pd.DataFrame(data, index=snow_td_idx, columns=cols) + + def setitem_enlargement(key, item, df): + df[key] = item + return df + + item = 23 + + key = native_pd.Timedelta("2 days") + with SqlCounter(query_count=1, join_count=0): + eval_snowpark_pandas_result( + snow_df.copy(), + native_df.copy(), + functools.partial(setitem_enlargement, key, item), + ) + + key = native_pd.Timedelta("2 days 45 minutes") + with SqlCounter(query_count=1, join_count=1): + eval_snowpark_pandas_result( + snow_df["a"].copy(), + native_df["a"].copy(), + functools.partial(setitem_enlargement, key, item), + ) + + def loc_enlargement(key, item, df): + df.loc[key] = item + return df + + key = (slice(None, None, None), "x") + + with SqlCounter(query_count=1, join_count=0): + eval_snowpark_pandas_result( + snow_df.copy(), + native_df.copy(), + functools.partial(loc_enlargement, key, item), + ) + + key = native_pd.Timedelta("2 days 25 minutes") + with SqlCounter(query_count=1, join_count=1): + eval_snowpark_pandas_result( + snow_df["a"].copy(), + native_df["a"].copy(), + functools.partial(loc_enlargement, key, item), + ) + + # single row + key = (native_pd.Timedelta("2 days 45 minutes"), slice(None, None, None)) + + with SqlCounter(query_count=1, join_count=1): + eval_snowpark_pandas_result( + snow_df.copy(), + native_df.copy(), + functools.partial(loc_enlargement, key, item), + ) diff --git a/tests/unit/modin/test_join_utils.py b/tests/unit/modin/test_join_utils.py new file mode 100644 index 00000000000..031ab13bef9 --- /dev/null +++ b/tests/unit/modin/test_join_utils.py @@ -0,0 +1,97 @@ +# +# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. 
+# + +from collections.abc import Hashable +from unittest import mock + +import pytest + +import snowflake.snowpark.modin.plugin # noqa: F401 +from snowflake.snowpark.modin.plugin._internal.frame import ( + OrderedDataFrame, + OrderingColumn, +) +from snowflake.snowpark.modin.plugin._internal.join_utils import ( + InheritJoinIndex, + JoinKeyCoalesceConfig, + _create_internal_frame_with_join_or_align_result, +) +from snowflake.snowpark.modin.plugin.compiler.snowflake_query_compiler import ( + InternalFrame, +) + + +def mock_internal_frame( + data_column_pandas_labels: list[Hashable], + data_column_pandas_index_names: list[Hashable], + data_column_snowflake_quoted_identifiers: list[str], + index_column_pandas_labels: list[Hashable], + index_column_snowflake_quoted_identifiers: list[str], +) -> InternalFrame: + ordered_dataframe = mock.create_autospec(OrderedDataFrame) + ordered_dataframe.projected_column_snowflake_quoted_identifiers = ( + data_column_snowflake_quoted_identifiers + + index_column_snowflake_quoted_identifiers + ) + ordered_dataframe.ordering_columns = [ + OrderingColumn(col) + for col in ordered_dataframe.projected_column_snowflake_quoted_identifiers + ] + internal_frame = InternalFrame.create( + ordered_dataframe=ordered_dataframe, + data_column_pandas_labels=data_column_pandas_labels, + data_column_pandas_index_names=data_column_pandas_index_names, + data_column_snowflake_quoted_identifiers=data_column_snowflake_quoted_identifiers, + index_column_pandas_labels=index_column_pandas_labels, + index_column_snowflake_quoted_identifiers=index_column_snowflake_quoted_identifiers, + data_column_types=[None] * len(data_column_pandas_labels), + index_column_types=[None] * len(index_column_pandas_labels), + ) + + return internal_frame + + +def test_create_internal_frame_with_result_using_invalid_methods(): + left_frame = mock_internal_frame( + data_column_pandas_labels=["a1", "b1"], + data_column_pandas_index_names=[None], + data_column_snowflake_quoted_identifiers=['"A1"', '"B1"'], + index_column_pandas_labels=["i1"], + index_column_snowflake_quoted_identifiers=['"I1"'], + ) + + right_frame = mock_internal_frame( + data_column_pandas_labels=["a2", "b2"], + data_column_pandas_index_names=[None], + data_column_snowflake_quoted_identifiers=['"A2"', '"B2"'], + index_column_pandas_labels=["i2"], + index_column_snowflake_quoted_identifiers=['"I2"'], + ) + + result_ordered_frame = mock.create_autospec(OrderedDataFrame) + result_ordered_frame.projected_column_snowflake_quoted_identifiers = [ + '"I1"', + '"A1"', + '"B1"', + '"I2"', + '"A2"', + '"B2"', + ] + result_ordered_frame._ordering_columns_tuple = [ + OrderingColumn('"I1"'), + OrderingColumn('"I2"'), + ] + + with pytest.raises(AssertionError, match="Unsupported join/align type invalid"): + _create_internal_frame_with_join_or_align_result( + result_ordered_frame=result_ordered_frame, + left=left_frame, + right=right_frame, + how="invalid", + left_on=['"I1"'], + right_on=['"I2"'], + sort=False, + key_coalesce_config=[JoinKeyCoalesceConfig.LEFT], + inherit_index=InheritJoinIndex.FROM_LEFT, + )