From 4ec3164e8b24d61e7ede18c4c447c39db6dabe09 Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Wed, 1 May 2024 17:53:13 -0700 Subject: [PATCH 01/18] Add test for empty table pivot_table --- tests/integ/modin/pivot/test_pivot_single.py | 24 ++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tests/integ/modin/pivot/test_pivot_single.py b/tests/integ/modin/pivot/test_pivot_single.py index 1147d55ca7d..476bf0b783d 100644 --- a/tests/integ/modin/pivot/test_pivot_single.py +++ b/tests/integ/modin/pivot/test_pivot_single.py @@ -73,6 +73,30 @@ def test_pivot_table_multi_index_single_column_single_value(df_data, aggfunc): ) +@sql_count_checker(query_count=1) +def test_pivot_table_empty_table_with_groupby_columns(): + # Cannot use pivot_table_test_helper since that checks the inferred types + # on the resulting DataFrames' columns (which are empty), and the inferred type + # on our DataFrame's columns is empty, while pandas has type floating. + import pandas as native_pd + + native_df = native_pd.DataFrame({"A": [], "B": [], "C": [], "D": []}) + snow_df = pd.DataFrame(native_df) + pivot_kwargs = { + "index": ["A", "B"], + "columns": "C", + "values": "D", + "aggfunc": "count", + } + + snow_result = snow_df.pivot_table(**pivot_kwargs).to_pandas() + native_result = native_df.pivot_table(**pivot_kwargs) + + assert native_result.empty == snow_result.empty and (native_result.empty is True) + assert list(native_result.columns) == list(snow_result.columns) + assert list(native_result.index) == list(snow_result.index) + + @sql_count_checker(query_count=1) def test_pivot_table_single_index_no_column_single_value(df_data): pivot_table_test_helper( From 2d08c305d390616d374d6f7455438b6db9131fc2 Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Wed, 1 May 2024 21:10:52 -0700 Subject: [PATCH 02/18] [SNOW-1013917]: Add support for `pivot_table` without `index` parameter --- .../modin/plugin/_internal/pivot_utils.py | 58 +++++--- .../compiler/snowflake_query_compiler.py | 10 +- .../integ/modin/pivot/test_pivot_multiple.py | 129 ++++++++++++++++++ .../integ/modin/pivot/test_pivot_negative.py | 17 --- tests/integ/modin/pivot/test_pivot_single.py | 20 ++- 5 files changed, 193 insertions(+), 41 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py index 3b27cb2609c..509851fb017 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py @@ -159,7 +159,13 @@ def pivot_helper( # constructed and passed into the single pivot operation to prepend the remaining of the pandas labels. for pivot_aggr_grouping in pivot_aggr_groupings: existing_snowflake_quoted_identifiers = groupby_snowflake_quoted_identifiers - if last_ordered_dataframe is not None: + if ( + last_ordered_dataframe is not None + and len(groupby_snowflake_quoted_identifiers) > 0 + ): + # If there are no index columns, then we append the OrderedDataFrame's vertically, rather + # than horizontally, so we do not need to dedupe the columns (and in fact we want the columns + # to have the same name since we want them to match up during the union. 
existing_snowflake_quoted_identifiers = ( last_ordered_dataframe.projected_column_snowflake_quoted_identifiers ) @@ -179,25 +185,45 @@ def pivot_helper( ) if last_ordered_dataframe: - last_ordered_dataframe = last_ordered_dataframe.join( - right=new_pivot_ordered_dataframe, - left_on_cols=groupby_snowflake_quoted_identifiers, - right_on_cols=groupby_snowflake_quoted_identifiers, - how="left", - ) + # If there are index columns, then we join the two OrderedDataFrames + # (horizontally), while if there are no index columns, we concatenate + # them vertically, and have the index be the value column each row + # corresponds to. + # We also join vertically if there are multiple columns and multiple + # pivot values. + if ( + len(groupby_snowflake_quoted_identifiers) > 0 + ): # or (not multiple_values_and_columns): + last_ordered_dataframe = last_ordered_dataframe.join( + right=new_pivot_ordered_dataframe, + left_on_cols=groupby_snowflake_quoted_identifiers, + right_on_cols=groupby_snowflake_quoted_identifiers, + how="left", + ) + data_column_snowflake_quoted_identifiers.extend( + new_data_column_snowflake_quoted_identifiers + ) + data_column_pandas_labels.extend(new_data_column_pandas_labels) + else: + last_ordered_dataframe = last_ordered_dataframe.union_all( + new_pivot_ordered_dataframe + ) else: last_ordered_dataframe = new_pivot_ordered_dataframe - - data_column_snowflake_quoted_identifiers.extend( - new_data_column_snowflake_quoted_identifiers - ) - data_column_pandas_labels.extend(new_data_column_pandas_labels) + data_column_snowflake_quoted_identifiers.extend( + new_data_column_snowflake_quoted_identifiers + ) + data_column_pandas_labels.extend(new_data_column_pandas_labels) ordered_dataframe = last_ordered_dataframe + # When there are no groupby columns, the index is the first column in the OrderedDataFrame. + # Otherwise, the index is the groupby columns. + length_of_index_columns = max(1, len(groupby_snowflake_quoted_identifiers)) + index_column_snowflake_quoted_identifiers = ( ordered_dataframe.projected_column_snowflake_quoted_identifiers[ - 0 : len(groupby_snowflake_quoted_identifiers) + 0:length_of_index_columns ] ) index = index or [None] * len(index_column_snowflake_quoted_identifiers) @@ -299,9 +325,7 @@ def single_pivot_helper( project_snowflake_quoted_identifiers ) - index_snowflake_quoted_identifiers = ( - groupby_snowflake_quoted_identifiers or pivot_snowflake_quoted_identifiers or [] - ) + index_snowflake_quoted_identifiers = groupby_snowflake_quoted_identifiers or [] if not pivot_snowflake_quoted_identifiers or not aggr_snowflake_quoted_identifier: if not groupby_snowflake_quoted_identifiers: @@ -400,6 +424,7 @@ def single_pivot_helper( ), "*", ) + index_snowflake_quoted_identifiers = [pivot_snowflake_quoted_identifiers[0]] # Go through each of the non-group by columns and # 1. Generate corresponding pandas label (without prefix) @@ -686,7 +711,6 @@ def generate_single_pivot_labels( if not pandas_aggfunc_list: continue - # 2. Loop through all aggregation functions for this aggregation value. 
for pandas_single_aggr_func in pandas_aggfunc_list: # pandas only adds aggregation value as label if provided as a list diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index 5c9b3bfb64c..beeebd5ffe6 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -6290,11 +6290,6 @@ def pivot_table( else [] ) - if len(groupby_snowflake_quoted_identifiers) == 0: - raise NotImplementedError( - "pivot_table with no index configuration is currently not supported" - ) - if values is None: # If no values (aggregation columns) are specified, then we use all data columns that are neither # groupby (index) nor pivot columns as the aggregation columns. For example, a dataframe with @@ -6316,12 +6311,15 @@ def pivot_table( ) ) + include_pivot_columns_in_label = len( + groupby_snowflake_quoted_identifiers + ) != 0 or (isinstance(aggfunc, list) and len(aggfunc) > 1) pivot_aggr_groupings = list( generate_single_pivot_labels( values_label_to_identifier_pairs_list, aggfunc, len(pivot_snowflake_quoted_identifiers) > 0, - isinstance(values, list), + isinstance(values, list) and include_pivot_columns_in_label, sort, ) ) diff --git a/tests/integ/modin/pivot/test_pivot_multiple.py b/tests/integ/modin/pivot/test_pivot_multiple.py index 0b1e295ccb2..232fab65bcb 100644 --- a/tests/integ/modin/pivot/test_pivot_multiple.py +++ b/tests/integ/modin/pivot/test_pivot_multiple.py @@ -2,11 +2,15 @@ # # Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. # +import modin.pandas as pd import numpy as np +import pandas as native_pd import pytest +import snowflake.snowpark.modin.plugin # noqa: F401 from tests.integ.modin.pivot.pivot_utils import pivot_table_test_helper from tests.integ.modin.sql_counter import SqlCounter, sql_count_checker +from tests.integ.modin.utils import eval_snowpark_pandas_result @sql_count_checker(query_count=1, join_count=1) @@ -21,6 +25,29 @@ def test_pivot_table_single_index_single_column_multiple_values(df_data): ) +@sql_count_checker(query_count=1, union_count=1) +def test_pivot_table_no_index_single_column_multiple_values(df_data): + pivot_table_test_helper( + df_data, + { + "columns": "B", + "values": ["D", "E"], + }, + ) + + +@sql_count_checker(query_count=1, union_count=1) +def test_pivot_table_no_index_single_column_multiple_values_multiple_aggr_func(df_data): + pivot_table_test_helper( + df_data, + { + "columns": "B", + "values": ["D", "E"], + "aggfunc": ["mean", "max"], + }, + ) + + @pytest.mark.parametrize("aggfunc", ["count", "sum", "min", "max", "mean"]) @sql_count_checker(query_count=1) def test_pivot_table_single_index_multiple_column_single_value(df_data, aggfunc): @@ -35,6 +62,31 @@ def test_pivot_table_single_index_multiple_column_single_value(df_data, aggfunc) ) +@pytest.mark.parametrize("aggfunc", ["count", "sum", "min", "max", "mean"]) +@sql_count_checker(query_count=1) +def test_pivot_table_no_index_multiple_column_single_value(df_data, aggfunc): + pivot_table_test_helper( + df_data, + { + "columns": ["B", "C"], + "values": "D", + "aggfunc": aggfunc, + }, + ) + + +@sql_count_checker(query_count=1) +def test_pivot_table_no_index_multiple_column_single_value_multiple_aggr_func(df_data): + pivot_table_test_helper( + df_data, + { + "columns": ["B", "C"], + "values": "D", + "aggfunc": ["mean", "max"], + }, + ) + + @pytest.mark.skip( "SNOW-853416: Some 
lingering encoding issues and also unsorted order does not match" ) @@ -94,6 +146,31 @@ def test_pivot_table_single_index_multiple_columns_multiple_values(df_data): ) +@sql_count_checker(query_count=1, union_count=1) +def test_pivot_table_no_index_multiple_columns_multiple_values(df_data): + pivot_table_test_helper( + df_data, + { + "columns": ["B", "C"], + "values": ["D", "E"], + }, + ) + + +@sql_count_checker(query_count=1, union_count=1) +def test_pivot_table_no_index_multiple_columns_multiple_values_multiple_aggr_funcs( + df_data, +): + pivot_table_test_helper( + df_data, + { + "columns": ["B", "C"], + "values": ["D", "E"], + "aggfunc": ["mean", "max"], + }, + ) + + @sql_count_checker(query_count=1, join_count=1) def test_pivot_table_multiple_index_multiple_columns_multiple_values(df_data): pivot_table_test_helper( @@ -119,6 +196,58 @@ def test_pivot_table_single_index_no_column_single_value_multiple_aggr_funcs(df_ ) +@sql_count_checker(query_count=0) +def test_pivot_table_no_index_no_column_single_value(df_data): + pivot_kwargs = { + "values": "D", + "aggfunc": "mean", + } + eval_snowpark_pandas_result( + pd.DataFrame(df_data), + native_pd.DataFrame(df_data), + lambda df: df.pivot_table(**pivot_kwargs), + assert_exception_equal=True, + expect_exception=True, + expect_exception_match="No group keys passed!", + expect_exception_type=ValueError, + ) + + +@sql_count_checker(query_count=0) +def test_pivot_table_no_index_no_column_single_value_multiple_aggr_funcs(df_data): + pivot_kwargs = { + "values": "D", + "aggfunc": ["mean", "max"], + } + eval_snowpark_pandas_result( + pd.DataFrame(df_data), + native_pd.DataFrame(df_data), + lambda df: df.pivot_table(**pivot_kwargs), + assert_exception_equal=True, + expect_exception=True, + expect_exception_match="No group keys passed!", + expect_exception_type=ValueError, + ) + + +@sql_count_checker(query_count=0, join_count=0) +def test_pivot_table_no_index_no_column_no_value_multiple_aggr_funcs(df_data): + pivot_kwargs = { + "columns": None, + "values": None, + "aggfunc": ["min", "max"], + } + eval_snowpark_pandas_result( + pd.DataFrame(df_data), + native_pd.DataFrame(df_data), + lambda df: df.pivot_table(**pivot_kwargs), + assert_exception_equal=True, + expect_exception=True, + expect_exception_match="No group keys passed!", + expect_exception_type=ValueError, + ) + + @pytest.mark.skip( "SNOW-854301: Multi-Index replaces None with Nan causing test to fail" ) diff --git a/tests/integ/modin/pivot/test_pivot_negative.py b/tests/integ/modin/pivot/test_pivot_negative.py index 3751ae11522..9ed330e0f55 100644 --- a/tests/integ/modin/pivot/test_pivot_negative.py +++ b/tests/integ/modin/pivot/test_pivot_negative.py @@ -77,23 +77,6 @@ def test_pivot_table_invalid_values_columns_not_supported(df_data, pivot_table_k ) -@sql_count_checker(query_count=0) -def test_pivot_table_no_index_no_column_single_value_raises_error(df_data): - pivot_table_test_helper_expects_exception( - df_data, - { - "index": None, - "columns": None, - "values": "D", - }, - # we currently throws NotImplementedError if no "index" configuration is provided. 
- # TODO (SNOW-959913): Enable support for no "index" configuration - expect_exception_type=NotImplementedError, - expect_exception_match="pivot_table with no index configuration is currently not supported", - assert_exception_equal=False, - ) - - @pytest.mark.parametrize( "aggfunc", [ diff --git a/tests/integ/modin/pivot/test_pivot_single.py b/tests/integ/modin/pivot/test_pivot_single.py index 476bf0b783d..eac3975cf4f 100644 --- a/tests/integ/modin/pivot/test_pivot_single.py +++ b/tests/integ/modin/pivot/test_pivot_single.py @@ -73,8 +73,26 @@ def test_pivot_table_multi_index_single_column_single_value(df_data, aggfunc): ) +@pytest.mark.parametrize( + "aggfunc", + [ + "count", + "sum", + "min", + "max", + "mean", + ], +) +@sql_count_checker(query_count=1) +def test_pivot_table_no_index(df_data, aggfunc): + pivot_table_test_helper( + df_data, + {"columns": "C", "values": "D", "aggfunc": aggfunc}, + ) + + @sql_count_checker(query_count=1) -def test_pivot_table_empty_table_with_groupby_columns(): +def test_pivot_table_empty_table_with_index(): # Cannot use pivot_table_test_helper since that checks the inferred types # on the resulting DataFrames' columns (which are empty), and the inferred type # on our DataFrame's columns is empty, while pandas has type floating. From 861a28de48a12f848daeaf74c9b921d1eb81b0f2 Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Tue, 7 May 2024 15:21:14 -0700 Subject: [PATCH 03/18] Add fix for test of multiple values + multiple aggr funcs --- .../modin/plugin/_internal/pivot_utils.py | 192 +++++++++++++----- .../compiler/snowflake_query_compiler.py | 13 +- .../integ/modin/pivot/test_pivot_multiple.py | 6 +- 3 files changed, 149 insertions(+), 62 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py index 509851fb017..380b70ccec6 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py @@ -5,7 +5,7 @@ from collections.abc import Generator, Hashable from functools import reduce from itertools import product -from typing import Any, Callable, Optional, Union +from typing import Any, Callable, NamedTuple, Optional, Union from pandas._typing import AggFuncType, AggFuncTypeBase, Scalar @@ -61,6 +61,84 @@ ) +class PivottedOrderedDataFrameResult(NamedTuple): + # The OrderedDataFrame representation for the join or align result + ordered_dataframe: OrderedDataFrame + # The data column pandas labels of the new frame. + data_column_pandas_labels: list[Hashable] + # The data column snowflake quoted identifiers of the new frame. 
+ data_column_snowflake_quoted_identifiers: list[str] + + +def perform_pivot_and_concatenate( + ordered_dataframe: OrderedDataFrame, + pivot_aggr_groupings: list[PivotAggrGrouping], + groupby_snowflake_quoted_identifiers: list[str], + pivot_snowflake_quoted_identifiers: list[str], + should_join_along_columns: bool, +) -> tuple[OrderedDataFrame, list[Hashable], list[str]]: + last_ordered_dataframe = None + data_column_pandas_labels: list[Hashable] = [] + data_column_snowflake_quoted_identifiers: list[str] = [] + for pivot_aggr_grouping in pivot_aggr_groupings: + existing_snowflake_quoted_identifiers = groupby_snowflake_quoted_identifiers + if last_ordered_dataframe is not None and should_join_along_columns: + # If there are no index columns, then we append the OrderedDataFrame's vertically, rather + # than horizontally, so we do not need to dedupe the columns (and in fact we want the columns + # to have the same name since we want them to match up during the union. + existing_snowflake_quoted_identifiers = ( + last_ordered_dataframe.projected_column_snowflake_quoted_identifiers + ) + + ( + new_pivot_ordered_dataframe, + new_data_column_snowflake_quoted_identifiers, + new_data_column_pandas_labels, + ) = single_pivot_helper( + ordered_dataframe, + existing_snowflake_quoted_identifiers, + groupby_snowflake_quoted_identifiers, + pivot_snowflake_quoted_identifiers, + pivot_aggr_grouping.aggr_label_identifier_pair, + pivot_aggr_grouping.aggfunc, + pivot_aggr_grouping.prefix_label, + ) + + if last_ordered_dataframe: + # If there are index columns, then we join the two OrderedDataFrames + # (horizontally), while if there are no index columns, we concatenate + # them vertically, and have the index be the value column each row + # corresponds to. + # We also join vertically if there are multiple columns and multiple + # pivot values. + if should_join_along_columns: # or (not multiple_values_and_columns): + last_ordered_dataframe = last_ordered_dataframe.join( + right=new_pivot_ordered_dataframe, + left_on_cols=groupby_snowflake_quoted_identifiers, + right_on_cols=groupby_snowflake_quoted_identifiers, + how="left", + ) + data_column_snowflake_quoted_identifiers.extend( + new_data_column_snowflake_quoted_identifiers + ) + data_column_pandas_labels.extend(new_data_column_pandas_labels) + else: + last_ordered_dataframe = last_ordered_dataframe.union_all( + new_pivot_ordered_dataframe + ) + else: + last_ordered_dataframe = new_pivot_ordered_dataframe + data_column_snowflake_quoted_identifiers.extend( + new_data_column_snowflake_quoted_identifiers + ) + data_column_pandas_labels.extend(new_data_column_pandas_labels) + return PivottedOrderedDataFrameResult( + last_ordered_dataframe, + data_column_pandas_labels, + data_column_snowflake_quoted_identifiers, + ) + + def pivot_helper( pivot_frame: InternalFrame, pivot_aggr_groupings: list[PivotAggrGrouping], @@ -69,6 +147,8 @@ def pivot_helper( columns: Any, groupby_snowflake_quoted_identifiers: list[str], pivot_snowflake_quoted_identifiers: list[str], + multiple_aggr_funcs: bool, + multiple_values: bool, index: Optional[list], ) -> InternalFrame: """ @@ -82,6 +162,8 @@ def pivot_helper( columns: The columns argument passed to `pivot_table`. Will become the pandas labels for the data column index. groupby_snowflake_quoted_identifiers: Group by identifiers pivot_snowflake_quoted_identifiers: Pivot identifiers + multiple_aggr_funcs: Whether or not multiple aggregation functions have been passed in. 
+ multiple_values: Whether or not multiple values columns have been passed in. index: The index argument passed to `pivot_table` if specified. Will become the pandas labels for the index column. Returns: InternalFrame @@ -100,7 +182,6 @@ def pivot_helper( if ordered_dataframe.queries.get("post_actions"): ordered_dataframe = cache_result(ordered_dataframe) - last_ordered_dataframe = None data_column_pandas_labels: list[Hashable] = [] data_column_snowflake_quoted_identifiers: list[str] = [] @@ -157,65 +238,66 @@ def pivot_helper( # # The multi-level pandas prefix label that includes the aggregation value and function labels is also # constructed and passed into the single pivot operation to prepend the remaining of the pandas labels. - for pivot_aggr_grouping in pivot_aggr_groupings: - existing_snowflake_quoted_identifiers = groupby_snowflake_quoted_identifiers - if ( - last_ordered_dataframe is not None - and len(groupby_snowflake_quoted_identifiers) > 0 - ): - # If there are no index columns, then we append the OrderedDataFrame's vertically, rather - # than horizontally, so we do not need to dedupe the columns (and in fact we want the columns - # to have the same name since we want them to match up during the union. - existing_snowflake_quoted_identifiers = ( - last_ordered_dataframe.projected_column_snowflake_quoted_identifiers - ) - - ( - new_pivot_ordered_dataframe, - new_data_column_snowflake_quoted_identifiers, - new_data_column_pandas_labels, - ) = single_pivot_helper( - ordered_dataframe, - existing_snowflake_quoted_identifiers, - groupby_snowflake_quoted_identifiers, - pivot_snowflake_quoted_identifiers, - pivot_aggr_grouping.aggr_label_identifier_pair, - pivot_aggr_grouping.aggfunc, - pivot_aggr_grouping.prefix_label, - ) - - if last_ordered_dataframe: - # If there are index columns, then we join the two OrderedDataFrames - # (horizontally), while if there are no index columns, we concatenate - # them vertically, and have the index be the value column each row - # corresponds to. - # We also join vertically if there are multiple columns and multiple - # pivot values. 
- if ( - len(groupby_snowflake_quoted_identifiers) > 0 - ): # or (not multiple_values_and_columns): - last_ordered_dataframe = last_ordered_dataframe.join( - right=new_pivot_ordered_dataframe, - left_on_cols=groupby_snowflake_quoted_identifiers, - right_on_cols=groupby_snowflake_quoted_identifiers, - how="left", + if ( + len(groupby_snowflake_quoted_identifiers) == 0 + and multiple_aggr_funcs + and multiple_values + ): + values_pandas_labels = { + pair.aggr_label_identifier_pair.pandas_label + for pair in pivot_aggr_groupings + } + grouped_pivot_aggr_groupings = { + v: list( + filter( + lambda pair: pair.aggr_label_identifier_pair.pandas_label == v, + pivot_aggr_groupings, ) - data_column_snowflake_quoted_identifiers.extend( + ) + for v in values_pandas_labels + } + last_ordered_dataframe = None + for value_column in values_pandas_labels: + ( + pivot_ordered_dataframe, + new_data_column_pandas_labels, + new_data_column_snowflake_quoted_identifiers, + ) = perform_pivot_and_concatenate( + ordered_dataframe, + grouped_pivot_aggr_groupings[value_column], + groupby_snowflake_quoted_identifiers, + pivot_snowflake_quoted_identifiers, + True, + ) + if last_ordered_dataframe is None: + last_ordered_dataframe = pivot_ordered_dataframe + data_column_pandas_labels = new_data_column_pandas_labels + data_column_snowflake_quoted_identifiers = ( new_data_column_snowflake_quoted_identifiers ) - data_column_pandas_labels.extend(new_data_column_pandas_labels) else: last_ordered_dataframe = last_ordered_dataframe.union_all( - new_pivot_ordered_dataframe + pivot_ordered_dataframe ) - else: - last_ordered_dataframe = new_pivot_ordered_dataframe - data_column_snowflake_quoted_identifiers.extend( - new_data_column_snowflake_quoted_identifiers - ) - data_column_pandas_labels.extend(new_data_column_pandas_labels) - - ordered_dataframe = last_ordered_dataframe + assert ( + new_data_column_pandas_labels == data_column_pandas_labels + ), "Labels should match when doing multiple values and multiple aggregation functions and no index." + ordered_dataframe = last_ordered_dataframe + else: + should_join_along_columns = len(groupby_snowflake_quoted_identifiers) > 0 or ( + multiple_aggr_funcs and not multiple_values + ) + ( + ordered_dataframe, + data_column_pandas_labels, + data_column_snowflake_quoted_identifiers, + ) = perform_pivot_and_concatenate( + ordered_dataframe, + pivot_aggr_groupings, + groupby_snowflake_quoted_identifiers, + pivot_snowflake_quoted_identifiers, + should_join_along_columns, + ) # When there are no groupby columns, the index is the first column in the OrderedDataFrame. # Otherwise, the index is the groupby columns. 
diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index 455a1567fd2..cdec62aa1ea 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -6346,10 +6346,13 @@ def pivot_table( values, self._modin_frame ) ) - - include_pivot_columns_in_label = len( - groupby_snowflake_quoted_identifiers - ) != 0 or (isinstance(aggfunc, list) and len(aggfunc) > 1) + multiple_agg_funcs_single_values = ( + isinstance(aggfunc, list) and len(aggfunc) > 1 + ) and not (isinstance(values, list) and len(values) > 1) + include_pivot_columns_in_label = ( + len(groupby_snowflake_quoted_identifiers) != 0 + or multiple_agg_funcs_single_values + ) pivot_aggr_groupings = list( generate_single_pivot_labels( values_label_to_identifier_pairs_list, @@ -6369,6 +6372,8 @@ def pivot_table( columns, groupby_snowflake_quoted_identifiers, pivot_snowflake_quoted_identifiers, + (isinstance(aggfunc, list) and len(aggfunc) > 1), + (isinstance(values, list) and len(values) > 1), index, ) diff --git a/tests/integ/modin/pivot/test_pivot_multiple.py b/tests/integ/modin/pivot/test_pivot_multiple.py index 232fab65bcb..861520cfe70 100644 --- a/tests/integ/modin/pivot/test_pivot_multiple.py +++ b/tests/integ/modin/pivot/test_pivot_multiple.py @@ -36,7 +36,7 @@ def test_pivot_table_no_index_single_column_multiple_values(df_data): ) -@sql_count_checker(query_count=1, union_count=1) +@sql_count_checker(query_count=1, union_count=1, join_count=2) def test_pivot_table_no_index_single_column_multiple_values_multiple_aggr_func(df_data): pivot_table_test_helper( df_data, @@ -75,7 +75,7 @@ def test_pivot_table_no_index_multiple_column_single_value(df_data, aggfunc): ) -@sql_count_checker(query_count=1) +@sql_count_checker(query_count=1, join_count=1) def test_pivot_table_no_index_multiple_column_single_value_multiple_aggr_func(df_data): pivot_table_test_helper( df_data, @@ -157,7 +157,7 @@ def test_pivot_table_no_index_multiple_columns_multiple_values(df_data): ) -@sql_count_checker(query_count=1, union_count=1) +@sql_count_checker(query_count=1, union_count=1, join_count=2) def test_pivot_table_no_index_multiple_columns_multiple_values_multiple_aggr_funcs( df_data, ): From db83f8405909943ed3bcaf7793d1a30f63d82a86 Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Tue, 7 May 2024 15:22:19 -0700 Subject: [PATCH 04/18] Add changelog --- src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md b/src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md index 92b66e09181..7810abe1490 100644 --- a/src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md +++ b/src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md @@ -46,6 +46,7 @@ ### New Features - Added partial support for `SeriesGroupBy.apply` (where the `SeriesGrouBy` is obtained through `DataFrameGroupBy.__getitem__`). +- Added support for `DataFrame.pivot_table` with no `index` parameter. 
## 1.14.0a2 (2024-04-18) From fc19f4e47baa4ff1a1ae46bc510e98822107c00e Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Tue, 7 May 2024 15:39:58 -0700 Subject: [PATCH 05/18] Add test to cover combinatorials --- tests/integ/modin/pivot/test_pivot_multiple.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/integ/modin/pivot/test_pivot_multiple.py b/tests/integ/modin/pivot/test_pivot_multiple.py index 861520cfe70..f092c9c804c 100644 --- a/tests/integ/modin/pivot/test_pivot_multiple.py +++ b/tests/integ/modin/pivot/test_pivot_multiple.py @@ -48,6 +48,18 @@ def test_pivot_table_no_index_single_column_multiple_values_multiple_aggr_func(d ) +@sql_count_checker(query_count=1, join_count=1) +def test_pivot_table_no_index_single_column_single_values_multiple_aggr_func(df_data): + pivot_table_test_helper( + df_data, + { + "columns": "B", + "values": "D", + "aggfunc": ["mean", "max"], + }, + ) + + @pytest.mark.parametrize("aggfunc", ["count", "sum", "min", "max", "mean"]) @sql_count_checker(query_count=1) def test_pivot_table_single_index_multiple_column_single_value(df_data, aggfunc): From 3d5da361b82f8c4adf27f59e41cc84c4d01eeea5 Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Tue, 7 May 2024 17:14:08 -0700 Subject: [PATCH 06/18] Remove outdated unsupported error tests --- tests/integ/modin/pivot/test_pivot_negative.py | 12 ------------ tests/integ/modin/pivot/test_pivot_single.py | 6 +++--- 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/tests/integ/modin/pivot/test_pivot_negative.py b/tests/integ/modin/pivot/test_pivot_negative.py index 9ed330e0f55..a009e188f7a 100644 --- a/tests/integ/modin/pivot/test_pivot_negative.py +++ b/tests/integ/modin/pivot/test_pivot_negative.py @@ -161,15 +161,3 @@ def dummy_aggr_func(series): match="median", ): snow_df.pivot_table(index="A", columns="C", values="D", aggfunc="median") - - with pytest.raises( - NotImplementedError, - match="pivot_table with no index configuration is currently not supported", - ): - snow_df.pivot_table(index=None, columns="C", values="D") - - with pytest.raises( - NotImplementedError, - match="pivot_table with no index configuration is currently not supported", - ): - snow_df.pivot_table(index=None, columns=None, values="D") diff --git a/tests/integ/modin/pivot/test_pivot_single.py b/tests/integ/modin/pivot/test_pivot_single.py index eac3975cf4f..580270ebdfe 100644 --- a/tests/integ/modin/pivot/test_pivot_single.py +++ b/tests/integ/modin/pivot/test_pivot_single.py @@ -148,9 +148,9 @@ def test_pivot_table_no_index_no_column_single_value(df_data): "columns": None, "values": "D", }, - expect_exception_match="pivot_table with no index configuration is currently not supported", - expect_exception_type=NotImplementedError, - assert_exception_equal=False, + expect_exception_match=r"No group keys passed\!", + expect_exception_type=ValueError, + assert_exception_equal=True, ) From c52a3680d34688b1e720afd63b07b93c7a1741c3 Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Tue, 7 May 2024 17:19:26 -0700 Subject: [PATCH 07/18] Remove commented code --- src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py index 380b70ccec6..5ccc27ff1df 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py @@ -111,7 +111,7 @@ def 
perform_pivot_and_concatenate( # corresponds to. # We also join vertically if there are multiple columns and multiple # pivot values. - if should_join_along_columns: # or (not multiple_values_and_columns): + if should_join_along_columns: last_ordered_dataframe = last_ordered_dataframe.join( right=new_pivot_ordered_dataframe, left_on_cols=groupby_snowflake_quoted_identifiers, From 27dcc317945a48fb70cc277856f001545ab930cf Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Wed, 15 May 2024 19:15:28 -0700 Subject: [PATCH 08/18] Address review comments, fix margins partially --- .../modin/plugin/_internal/pivot_utils.py | 68 ++++++- .../compiler/snowflake_query_compiler.py | 178 ++++++++++++++++-- tests/integ/modin/pivot/test_pivot_margins.py | 53 ++++++ 3 files changed, 281 insertions(+), 18 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py index 5ccc27ff1df..82bfcbade5c 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py @@ -61,7 +61,7 @@ ) -class PivottedOrderedDataFrameResult(NamedTuple): +class PivotedOrderedDataFrameResult(NamedTuple): # The OrderedDataFrame representation for the join or align result ordered_dataframe: OrderedDataFrame # The data column pandas labels of the new frame. @@ -76,7 +76,17 @@ def perform_pivot_and_concatenate( groupby_snowflake_quoted_identifiers: list[str], pivot_snowflake_quoted_identifiers: list[str], should_join_along_columns: bool, -) -> tuple[OrderedDataFrame, list[Hashable], list[str]]: +) -> PivotedOrderedDataFrameResult: + """ + Helper functio to perform a full pivot (including joining in the case of multiple aggrs or values) on an OrderedDataFrame. + + Args: + ordered_dataframe: The ordered dataframe to perform pivot on. + pivot_aggr_groupings: A list of PivotAggrGroupings that define the aggregations to apply. + groupby_snowflake_quoted_identifiers: Group by identifiers + pivot_snowflake_quoted_identifiers: Pivot identifiers + should_join_along_columns: Whether to join along columns, or use union to join along rows instead. + """ last_ordered_dataframe = None data_column_pandas_labels: list[Hashable] = [] data_column_snowflake_quoted_identifiers: list[str] = [] @@ -132,7 +142,7 @@ def perform_pivot_and_concatenate( new_data_column_snowflake_quoted_identifiers ) data_column_pandas_labels.extend(new_data_column_pandas_labels) - return PivottedOrderedDataFrameResult( + return PivotedOrderedDataFrameResult( last_ordered_dataframe, data_column_pandas_labels, data_column_snowflake_quoted_identifiers, @@ -162,8 +172,8 @@ def pivot_helper( columns: The columns argument passed to `pivot_table`. Will become the pandas labels for the data column index. groupby_snowflake_quoted_identifiers: Group by identifiers pivot_snowflake_quoted_identifiers: Pivot identifiers - multiple_aggr_funcs: Whether or not multiple aggregation functions have been passed in. - multiple_values: Whether or not multiple values columns have been passed in. + multiple_aggr_funcs: Whether multiple aggregation functions have been passed in. + multiple_values: Whether multiple values columns have been passed in. index: The index argument passed to `pivot_table` if specified. Will become the pandas labels for the index column. 
Returns: InternalFrame @@ -243,10 +253,23 @@ def pivot_helper( and multiple_aggr_funcs and multiple_values ): + # When there are multiple aggregation functions, values, and `index=None`, we need + # to handle pivot a little differently. Rather than just joining horizontally or vertically, + # we need to join both horizontally and vertically - each value column gets its own row, so + # for every resulting OrderedDataFrame corresponding to the result of an aggregation on a single + # value, we need to join (concatenate horizontally) to get one row. For every value column, + # we then need to union (concatenate vertically) the resulting rows from the previous step. + # In order to handle this, we first group the aggregations by the column they act on, and run + # one pivot per group of aggregations. We then have multiple one row OrderedDataFrames, where each + # OrderedDataFrame is the result of pivot on a single value column, which we can union in order to + # get our final result. + # Step 1: Determine the values columns. values_pandas_labels = { pair.aggr_label_identifier_pair.pandas_label for pair in pivot_aggr_groupings } + # Step 2: Group aggregations by the values column they are on. + # Result: {"val_col1": [aggr1, aggr2], "val_col2}": [aggr3, aggr4]} grouped_pivot_aggr_groupings = { v: list( filter( @@ -256,6 +279,7 @@ def pivot_helper( ) for v in values_pandas_labels } + # Step 5: Perform pivot for every value column, and union together. last_ordered_dataframe = None for value_column in values_pandas_labels: ( @@ -284,6 +308,9 @@ def pivot_helper( ), "Labels should match when doing multiple values and multiple aggregation functions and no index." ordered_dataframe = last_ordered_dataframe else: + # If there are no index columns (groupby_snowflake_quoted_identifiers) and + # a single aggregation function or a single value, we should join vertically + # instead of horizontally. should_join_along_columns = len(groupby_snowflake_quoted_identifiers) > 0 or ( multiple_aggr_funcs and not multiple_values ) @@ -1209,6 +1236,10 @@ def expand_pivot_result_with_pivot_table_margins( # 14 9 margin_columns_aggregations = [] + # breakpoint() + # # When there are no `groupby_snowflake_quoted_identifiers`, the values column is not in the prefix labels, and is instead + # # in the index column. This codepath expects that the values columns are included in the prefixes of the data column pandas labels. + # if len(groupby_snowflake_quoted_identifiers) > 0: # Step 1) Generate mapping of prefix to data columns aligned with each grouping. In this example would generate: # (count, D) -> [(count, D, foo, red), (count, D, bar, blue)] # (sum, E) -> [(sum, E, foo, red), (sum, E, bar, blue)] @@ -1348,6 +1379,33 @@ def expand_pivot_result_with_pivot_table_margins( updated_data_column_snowflake_quoted_identifiers.append( margin_column_aggr_snowflake_quoted_identifier ) + # # Step 1: When there are no groupby columns, the data column pandas label's format changes depending on how + # # many pivot columns there are. For a single pivot column, the resulting DataFrame has labels with only 1 level; + # # but when there are multiple pivot columns, the margin column takes the first pivot column's values as a prefix. + # # For each subsequent pivot column, an additional empty post-fix is added. 
+ # if len(pivot_snowflake_quoted_identifiers) == 1: + # # If there is only a single pivot column, then we just add 1 column with the name of the margin column per pivot + # # value, which should be equal to the number of columns in the pivoted dataframe. + # new_data_column_pandas_labels = [margins_name] * len(pivoted_qc.columns) + # else: + # new_data_column_pandas_labels = [] + # num_levels_to_pad = pivoted_qc.index.nlevels - 2 + # for prefix in pivoted_qc.index.get_level_values(0).unique(): + # new_data_column_pandas_labels.append((prefix, margins_name) + tuple('' for _ in range(num_levels_to_pad))) + # values_snowflake_quoted_identifiers = {pair.aggr_label_identifier_pair.snowflake_quoted_identifier for pair in pivot_aggr_groupings} + # value_to_aggr_func = {v: [pair.aggfunc for pair in filter(lambda pair: pair.aggr_label_identifier_pair.snowflake_quoted_identifier == v, pivot_aggr_groupings)] for v in values_snowflake_quoted_identifiers} + # for value_snowflake_quoted_identifier, aggfunc in values_snowflake_quoted_identifiers: + # margin_columns_aggregations.append( + # apply_fill_value_to_snowpark_column( + # get_margin_aggregation( + # aggfunc, + # col(value_snowflake_quoted_identifier) + # ), + # fill_value, + # ).as_(original_ordered_dataframe) + # ) + + # pass # Step 3) # To generate the margin column aggregations we need to group by the groupby_snowflake_quoted_identifiers and join diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index cdec62aa1ea..3be7b1a6ef2 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -6349,7 +6349,7 @@ def pivot_table( multiple_agg_funcs_single_values = ( isinstance(aggfunc, list) and len(aggfunc) > 1 ) and not (isinstance(values, list) and len(values) > 1) - include_pivot_columns_in_label = ( + include_aggr_func_in_label = ( len(groupby_snowflake_quoted_identifiers) != 0 or multiple_agg_funcs_single_values ) @@ -6358,7 +6358,9 @@ def pivot_table( values_label_to_identifier_pairs_list, aggfunc, len(pivot_snowflake_quoted_identifiers) > 0, - isinstance(values, list) and include_pivot_columns_in_label, + isinstance(values, list) + and len(values) > 1 + and include_aggr_func_in_label, sort, ) ) @@ -6392,15 +6394,165 @@ def pivot_table( # Add margins if specified, note this will also add the row position since the margin row needs to be fixed # as the last row of the dataframe. If no margins, then we order by the group by columns. 
if margins and pivot_aggr_groupings and pivot_snowflake_quoted_identifiers: - pivot_qc = expand_pivot_result_with_pivot_table_margins( - pivot_aggr_groupings, - groupby_snowflake_quoted_identifiers, - pivot_snowflake_quoted_identifiers, - self._modin_frame.ordered_dataframe, - pivot_qc, - margins_name, - fill_value, - ) + if len(groupby_snowflake_quoted_identifiers) > 0: + pivot_qc = expand_pivot_result_with_pivot_table_margins( + pivot_aggr_groupings, + groupby_snowflake_quoted_identifiers, + pivot_snowflake_quoted_identifiers, + self._modin_frame.ordered_dataframe, + pivot_qc, + margins_name, + fill_value, + ) + else: + names = pivot_qc.columns.names + margins_frame = pivot_helper( + self._modin_frame, + pivot_aggr_groupings, + not dropna, + not isinstance(aggfunc, list), + columns[:1], + groupby_snowflake_quoted_identifiers, + pivot_snowflake_quoted_identifiers[:1], + (isinstance(aggfunc, list) and len(aggfunc) > 1), + (isinstance(values, list) and len(values) > 1), + index, + ) + if len(columns) > 1: + # If there are multiple pivot columns, we need to add the margin_name to the margins frame's data column + # pandas labels, as well as any empty postfixes for the remaining pivot columns if there are more than 2. + new_data_column_pandas_labels = [] + for label in margins_frame.data_column_pandas_labels: + new_data_column_pandas_labels.append( + (label, margins_name) + + tuple("" for _ in range(pivot_qc.columns.nlevels - 2)) + ) + margins_frame = InternalFrame.create( + ordered_dataframe=margins_frame.ordered_dataframe, + data_column_pandas_labels=new_data_column_pandas_labels, + data_column_pandas_index_names=pivot_qc._modin_frame.data_column_pandas_index_names, + data_column_snowflake_quoted_identifiers=margins_frame.data_column_snowflake_quoted_identifiers, + index_column_pandas_labels=margins_frame.index_column_pandas_labels, + index_column_snowflake_quoted_identifiers=margins_frame.index_column_snowflake_quoted_identifiers, + ) + margins_qc = SnowflakeQueryCompiler(margins_frame) + original_pivot_qc_columns = pivot_qc.columns + pivot_qc = pivot_qc.concat(1, [margins_qc]) + # After this step, pivot_qc contains the pivotted columns followed by the margins columns - e.g. say our pivot result is + # B on.e tw"o + # D 28 27 + # E 35 31 + # Then our pivotted query_compiler now looks like this: + # B on.e tw"o margin_for_on.e margin_for_tw"o + # D 28 27 28 27 + # E 35 31 35 31 + # We have to reindex (and rename, since we used pivot, the columns will be named the same) so that we get it in the format: + # B on.e margin_for_on.e tw"o margin_for_tw"o + # D 28 28 27 27 + # E 35 35 31 31 + # If there are more than one pivot columns, then the stride will be greater - e.g. if our pivot result looks like this: + # B on.e tw"o + # C dull shi'ny dull shi'ny + # D 5 23 10 17 + # E 8 27 12 19 + # Our pivotted query_compiler will look like this: + # B on.e tw"o on.e tw"o + # C dull shi'ny dull shi'ny All All + # D 5 23 10 17 28 27 + # E 8 27 12 19 35 21 + # And so our re-indexer will look different. + if len(columns) == 1: + # Assuming we have 4 columns after the pivot, we want our reindexer to look like this: [0, 4, 1, 5, 2, 6, 3, 7]. We can accomplish this + # by zipping(range(0, 4), (4, 8)), which gives us [(0, 4), (1, 5), (2, 6), (3, 7)], and then flattening that list using sum(list, tuple()) + # which will result in our flattened indexer [0, 4, 1, 5, 2, 6, 3, 7]. 
+ column_reindexer = list( + sum( + zip( + range(0, len(original_pivot_qc_columns)), + range( + len(original_pivot_qc_columns), + 2 * len(original_pivot_qc_columns), + ), + ), + tuple(), + ) + ) + else: + # When there is more than one pivot column, we need to reindex differently, as the example above shows. Say we have have 2 unique values in + # the first pivot column, and 2 unique values in the second pivot column (as above). Then, our final reindexer should look like this: + # [0, 1, 4, 2, 3, 5]. We can determine how many columns correspond to each first pivot column value by looking at the column MultiIndex for + # the pivotted QC. We can convert that to a frame using the `to_frame` MultiIndex API. Let's take a look at an example. + # Assuming that the MultiIndex (after converting to a frame) looks like this (i.e. there are 2 distinct values for the first pivot column, + # and 3 for the second): + # B C + # 0 on.e dull + # 1 on.e shi'ny + # 2 on.e sy + # 3 tw"o dull + # 4 tw"o shi'ny + mi_as_frame = original_pivot_qc_columns.to_frame(index=False) + # We can then groupby the first pivot column, and call count, which will tell us how many columns correspond to each label from the first pivot column. + # C + # B + # on.e 3 + # tw"o 2 + pivot_multiindex_level_one_lengths = ( + mi_as_frame.groupby(mi_as_frame.columns[0]) + .count()[mi_as_frame.columns[1]] + .values[:-1] + ) + # We can grab the first column from this groupby (in case there are more than 2 pivot columns), and use these splits with np.split, which will tell us + # the groupings of the columns. E.g., in this case, we would want the following splits for the indexes: [(0, 1, 2), (3, 4)]. Calling np.split with + # the values from above (excluding the last value) will result in that output. We call tuple on the splits to get them in tuple format. + split_original_pivot_qc_indexes = [ + list(group) + for group in np.split( + range(len(original_pivot_qc_columns)), + pivot_multiindex_level_one_lengths, + ) + ] + # Once we have the splits [[0, 1, 2], [3, 4]], we can then insert the indices for the margins columns. + reindexer = [ + group + [margin_index] + for group, margin_index in zip( + split_original_pivot_qc_indexes, + range( + len(original_pivot_qc_columns), len(pivot_qc.columns) + ), + ) + ] + # Now, we have a list that looks like this: [[0, 1, 2, 5], [3, 4, 6]] - we need to make this into a flat list of indexes. + column_reindexer = sum(reindexer, list()) + pivot_qc = pivot_qc.take_2d_positional(slice(None), column_reindexer) + + if len(columns) == 1: + # After reindexing, we have to rename the margins columns to the correct name if we only have one pivot column. + if original_pivot_qc_columns.nlevels == 1: + pivot_qc = pivot_qc.set_columns( + pd.Index( + list( + sum( + zip( + original_pivot_qc_columns, + [margins_name] + * len(original_pivot_qc_columns), + ), + tuple(), + ) + ) + ).set_names(names) + ) + else: + # If there are multiple levels in the index even though there is a single pivot column, we need to copy over the prefixes as well. + new_index_names = [] + for label in original_pivot_qc_columns: + new_index_names.extend( + [label, label[:-1] + (margins_name,)] + ) + new_index = pd.MultiIndex.from_tuples( + new_index_names + ).set_names(names) + pivot_qc = pivot_qc.set_columns(new_index) # Rename the data column snowflake quoted identifiers to be closer to pandas labels given we # may have done unwrapping of surrounding quotes, ie. so will unwrap single quotes in snowflake identifiers. 
@@ -9424,7 +9576,7 @@ def count_freqs( Helper function to compute the mode ("top") and frequency with which the mode appears ("count") for a given column. - This helper returns a 1-row OrderedFrame with the columns "__index__", "top" and "freq", + This helper returns a 1-row OrderedDataFrame with the columns "__index__", "top" and "freq", containing the column name, the mode of this column, and the number of times the mode occurs. This result should be UNION ALL'd together with the results from the other columns of the original frame, then transposed so "top" and "freq" are rows. @@ -9447,7 +9599,7 @@ def count_freqs( assert len(col_labels_tuple) == len( new_index_identifiers ), f"level of labels {col_labels_tuple} did not match level of identifiers {new_index_identifiers}" - # The below OrderedFrame operations are analogous to the following SQL for column "a": + # The below OrderedDataFrame operations are analogous to the following SQL for column "a": # SELECT 'a' AS __index__, # a::VARIANT AS top, # IFF(a IS NULL, NULL, COUNT(a)) AS freq diff --git a/tests/integ/modin/pivot/test_pivot_margins.py b/tests/integ/modin/pivot/test_pivot_margins.py index 354f1377c07..cad95ed5b9b 100644 --- a/tests/integ/modin/pivot/test_pivot_margins.py +++ b/tests/integ/modin/pivot/test_pivot_margins.py @@ -184,3 +184,56 @@ def test_pivot_table_unsupported_dropna_with_expanded_aggregation_margins_unsupp aggfunc={"E": ["min"], "F": "max"}, margins=True, ) + + +@pytest.mark.parametrize( + "columns", [["B"], ["B", "C"]], ids=["single_column", "multiple_columns"] +) +class TestPivotTableMarginsNoIndex: + def test_single_value_single_aggfunc(self, columns, df_data): + pivot_table_test_helper( + df_data, + { + "columns": columns, + "values": ["D"], + "aggfunc": "sum", + "dropna": True, + "margins": True, + }, + ) + + def test_multiple_value_single_aggfunc(self, columns, df_data): + pivot_table_test_helper( + df_data, + { + "columns": columns, + "values": ["D", "E"], + "aggfunc": "sum", + "dropna": True, + "margins": True, + }, + ) + + def test_single_value_multiple_aggfunc(self, columns, df_data): + pivot_table_test_helper( + df_data, + { + "columns": columns, + "values": ["D"], + "aggfunc": ["sum", "min"], + "dropna": True, + "margins": True, + }, + ) + + def test_multiple_value_multiple_aggfunc(self, columns, df_data): + pivot_table_test_helper( + df_data, + { + "columns": columns, + "values": ["D", "E"], + "aggfunc": ["sum", "min"], + "dropna": True, + "margins": True, + }, + ) From baebd773a10e9b445ba1b93ae381840a383616e3 Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Wed, 15 May 2024 19:19:01 -0700 Subject: [PATCH 09/18] Remove commented code --- .../modin/plugin/_internal/pivot_utils.py | 31 ------------------- 1 file changed, 31 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py index 82bfcbade5c..f2873eb4f29 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py @@ -1236,10 +1236,6 @@ def expand_pivot_result_with_pivot_table_margins( # 14 9 margin_columns_aggregations = [] - # breakpoint() - # # When there are no `groupby_snowflake_quoted_identifiers`, the values column is not in the prefix labels, and is instead - # # in the index column. This codepath expects that the values columns are included in the prefixes of the data column pandas labels. 
- # if len(groupby_snowflake_quoted_identifiers) > 0: # Step 1) Generate mapping of prefix to data columns aligned with each grouping. In this example would generate: # (count, D) -> [(count, D, foo, red), (count, D, bar, blue)] # (sum, E) -> [(sum, E, foo, red), (sum, E, bar, blue)] @@ -1379,33 +1375,6 @@ def expand_pivot_result_with_pivot_table_margins( updated_data_column_snowflake_quoted_identifiers.append( margin_column_aggr_snowflake_quoted_identifier ) - # # Step 1: When there are no groupby columns, the data column pandas label's format changes depending on how - # # many pivot columns there are. For a single pivot column, the resulting DataFrame has labels with only 1 level; - # # but when there are multiple pivot columns, the margin column takes the first pivot column's values as a prefix. - # # For each subsequent pivot column, an additional empty post-fix is added. - # if len(pivot_snowflake_quoted_identifiers) == 1: - # # If there is only a single pivot column, then we just add 1 column with the name of the margin column per pivot - # # value, which should be equal to the number of columns in the pivoted dataframe. - # new_data_column_pandas_labels = [margins_name] * len(pivoted_qc.columns) - # else: - # new_data_column_pandas_labels = [] - # num_levels_to_pad = pivoted_qc.index.nlevels - 2 - # for prefix in pivoted_qc.index.get_level_values(0).unique(): - # new_data_column_pandas_labels.append((prefix, margins_name) + tuple('' for _ in range(num_levels_to_pad))) - # values_snowflake_quoted_identifiers = {pair.aggr_label_identifier_pair.snowflake_quoted_identifier for pair in pivot_aggr_groupings} - # value_to_aggr_func = {v: [pair.aggfunc for pair in filter(lambda pair: pair.aggr_label_identifier_pair.snowflake_quoted_identifier == v, pivot_aggr_groupings)] for v in values_snowflake_quoted_identifiers} - # for value_snowflake_quoted_identifier, aggfunc in values_snowflake_quoted_identifiers: - # margin_columns_aggregations.append( - # apply_fill_value_to_snowpark_column( - # get_margin_aggregation( - # aggfunc, - # col(value_snowflake_quoted_identifier) - # ), - # fill_value, - # ).as_(original_ordered_dataframe) - # ) - - # pass # Step 3) # To generate the margin column aggregations we need to group by the groupby_snowflake_quoted_identifiers and join From b5659743f9fdf3485a882c75ce172d92e52ece5b Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Wed, 15 May 2024 19:48:45 -0700 Subject: [PATCH 10/18] Add support for margins fully --- .../compiler/snowflake_query_compiler.py | 25 +++++++++++++------ tests/integ/modin/pivot/test_pivot_margins.py | 4 +++ 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index aeb30b2739c..a906fc2d459 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -6473,10 +6473,13 @@ def pivot_table( # pandas labels, as well as any empty postfixes for the remaining pivot columns if there are more than 2. 
new_data_column_pandas_labels = [] for label in margins_frame.data_column_pandas_labels: - new_data_column_pandas_labels.append( - (label, margins_name) - + tuple("" for _ in range(pivot_qc.columns.nlevels - 2)) - ) + if isinstance(aggfunc, list) and len(aggfunc) > 1: + new_label = label + (margins_name,) + else: + new_label = (label, margins_name) + tuple( + "" for _ in range(pivot_qc.columns.nlevels - 2) + ) + new_data_column_pandas_labels.append(new_label) margins_frame = InternalFrame.create( ordered_dataframe=margins_frame.ordered_dataframe, data_column_pandas_labels=new_data_column_pandas_labels, @@ -6546,9 +6549,17 @@ def pivot_table( # B # on.e 3 # tw"o 2 - pivot_multiindex_level_one_lengths = ( - mi_as_frame.groupby(mi_as_frame.columns[0]) - .count()[mi_as_frame.columns[1]] + # If there are multiple columns and multiple aggregation functions, we need to groupby the first two columns instead of just the first one - + # as the first column will be the name of the aggregation function, and the second column will be the values from the first pivot column. + if isinstance(aggfunc, list) and len(aggfunc) > 1: + groupby_columns = mi_as_frame.columns[:2].tolist() + value_column_index = 2 + else: + groupby_columns = mi_as_frame.columns[0] + value_column_index = 1 + pivot_multiindex_level_one_lengths = np.cumsum( + mi_as_frame.groupby(groupby_columns, sort=False) + .count()[mi_as_frame.columns[value_column_index]] .values[:-1] ) # We can grab the first column from this groupby (in case there are more than 2 pivot columns), and use these splits with np.split, which will tell us diff --git a/tests/integ/modin/pivot/test_pivot_margins.py b/tests/integ/modin/pivot/test_pivot_margins.py index cad95ed5b9b..aaae20a5c09 100644 --- a/tests/integ/modin/pivot/test_pivot_margins.py +++ b/tests/integ/modin/pivot/test_pivot_margins.py @@ -190,6 +190,7 @@ def test_pivot_table_unsupported_dropna_with_expanded_aggregation_margins_unsupp "columns", [["B"], ["B", "C"]], ids=["single_column", "multiple_columns"] ) class TestPivotTableMarginsNoIndex: + @sql_count_checker(query_count=1, join_count=1) def test_single_value_single_aggfunc(self, columns, df_data): pivot_table_test_helper( df_data, @@ -202,6 +203,7 @@ def test_single_value_single_aggfunc(self, columns, df_data): }, ) + @sql_count_checker(query_count=1, join_count=1, union_count=2) def test_multiple_value_single_aggfunc(self, columns, df_data): pivot_table_test_helper( df_data, @@ -214,6 +216,7 @@ def test_multiple_value_single_aggfunc(self, columns, df_data): }, ) + @sql_count_checker(query_count=1, join_count=3) def test_single_value_multiple_aggfunc(self, columns, df_data): pivot_table_test_helper( df_data, @@ -226,6 +229,7 @@ def test_single_value_multiple_aggfunc(self, columns, df_data): }, ) + @sql_count_checker(query_count=1, join_count=5, union_count=2) def test_multiple_value_multiple_aggfunc(self, columns, df_data): pivot_table_test_helper( df_data, From 814f1e66d81d90573d994e4bd335786c7e6cf5da Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Wed, 15 May 2024 20:07:17 -0700 Subject: [PATCH 11/18] Add more rigorous testing --- tests/integ/modin/pivot/conftest.py | 63 +++++++++++++++++++ tests/integ/modin/pivot/test_pivot_margins.py | 59 ++++++++++++++++- 2 files changed, 121 insertions(+), 1 deletion(-) diff --git a/tests/integ/modin/pivot/conftest.py b/tests/integ/modin/pivot/conftest.py index d099d389130..17e350c64a9 100644 --- a/tests/integ/modin/pivot/conftest.py +++ b/tests/integ/modin/pivot/conftest.py @@ -53,6 +53,69 @@ def 
df_data(): } +@pytest.fixture(scope="module") +def df_data_more_pivot_values(): + return { + "A": [ + "foo", + "foo", + "foo", + "foo", + "bar", + "bar", + "bar", + "bar", + "foo", + "foo", + "foo", + "bar", + "bar", + "foo", + "foo", + "foo", + ], + "B": [ + "on.e", + "on.e", + "on.e", + 'tw"o', + "on.e", + "on.e", + "on.e", + 'tw"o', + 'tw"o', + 'tw"o', + "on.e", + "thr.ee", + "thr.ee", + "thr.ee", + "on.e", + 'tw"o', + ], + "C": [ + "dull", + "dull", + "shi'ny", + "dull", + "dull", + "shi'ny", + "shi'ny", + "dull", + "shi'ny", + "shi'ny", + "shi'ny", + "dull", + "shi'ny", + "pla.in", + "pla.in", + "pla.in", + ], + "D": np.arange(0, 16), + "E": np.arange(1, 17), + "F": np.arange(2, 18), + } + + @pytest.fixture(scope="module") def df_data_with_duplicates(): return ( diff --git a/tests/integ/modin/pivot/test_pivot_margins.py b/tests/integ/modin/pivot/test_pivot_margins.py index aaae20a5c09..e5cd839dd67 100644 --- a/tests/integ/modin/pivot/test_pivot_margins.py +++ b/tests/integ/modin/pivot/test_pivot_margins.py @@ -189,7 +189,7 @@ def test_pivot_table_unsupported_dropna_with_expanded_aggregation_margins_unsupp @pytest.mark.parametrize( "columns", [["B"], ["B", "C"]], ids=["single_column", "multiple_columns"] ) -class TestPivotTableMarginsNoIndex: +class TestPivotTableMarginsNoIndexFewerPivotValues: @sql_count_checker(query_count=1, join_count=1) def test_single_value_single_aggfunc(self, columns, df_data): pivot_table_test_helper( @@ -241,3 +241,60 @@ def test_multiple_value_multiple_aggfunc(self, columns, df_data): "margins": True, }, ) + + +@pytest.mark.parametrize( + "columns", [["B"], ["B", "C"]], ids=["single_column", "multiple_columns"] +) +class TestPivotTableMarginsNoIndexMorePivotValues: + @sql_count_checker(query_count=1, join_count=1) + def test_single_value_single_aggfunc(self, columns, df_data_more_pivot_values): + pivot_table_test_helper( + df_data_more_pivot_values, + { + "columns": columns, + "values": ["D"], + "aggfunc": "sum", + "dropna": True, + "margins": True, + }, + ) + + @sql_count_checker(query_count=1, join_count=1, union_count=2) + def test_multiple_value_single_aggfunc(self, columns, df_data_more_pivot_values): + pivot_table_test_helper( + df_data_more_pivot_values, + { + "columns": columns, + "values": ["D", "E"], + "aggfunc": "sum", + "dropna": True, + "margins": True, + }, + ) + + @sql_count_checker(query_count=1, join_count=3) + def test_single_value_multiple_aggfunc(self, columns, df_data_more_pivot_values): + pivot_table_test_helper( + df_data_more_pivot_values, + { + "columns": columns, + "values": ["D"], + "aggfunc": ["sum", "min"], + "dropna": True, + "margins": True, + }, + ) + + @sql_count_checker(query_count=1, join_count=5, union_count=2) + def test_multiple_value_multiple_aggfunc(self, columns, df_data_more_pivot_values): + pivot_table_test_helper( + df_data_more_pivot_values, + { + "columns": columns, + "values": ["D", "E"], + "aggfunc": ["sum", "min"], + "dropna": True, + "margins": True, + }, + ) From f57fd108a09d44e46340eadfea51c72555e002cb Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Wed, 15 May 2024 20:12:37 -0700 Subject: [PATCH 12/18] Address review comments --- src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py | 2 +- src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py index f2873eb4f29..754b48f8e22 100644 --- 
a/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py @@ -78,7 +78,7 @@ def perform_pivot_and_concatenate( should_join_along_columns: bool, ) -> PivotedOrderedDataFrameResult: """ - Helper functio to perform a full pivot (including joining in the case of multiple aggrs or values) on an OrderedDataFrame. + Helper function to perform a full pivot (including joining in the case of multiple aggrs or values) on an OrderedDataFrame. Args: ordered_dataframe: The ordered dataframe to perform pivot on. diff --git a/src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py b/src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py index 92573d33729..c89c67cece6 100644 --- a/src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py +++ b/src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py @@ -1994,8 +1994,8 @@ def pivot_table(): ----- Raise NotImplementedError if - * margins, observed, or sort is given; - * or index, columns, or values is not str; + * observed or sort is given; + * or index, columns, or values is not str, a list of str, or None; * or DataFrame contains MultiIndex; * or any argfunc is not "count", "mean", "min", "max", or "sum" From 4282c905d950e1692b27910b30007f2b317a3b70 Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Thu, 16 May 2024 12:12:09 -0700 Subject: [PATCH 13/18] Address review comments --- .../modin/supported/dataframe_supported.rst | 7 +++-- .../snowpark/modin/plugin/PANDAS_CHANGELOG.md | 2 +- .../compiler/snowflake_query_compiler.py | 20 +++++++++---- tests/integ/modin/pivot/test_pivot_margins.py | 29 +++++++++++++++++-- .../integ/modin/pivot/test_pivot_multiple.py | 29 +++++++++++++------ tests/integ/modin/pivot/test_pivot_single.py | 4 +-- 6 files changed, 68 insertions(+), 23 deletions(-) diff --git a/docs/source/modin/supported/dataframe_supported.rst b/docs/source/modin/supported/dataframe_supported.rst index a613616f23c..c550a10df94 100644 --- a/docs/source/modin/supported/dataframe_supported.rst +++ b/docs/source/modin/supported/dataframe_supported.rst @@ -297,9 +297,10 @@ Methods +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ | ``pivot`` | N | | | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ -| ``pivot_table`` | P | ``observed``, ``margins``, | ``N`` if ``index``, ``columns``, or ``values`` is | -| | | ``sort`` | not str; or MultiIndex; or any ``argfunc`` is not | -| | | | "count", "mean", "min", "max", or "sum" | +| ``pivot_table`` | P | ``observed``, ``sort`` | ``N`` if ``index``, ``columns``, or ``values`` is | +| | | | not str, list of str, or None; or MultiIndex; or | +| | | | any ``argfunc`` is not "count", "mean", "min", | +| | | | "max", or "sum" | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ | ``pop`` | N | | | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ diff --git a/src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md b/src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md index 56093cb89a5..6ac89e62234 100644 --- a/src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md +++ 
b/src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md @@ -23,7 +23,7 @@ - Added partial support for `SeriesGroupBy.apply` (where the `SeriesGrouBy` is obtained through `DataFrameGroupBy.__getitem__`). - Added support for `pd.NamedAgg` in `DataFrameGroupBy.agg` and `SeriesGroupBy.agg`. - Added support for `Series.str.slice`. -- Added support for `DataFrame.pivot_table` with no `index` parameter. +- Added support for `DataFrame.pivot_table` with no `index` parameter, as well as for `margins` parameter. ## 1.15.0a1 (2024-05-03) diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index a906fc2d459..7906e077286 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -6398,7 +6398,7 @@ def pivot_table( ) multiple_agg_funcs_single_values = ( isinstance(aggfunc, list) and len(aggfunc) > 1 - ) and not (isinstance(values, list) and len(values) > 1) + ) and not isinstance(values, list) include_aggr_func_in_label = ( len(groupby_snowflake_quoted_identifiers) != 0 or multiple_agg_funcs_single_values @@ -6409,7 +6409,7 @@ def pivot_table( aggfunc, len(pivot_snowflake_quoted_identifiers) > 0, isinstance(values, list) - and len(values) > 1 + and (not margins or len(values) > 1) and include_aggr_func_in_label, sort, ) @@ -6443,7 +6443,15 @@ def pivot_table( # Add margins if specified, note this will also add the row position since the margin row needs to be fixed # as the last row of the dataframe. If no margins, then we order by the group by columns. - if margins and pivot_aggr_groupings and pivot_snowflake_quoted_identifiers: + # The final condition checks to see if there are any columns in the pivot result. If there are no columns, + # this means that we pivoted on an empty table - in that case, we can skip adding margins, since the result + # will still be an empty DataFrame (but we will have increased the join and union count) for no reason. + if ( + margins + and pivot_aggr_groupings + and pivot_snowflake_quoted_identifiers + and len(pivot_qc.columns) != 0 + ): if len(groupby_snowflake_quoted_identifiers) > 0: pivot_qc = expand_pivot_result_with_pivot_table_margins( pivot_aggr_groupings, @@ -6469,11 +6477,11 @@ def pivot_table( index, ) if len(columns) > 1: - # If there are multiple pivot columns, we need to add the margin_name to the margins frame's data column + # If there is a multiindex on the pivot result, we need to add the margin_name to the margins frame's data column # pandas labels, as well as any empty postfixes for the remaining pivot columns if there are more than 2. new_data_column_pandas_labels = [] for label in margins_frame.data_column_pandas_labels: - if isinstance(aggfunc, list) and len(aggfunc) > 1: + if isinstance(aggfunc, list): new_label = label + (margins_name,) else: new_label = (label, margins_name) + tuple( @@ -6551,7 +6559,7 @@ def pivot_table( # tw"o 2 # If there are multiple columns and multiple aggregation functions, we need to groupby the first two columns instead of just the first one - # as the first column will be the name of the aggregation function, and the second column will be the values from the first pivot column. 
- if isinstance(aggfunc, list) and len(aggfunc) > 1: + if isinstance(aggfunc, list): groupby_columns = mi_as_frame.columns[:2].tolist() value_column_index = 2 else: diff --git a/tests/integ/modin/pivot/test_pivot_margins.py b/tests/integ/modin/pivot/test_pivot_margins.py index e5cd839dd67..e6ab6e5016a 100644 --- a/tests/integ/modin/pivot/test_pivot_margins.py +++ b/tests/integ/modin/pivot/test_pivot_margins.py @@ -210,7 +210,7 @@ def test_multiple_value_single_aggfunc(self, columns, df_data): { "columns": columns, "values": ["D", "E"], - "aggfunc": "sum", + "aggfunc": ["sum"], "dropna": True, "margins": True, }, @@ -243,6 +243,31 @@ def test_multiple_value_multiple_aggfunc(self, columns, df_data): ) +@sql_count_checker(query_count=1) +def test_pivot_table_empty_table_with_index_margins(): + # Cannot use pivot_table_test_helper since that checks the inferred types + # on the resulting DataFrames' columns (which are empty), and the inferred type + # on our DataFrame's columns is empty, while pandas has type floating. + import pandas as native_pd + + native_df = native_pd.DataFrame({"A": [], "B": [], "C": [], "D": []}) + snow_df = pd.DataFrame(native_df) + pivot_kwargs = { + "index": ["A", "B"], + "columns": "C", + "values": "D", + "aggfunc": "count", + "margins": True, + } + + snow_result = snow_df.pivot_table(**pivot_kwargs).to_pandas() + native_result = native_df.pivot_table(**pivot_kwargs) + + assert native_result.empty == snow_result.empty and (native_result.empty is True) + assert list(native_result.columns) == list(snow_result.columns) + assert list(native_result.index) == list(snow_result.index) + + @pytest.mark.parametrize( "columns", [["B"], ["B", "C"]], ids=["single_column", "multiple_columns"] ) @@ -254,7 +279,7 @@ def test_single_value_single_aggfunc(self, columns, df_data_more_pivot_values): { "columns": columns, "values": ["D"], - "aggfunc": "sum", + "aggfunc": ["sum"], "dropna": True, "margins": True, }, diff --git a/tests/integ/modin/pivot/test_pivot_multiple.py b/tests/integ/modin/pivot/test_pivot_multiple.py index f092c9c804c..5b9360f14a3 100644 --- a/tests/integ/modin/pivot/test_pivot_multiple.py +++ b/tests/integ/modin/pivot/test_pivot_multiple.py @@ -60,40 +60,47 @@ def test_pivot_table_no_index_single_column_single_values_multiple_aggr_func(df_ ) -@pytest.mark.parametrize("aggfunc", ["count", "sum", "min", "max", "mean"]) +@pytest.mark.parametrize("aggfunc", ["count", "sum", "min", "max", "mean", ["count"]]) +@pytest.mark.parametrize("values", ["D", ["D"]]) @sql_count_checker(query_count=1) -def test_pivot_table_single_index_multiple_column_single_value(df_data, aggfunc): +def test_pivot_table_single_index_multiple_column_single_value( + df_data, aggfunc, values +): pivot_table_test_helper( df_data, { "index": "A", "columns": ["B", "C"], - "values": "D", + "values": values, "aggfunc": aggfunc, }, ) @pytest.mark.parametrize("aggfunc", ["count", "sum", "min", "max", "mean"]) +@pytest.mark.parametrize("values", ["D", ["D"]]) @sql_count_checker(query_count=1) -def test_pivot_table_no_index_multiple_column_single_value(df_data, aggfunc): +def test_pivot_table_no_index_multiple_column_single_value(df_data, aggfunc, values): pivot_table_test_helper( df_data, { "columns": ["B", "C"], - "values": "D", + "values": values, "aggfunc": aggfunc, }, ) +@pytest.mark.parametrize("values", ["D", ["D"]]) @sql_count_checker(query_count=1, join_count=1) -def test_pivot_table_no_index_multiple_column_single_value_multiple_aggr_func(df_data): +def 
test_pivot_table_no_index_multiple_column_single_value_multiple_aggr_func( + df_data, values +): pivot_table_test_helper( df_data, { "columns": ["B", "C"], - "values": "D", + "values": values, "aggfunc": ["mean", "max"], }, ) @@ -146,25 +153,29 @@ def test_pivot_table_single_index_single_column_multiple_encoded_values_with_sor ) +@pytest.mark.parametrize("aggfunc", ["count", ["count"]]) @sql_count_checker(query_count=1, join_count=1) -def test_pivot_table_single_index_multiple_columns_multiple_values(df_data): +def test_pivot_table_single_index_multiple_columns_multiple_values(df_data, aggfunc): pivot_table_test_helper( df_data, { "index": "A", "columns": ["B", "C"], "values": ["D", "E"], + "aggfunc": aggfunc, }, ) +@pytest.mark.parametrize("aggfunc", ["count", ["count"]]) @sql_count_checker(query_count=1, union_count=1) -def test_pivot_table_no_index_multiple_columns_multiple_values(df_data): +def test_pivot_table_no_index_multiple_columns_multiple_values(df_data, aggfunc): pivot_table_test_helper( df_data, { "columns": ["B", "C"], "values": ["D", "E"], + "aggfunc": aggfunc, }, ) diff --git a/tests/integ/modin/pivot/test_pivot_single.py b/tests/integ/modin/pivot/test_pivot_single.py index 580270ebdfe..10487d47930 100644 --- a/tests/integ/modin/pivot/test_pivot_single.py +++ b/tests/integ/modin/pivot/test_pivot_single.py @@ -243,8 +243,8 @@ def test_pivot_on_inline_data_using_temp_table(): assert row_count == 25 -@pytest.mark.xfail(strict=True, raises=SnowparkSQLException, reason="SNOW-1233895") -def test_pivot_empty_frame_snow_1233895(): +@pytest.mark.xfail(strict=True, raises=SnowparkSQLException, reason="SNOW-1013918") +def test_pivot_empty_frame_snow_1013918(): eval_snowpark_pandas_result( *create_test_dfs(columns=["a", "b", "c"]), lambda df: df.pivot_table(index="a", columns="b") From 8767ee97074e438ee266eab4e79ec3b4a7413e4b Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Thu, 16 May 2024 12:16:09 -0700 Subject: [PATCH 14/18] Unskip supported tests, add strict=True for xfailing test --- tests/integ/modin/pivot/test_pivot_multiple.py | 2 +- tests/integ/modin/pivot/test_pivot_single.py | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/integ/modin/pivot/test_pivot_multiple.py b/tests/integ/modin/pivot/test_pivot_multiple.py index 5b9360f14a3..c457a089d56 100644 --- a/tests/integ/modin/pivot/test_pivot_multiple.py +++ b/tests/integ/modin/pivot/test_pivot_multiple.py @@ -299,7 +299,7 @@ def update_columns_inline(df): # TODO (SNOW-854301): Needs support for MultiIndex.levels, fails because result.columns.levels[N] don't equal # We use xfail to run so we can help code coverage -@pytest.mark.xfail +@pytest.mark.xfail(strict=True) @pytest.mark.parametrize("values", [None, []]) @sql_count_checker(query_count=0) def test_pivot_table_no_values_by_default(df_data, values): diff --git a/tests/integ/modin/pivot/test_pivot_single.py b/tests/integ/modin/pivot/test_pivot_single.py index 10487d47930..4887b2e9a98 100644 --- a/tests/integ/modin/pivot/test_pivot_single.py +++ b/tests/integ/modin/pivot/test_pivot_single.py @@ -17,9 +17,6 @@ from tests.integ.modin.utils import create_test_dfs, eval_snowpark_pandas_result -@pytest.mark.skip( - "SNOW-959913: Support no index configuration with columns and margins configuration" -) @sql_count_checker(query_count=1) def test_pivot_table_no_index_single_column_single_value(df_data): pivot_table_test_helper( From 72fe4311e220a6a950ad2d5b5521edfe8853fe40 Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Fri, 17 May 2024 15:08:19 -0700 
Subject: [PATCH 15/18] Add some more tests --- .../modin/supported/dataframe_supported.rst | 6 +- .../snowpark/modin/pandas/general.py | 16 +- .../modin/plugin/_internal/pivot_utils.py | 171 ++++++++++++++++ .../compiler/snowflake_query_compiler.py | 190 +++--------------- .../modin/plugin/docstrings/dataframe.py | 12 +- tests/integ/modin/pivot/test_pivot_margins.py | 97 +++++++-- .../integ/modin/pivot/test_pivot_multiple.py | 33 +++ 7 files changed, 334 insertions(+), 191 deletions(-) diff --git a/docs/source/modin/supported/dataframe_supported.rst b/docs/source/modin/supported/dataframe_supported.rst index c550a10df94..bf752172221 100644 --- a/docs/source/modin/supported/dataframe_supported.rst +++ b/docs/source/modin/supported/dataframe_supported.rst @@ -300,7 +300,11 @@ Methods | ``pivot_table`` | P | ``observed``, ``sort`` | ``N`` if ``index``, ``columns``, or ``values`` is | | | | | not str, list of str, or None; or MultiIndex; or | | | | | any ``argfunc`` is not "count", "mean", "min", | -| | | | "max", or "sum" | +| | | | "max", or "sum". N if ``index`` is None, | +| | | | ``margins`` is True and ``aggfunc`` is "count" | +| | | | or "mean" or a dictionary. N if ``index`` is None | +| | | | and ``aggfunc`` is a dictionary containing | +| | | | lists of aggfuncs to apply. | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ | ``pop`` | N | | | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ diff --git a/src/snowflake/snowpark/modin/pandas/general.py b/src/snowflake/snowpark/modin/pandas/general.py index 3634b1bee3c..df590e6c99a 100644 --- a/src/snowflake/snowpark/modin/pandas/general.py +++ b/src/snowflake/snowpark/modin/pandas/general.py @@ -533,12 +533,20 @@ def pivot_table( Notes ----- - Raise NotImplementedError if + - Raise NotImplementedError if - * margins, observed, or sort is given; - * or index, columns, or values is not str; + * observed, or sort is given; + * or index, columns, or values is not str, a list of str, or None; * or DataFrame contains MultiIndex; - * or any argfunc is not "count", "mean", "min", "max", or "sum" + * or any aggfunc is not "count", "mean", "min", "max", or "sum" + * index is None, and aggfunc is a dictionary containing lists. + + - `margins`=True with no index has limited support: + * when aggfunc is "count" or "mean" the result has discrepancies with pandas - + Snowpark pandas computes the aggfunc over the data grouped by the first pivot + column, while pandas computes the aggfunc over the result of the aggfunc from + the initial pivot. + * aggfunc as a dictionary is not supported. 
See Also -------- diff --git a/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py index 754b48f8e22..f4ba89b160a 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py @@ -7,6 +7,8 @@ from itertools import product from typing import Any, Callable, NamedTuple, Optional, Union +import numpy as np +import pandas as pd from pandas._typing import AggFuncType, AggFuncTypeBase, Scalar from snowflake.snowpark.column import Column as SnowparkColumn @@ -1132,6 +1134,175 @@ def get_margin_aggregation( return aggfunc_expr +def expand_pivot_result_with_pivot_table_margins_no_groupby_columns( + pivot_qc: "SnowflakeQueryCompiler", # type: ignore[name-defined] # noqa: F821 + original_modin_frame: InternalFrame, + pivot_aggr_groupings: list[PivotAggrGrouping], + dropna: bool, + columns: list[str], + aggfunc: AggFuncType, + pivot_snowflake_quoted_identifiers: list[str], + values: list[str], + margins_name: str, +) -> "SnowflakeQueryCompiler": # type: ignore[name-defined] # noqa: F821 + names = pivot_qc.columns.names + margins_frame = pivot_helper( + original_modin_frame, + pivot_aggr_groupings, + not dropna, + not isinstance(aggfunc, list), + columns[:1], + [], # There are no groupby_snowflake_quoted_identifiers + pivot_snowflake_quoted_identifiers[:1], + (isinstance(aggfunc, list) and len(aggfunc) > 1), + (isinstance(values, list) and len(values) > 1), + None, # There is no index. + ) + if len(columns) > 1: + # If there is a multiindex on the pivot result, we need to add the margin_name to the margins frame's data column + # pandas labels, as well as any empty postfixes for the remaining pivot columns if there are more than 2. + new_data_column_pandas_labels = [] + for label in margins_frame.data_column_pandas_labels: + if isinstance(aggfunc, list): + new_label = label + (margins_name,) + else: + new_label = (label, margins_name) + tuple( + "" for _ in range(pivot_qc.columns.nlevels - 2) + ) + new_data_column_pandas_labels.append(new_label) + margins_frame = InternalFrame.create( + ordered_dataframe=margins_frame.ordered_dataframe, + data_column_pandas_labels=new_data_column_pandas_labels, + data_column_pandas_index_names=pivot_qc._modin_frame.data_column_pandas_index_names, + data_column_snowflake_quoted_identifiers=margins_frame.data_column_snowflake_quoted_identifiers, + index_column_pandas_labels=margins_frame.index_column_pandas_labels, + index_column_snowflake_quoted_identifiers=margins_frame.index_column_snowflake_quoted_identifiers, + ) + + # Need to create a QueryCompiler for the margins frame, but SnowflakeQueryCompiler is not present in this scope + # so we use this workaround instead. + margins_qc = type(pivot_qc)(margins_frame) + original_pivot_qc_columns = pivot_qc.columns + pivot_qc = pivot_qc.concat(1, [margins_qc]) + # After this step, pivot_qc contains the pivotted columns followed by the margins columns - e.g. say our pivot result is + # B on.e tw"o + # D 28 27 + # E 35 31 + # Then our pivotted query_compiler now looks like this: + # B on.e tw"o margin_for_on.e margin_for_tw"o + # D 28 27 28 27 + # E 35 31 35 31 + # We have to reindex (and rename, since we used pivot, the columns will be named the same) so that we get it in the format: + # B on.e margin_for_on.e tw"o margin_for_tw"o + # D 28 28 27 27 + # E 35 35 31 31 + # If there are more than one pivot columns, then the stride will be greater - e.g. 
if our pivot result looks like this: + # B on.e tw"o + # C dull shi'ny dull shi'ny + # D 5 23 10 17 + # E 8 27 12 19 + # Our pivotted query_compiler will look like this: + # B on.e tw"o on.e tw"o + # C dull shi'ny dull shi'ny All All + # D 5 23 10 17 28 27 + # E 8 27 12 19 35 21 + # And so our re-indexer will look different. + if len(columns) == 1: + # Assuming we have 4 columns after the pivot, we want our reindexer to look like this: [0, 4, 1, 5, 2, 6, 3, 7]. We can accomplish this + # by zipping(range(0, 4), (4, 8)), which gives us [(0, 4), (1, 5), (2, 6), (3, 7)], and then flattening that list using sum(list, tuple()) + # which will result in our flattened indexer [0, 4, 1, 5, 2, 6, 3, 7]. + column_reindexer = list( + sum( + zip( + range(0, len(original_pivot_qc_columns)), + range( + len(original_pivot_qc_columns), + 2 * len(original_pivot_qc_columns), + ), + ), + tuple(), + ) + ) + else: + # When there is more than one pivot column, we need to reindex differently, as the example above shows. Say we have have 2 unique values in + # the first pivot column, and 2 unique values in the second pivot column (as above). Then, our final reindexer should look like this: + # [0, 1, 4, 2, 3, 5]. We can determine how many columns correspond to each first pivot column value by looking at the column MultiIndex for + # the pivotted QC. We can convert that to a frame using the `to_frame` MultiIndex API. Let's take a look at an example. + # Assuming that the MultiIndex (after converting to a frame) looks like this (i.e. there are 2 distinct values for the first pivot column, + # and 3 for the second): + # B C + # 0 on.e dull + # 1 on.e shi'ny + # 2 on.e sy + # 3 tw"o dull + # 4 tw"o shi'ny + mi_as_frame = original_pivot_qc_columns.to_frame(index=False) + # We can then groupby the first pivot column, and call count, which will tell us how many columns correspond to each label from the first pivot column. + # C + # B + # on.e 3 + # tw"o 2 + # If there are multiple columns and multiple aggregation functions, we need to groupby the first two columns instead of just the first one - + # as the first column will be the name of the aggregation function, and the second column will be the values from the first pivot column. + if isinstance(aggfunc, list): + groupby_columns = mi_as_frame.columns[:2].tolist() + value_column_index = 2 + else: + groupby_columns = mi_as_frame.columns[0] + value_column_index = 1 + pivot_multiindex_level_one_lengths = np.cumsum( + mi_as_frame.groupby(groupby_columns, sort=False) + .count()[mi_as_frame.columns[value_column_index]] + .values[:-1] + ) + # We can grab the first column from this groupby (in case there are more than 2 pivot columns), and use these splits with np.split, which will tell us + # the groupings of the columns. E.g., in this case, we would want the following splits for the indexes: [(0, 1, 2), (3, 4)]. Calling np.split with + # the values from above (excluding the last value) will result in that output. We call tuple on the splits to get them in tuple format. + split_original_pivot_qc_indexes = [ + list(group) + for group in np.split( + range(len(original_pivot_qc_columns)), + pivot_multiindex_level_one_lengths, + ) + ] + # Once we have the splits [[0, 1, 2], [3, 4]], we can then insert the indices for the margins columns. 
+ reindexer = [ + group + [margin_index] + for group, margin_index in zip( + split_original_pivot_qc_indexes, + range(len(original_pivot_qc_columns), len(pivot_qc.columns)), + ) + ] + # Now, we have a list that looks like this: [[0, 1, 2, 5], [3, 4, 6]] - we need to make this into a flat list of indexes. + column_reindexer = sum(reindexer, list()) + pivot_qc = pivot_qc.take_2d_positional(slice(None), column_reindexer) + + if len(columns) == 1: + # After reindexing, we have to rename the margins columns to the correct name if we only have one pivot column. + if original_pivot_qc_columns.nlevels == 1: + pivot_qc = pivot_qc.set_columns( + pd.Index( + list( + sum( + zip( + original_pivot_qc_columns, + [margins_name] * len(original_pivot_qc_columns), + ), + tuple(), + ) + ) + ).set_names(names) + ) + else: + # If there are multiple levels in the index even though there is a single pivot column, we need to copy over the prefixes as well. + new_index_names = [] + for label in original_pivot_qc_columns: + new_index_names.extend([label, label[:-1] + (margins_name,)]) + new_index = pd.MultiIndex.from_tuples(new_index_names).set_names(names) + pivot_qc = pivot_qc.set_columns(new_index) + return pivot_qc + + def expand_pivot_result_with_pivot_table_margins( pivot_aggr_groupings: list[PivotAggrGrouping], groupby_snowflake_quoted_identifiers: list[str], diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index 7906e077286..eb652c87a41 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -221,6 +221,7 @@ ) from snowflake.snowpark.modin.plugin._internal.pivot_utils import ( expand_pivot_result_with_pivot_table_margins, + expand_pivot_result_with_pivot_table_margins_no_groupby_columns, generate_pivot_aggregation_value_label_snowflake_quoted_identifier_mappings, generate_single_pivot_labels, pivot_helper, @@ -6332,14 +6333,19 @@ def pivot_table( ): raise TypeError("Must provide 'func' or named aggregation **kwargs.") - # With margins, a dictionary aggfunc that maps to list of aggregations is not supported by pandas. We return - # friendly error message in this case. - if ( - margins - and isinstance(aggfunc, dict) - and any(not isinstance(af, str) for af in aggfunc.values()) + if isinstance(aggfunc, dict) and any( + not isinstance(af, str) for af in aggfunc.values() ): - raise ValueError("Margins not supported if list of aggregation functions") + # With margins, a dictionary aggfunc that maps to list of aggregations is not supported by pandas. We return + # friendly error message in this case. + if margins: + raise ValueError( + "Margins not supported if list of aggregation functions" + ) + elif index is None: + raise NotImplementedError( + "Not implemented index is None and list of aggregation functions." + ) # Duplicate pivot column and index are not allowed, but duplicate aggregation values are supported. 
index_and_data_column_pandas_labels = ( @@ -6463,165 +6469,19 @@ def pivot_table( fill_value, ) else: - names = pivot_qc.columns.names - margins_frame = pivot_helper( - self._modin_frame, - pivot_aggr_groupings, - not dropna, - not isinstance(aggfunc, list), - columns[:1], - groupby_snowflake_quoted_identifiers, - pivot_snowflake_quoted_identifiers[:1], - (isinstance(aggfunc, list) and len(aggfunc) > 1), - (isinstance(values, list) and len(values) > 1), - index, - ) - if len(columns) > 1: - # If there is a multiindex on the pivot result, we need to add the margin_name to the margins frame's data column - # pandas labels, as well as any empty postfixes for the remaining pivot columns if there are more than 2. - new_data_column_pandas_labels = [] - for label in margins_frame.data_column_pandas_labels: - if isinstance(aggfunc, list): - new_label = label + (margins_name,) - else: - new_label = (label, margins_name) + tuple( - "" for _ in range(pivot_qc.columns.nlevels - 2) - ) - new_data_column_pandas_labels.append(new_label) - margins_frame = InternalFrame.create( - ordered_dataframe=margins_frame.ordered_dataframe, - data_column_pandas_labels=new_data_column_pandas_labels, - data_column_pandas_index_names=pivot_qc._modin_frame.data_column_pandas_index_names, - data_column_snowflake_quoted_identifiers=margins_frame.data_column_snowflake_quoted_identifiers, - index_column_pandas_labels=margins_frame.index_column_pandas_labels, - index_column_snowflake_quoted_identifiers=margins_frame.index_column_snowflake_quoted_identifiers, - ) - margins_qc = SnowflakeQueryCompiler(margins_frame) - original_pivot_qc_columns = pivot_qc.columns - pivot_qc = pivot_qc.concat(1, [margins_qc]) - # After this step, pivot_qc contains the pivotted columns followed by the margins columns - e.g. say our pivot result is - # B on.e tw"o - # D 28 27 - # E 35 31 - # Then our pivotted query_compiler now looks like this: - # B on.e tw"o margin_for_on.e margin_for_tw"o - # D 28 27 28 27 - # E 35 31 35 31 - # We have to reindex (and rename, since we used pivot, the columns will be named the same) so that we get it in the format: - # B on.e margin_for_on.e tw"o margin_for_tw"o - # D 28 28 27 27 - # E 35 35 31 31 - # If there are more than one pivot columns, then the stride will be greater - e.g. if our pivot result looks like this: - # B on.e tw"o - # C dull shi'ny dull shi'ny - # D 5 23 10 17 - # E 8 27 12 19 - # Our pivotted query_compiler will look like this: - # B on.e tw"o on.e tw"o - # C dull shi'ny dull shi'ny All All - # D 5 23 10 17 28 27 - # E 8 27 12 19 35 21 - # And so our re-indexer will look different. - if len(columns) == 1: - # Assuming we have 4 columns after the pivot, we want our reindexer to look like this: [0, 4, 1, 5, 2, 6, 3, 7]. We can accomplish this - # by zipping(range(0, 4), (4, 8)), which gives us [(0, 4), (1, 5), (2, 6), (3, 7)], and then flattening that list using sum(list, tuple()) - # which will result in our flattened indexer [0, 4, 1, 5, 2, 6, 3, 7]. - column_reindexer = list( - sum( - zip( - range(0, len(original_pivot_qc_columns)), - range( - len(original_pivot_qc_columns), - 2 * len(original_pivot_qc_columns), - ), - ), - tuple(), - ) - ) - else: - # When there is more than one pivot column, we need to reindex differently, as the example above shows. Say we have have 2 unique values in - # the first pivot column, and 2 unique values in the second pivot column (as above). Then, our final reindexer should look like this: - # [0, 1, 4, 2, 3, 5]. 
We can determine how many columns correspond to each first pivot column value by looking at the column MultiIndex for - # the pivotted QC. We can convert that to a frame using the `to_frame` MultiIndex API. Let's take a look at an example. - # Assuming that the MultiIndex (after converting to a frame) looks like this (i.e. there are 2 distinct values for the first pivot column, - # and 3 for the second): - # B C - # 0 on.e dull - # 1 on.e shi'ny - # 2 on.e sy - # 3 tw"o dull - # 4 tw"o shi'ny - mi_as_frame = original_pivot_qc_columns.to_frame(index=False) - # We can then groupby the first pivot column, and call count, which will tell us how many columns correspond to each label from the first pivot column. - # C - # B - # on.e 3 - # tw"o 2 - # If there are multiple columns and multiple aggregation functions, we need to groupby the first two columns instead of just the first one - - # as the first column will be the name of the aggregation function, and the second column will be the values from the first pivot column. - if isinstance(aggfunc, list): - groupby_columns = mi_as_frame.columns[:2].tolist() - value_column_index = 2 - else: - groupby_columns = mi_as_frame.columns[0] - value_column_index = 1 - pivot_multiindex_level_one_lengths = np.cumsum( - mi_as_frame.groupby(groupby_columns, sort=False) - .count()[mi_as_frame.columns[value_column_index]] - .values[:-1] + pivot_qc = ( + expand_pivot_result_with_pivot_table_margins_no_groupby_columns( + pivot_qc, + self._modin_frame, + pivot_aggr_groupings, + dropna, + columns, + aggfunc, + pivot_snowflake_quoted_identifiers, + values, + margins_name, ) - # We can grab the first column from this groupby (in case there are more than 2 pivot columns), and use these splits with np.split, which will tell us - # the groupings of the columns. E.g., in this case, we would want the following splits for the indexes: [(0, 1, 2), (3, 4)]. Calling np.split with - # the values from above (excluding the last value) will result in that output. We call tuple on the splits to get them in tuple format. - split_original_pivot_qc_indexes = [ - list(group) - for group in np.split( - range(len(original_pivot_qc_columns)), - pivot_multiindex_level_one_lengths, - ) - ] - # Once we have the splits [[0, 1, 2], [3, 4]], we can then insert the indices for the margins columns. - reindexer = [ - group + [margin_index] - for group, margin_index in zip( - split_original_pivot_qc_indexes, - range( - len(original_pivot_qc_columns), len(pivot_qc.columns) - ), - ) - ] - # Now, we have a list that looks like this: [[0, 1, 2, 5], [3, 4, 6]] - we need to make this into a flat list of indexes. - column_reindexer = sum(reindexer, list()) - pivot_qc = pivot_qc.take_2d_positional(slice(None), column_reindexer) - - if len(columns) == 1: - # After reindexing, we have to rename the margins columns to the correct name if we only have one pivot column. - if original_pivot_qc_columns.nlevels == 1: - pivot_qc = pivot_qc.set_columns( - pd.Index( - list( - sum( - zip( - original_pivot_qc_columns, - [margins_name] - * len(original_pivot_qc_columns), - ), - tuple(), - ) - ) - ).set_names(names) - ) - else: - # If there are multiple levels in the index even though there is a single pivot column, we need to copy over the prefixes as well. 
- new_index_names = [] - for label in original_pivot_qc_columns: - new_index_names.extend( - [label, label[:-1] + (margins_name,)] - ) - new_index = pd.MultiIndex.from_tuples( - new_index_names - ).set_names(names) - pivot_qc = pivot_qc.set_columns(new_index) + ) # Rename the data column snowflake quoted identifiers to be closer to pandas labels given we # may have done unwrapping of surrounding quotes, ie. so will unwrap single quotes in snowflake identifiers. diff --git a/src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py b/src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py index c89c67cece6..8aceacb10ae 100644 --- a/src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py +++ b/src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py @@ -1992,12 +1992,20 @@ def pivot_table(): Notes ----- - Raise NotImplementedError if + - Raise NotImplementedError if * observed or sort is given; * or index, columns, or values is not str, a list of str, or None; * or DataFrame contains MultiIndex; - * or any argfunc is not "count", "mean", "min", "max", or "sum" + * or any aggfunc is not "count", "mean", "min", "max", or "sum" + * index is None, and aggfunc is a dictionary containing lists. + + - `margins`=True with no index has limited support: + * when aggfunc is "count" or "mean" the result has discrepancies with pandas - + Snowpark pandas computes the aggfunc over the data grouped by the first pivot + column, while pandas computes the aggfunc over the result of the aggfunc from + the initial pivot. + * aggfunc as a dictionary is not supported. See Also -------- diff --git a/tests/integ/modin/pivot/test_pivot_margins.py b/tests/integ/modin/pivot/test_pivot_margins.py index e6ab6e5016a..81ebaff6839 100644 --- a/tests/integ/modin/pivot/test_pivot_margins.py +++ b/tests/integ/modin/pivot/test_pivot_margins.py @@ -10,18 +10,25 @@ from tests.integ.modin.sql_counter import SqlCounter, sql_count_checker +@pytest.mark.parametrize("index", [None, "A"], ids=["no_index", "single_index"]) @pytest.mark.parametrize("dropna", [True, False]) @pytest.mark.parametrize("columns", ["C", ["B", "C"]]) @pytest.mark.parametrize("fill_value", [None, 99.99]) def test_pivot_table_single_with_dropna_options( - df_data_with_nulls, dropna, columns, fill_value + df_data_with_nulls, index, dropna, columns, fill_value ): expected_join_count = 2 if not dropna else 1 + if not dropna and index is None: + expected_join_count += 1 + if len(columns) > 1 and index is None: + pytest.xfail( + reason="SNOW-1435365 - pandas computes values differently than us: https://github.com/pandas-dev/pandas/issues/58722." + ) with SqlCounter(query_count=1, join_count=expected_join_count): pivot_table_test_helper( df_data_with_nulls, { - "index": "A", + "index": index, "columns": columns, "values": "D", "dropna": dropna, @@ -31,6 +38,39 @@ def test_pivot_table_single_with_dropna_options( ) +# Not marking as strict since the following test cases pass: +# [None-C-True-no_index] +# [None-columns1-True-no_index] +# [None-C-False-no_index] +# [None-columns1-False-no_index] +@pytest.mark.xfail( + reason="SNOW-1435365 - we do not support margins=True, with no index and aggfunc as a dictionary." 
+) +@pytest.mark.parametrize("index", [None, "A"], ids=["no_index", "single_index"]) +@pytest.mark.parametrize("dropna", [True, False]) +@pytest.mark.parametrize("columns", ["C", ["B", "C"]]) +@pytest.mark.parametrize("fill_value", [None, 99.99]) +def test_pivot_table_single_with_dropna_options_multiple_aggr_funcs( + df_data_with_nulls, index, dropna, columns, fill_value +): + expected_join_count = 2 if not dropna else 1 + if not dropna and index is None: + expected_join_count += 1 + with SqlCounter(query_count=1, join_count=expected_join_count): + pivot_table_test_helper( + df_data_with_nulls, + { + "index": index, + "columns": columns, + "values": ["D", "E"], + "dropna": dropna, + "fill_value": fill_value, + "margins": True, + "aggfunc": {"D": "sum", "E": "max"}, + }, + ) + + @pytest.mark.parametrize( "aggfunc", [ @@ -66,6 +106,20 @@ def test_pivot_table_multiple_columns_values_with_margins( ) +@pytest.mark.parametrize( + "index", + [ + pytest.param( + None, + marks=pytest.mark.xfail( + strict=True, + reason="SNOW-1435365 - pandas computes values differently than us: https://github.com/pandas-dev/pandas/issues/58722.", + ), + ), + ["A", "B"], + ], + ids=["no_index", "multiple_index"], +) @pytest.mark.parametrize( "fill_value", [ @@ -82,12 +136,12 @@ def test_pivot_table_multiple_columns_values_with_margins( ) @sql_count_checker(query_count=1, join_count=9, union_count=1) def test_pivot_table_multiple_pivot_values_null_data_with_margins( - df_data_with_nulls, fill_value + df_data_with_nulls, index, fill_value ): pivot_table_test_helper( df_data_with_nulls, { - "index": ["A", "B"], + "index": index, "columns": "C", "values": "F", "aggfunc": ["count", "sum", "mean"], @@ -99,6 +153,9 @@ def test_pivot_table_multiple_pivot_values_null_data_with_margins( ) +@pytest.mark.parametrize( + "index", [None, ["A", "B"]], ids=["no_index", "multiple_index"] +) @pytest.mark.parametrize( "fill_value", [ @@ -113,23 +170,25 @@ def test_pivot_table_multiple_pivot_values_null_data_with_margins( ), ], ) -@sql_count_checker(query_count=1, join_count=6, union_count=1) def test_pivot_table_multiple_pivot_values_null_data_with_margins_nan_blocked( - df_data_with_nulls, fill_value + df_data_with_nulls, index, fill_value ): - pivot_table_test_helper( - df_data_with_nulls, - { - "index": ["A", "B"], - "columns": "C", - "values": "F", - "aggfunc": ["min", "max"], - "dropna": False, - "fill_value": fill_value, - "margins": True, - "margins_name": "TOTAL", - }, - ) + join_count = 7 if index is None and fill_value is None else 6 + union_count = 0 if index is None and fill_value is None else 1 + with SqlCounter(query_count=1, join_count=join_count, union_count=union_count): + pivot_table_test_helper( + df_data_with_nulls, + { + "index": index, + "columns": "C", + "values": "F", + "aggfunc": ["min", "max"], + "dropna": False, + "fill_value": fill_value, + "margins": True, + "margins_name": "TOTAL", + }, + ) @sql_count_checker(query_count=1, join_count=12, union_count=1) diff --git a/tests/integ/modin/pivot/test_pivot_multiple.py b/tests/integ/modin/pivot/test_pivot_multiple.py index c457a089d56..c0e98d3900d 100644 --- a/tests/integ/modin/pivot/test_pivot_multiple.py +++ b/tests/integ/modin/pivot/test_pivot_multiple.py @@ -48,6 +48,39 @@ def test_pivot_table_no_index_single_column_multiple_values_multiple_aggr_func(d ) +@sql_count_checker(query_count=1, union_count=1) +@pytest.mark.parametrize("columns", ["B", ["B", "C"]]) +def test_pivot_table_no_index_multiple_values_single_aggr_func_dict(df_data, columns): + 
pivot_table_test_helper( + df_data, + { + "columns": columns, + "values": ["D", "E"], + "aggfunc": {"D": "mean", "E": "max"}, + }, + ) + + +# pandas moves the name of the aggfunc into the data columns as an index column. +@pytest.mark.xfail( + strict=True, + reason="SNOW-1435365 - look into no index + aggfunc as dictionary with list.", +) +@sql_count_checker(query_count=1, union_count=1) +@pytest.mark.parametrize("columns", ["B", ["B", "C"]]) +def test_pivot_table_no_index_column_multiple_values_multiple_aggr_func_dict( + df_data, columns +): + pivot_table_test_helper( + df_data, + { + "columns": columns, + "values": ["D", "E"], + "aggfunc": {"D": ["mean", "sum"], "E": "max"}, + }, + ) + + @sql_count_checker(query_count=1, join_count=1) def test_pivot_table_no_index_single_column_single_values_multiple_aggr_func(df_data): pivot_table_test_helper( From d93f1197eaedac1bdcf743eff1837d773dd56fbf Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Fri, 17 May 2024 15:10:30 -0700 Subject: [PATCH 16/18] Added support -> Added partial support in changelog --- src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md b/src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md index 6ac89e62234..4094001c5e4 100644 --- a/src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md +++ b/src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md @@ -23,7 +23,7 @@ - Added partial support for `SeriesGroupBy.apply` (where the `SeriesGrouBy` is obtained through `DataFrameGroupBy.__getitem__`). - Added support for `pd.NamedAgg` in `DataFrameGroupBy.agg` and `SeriesGroupBy.agg`. - Added support for `Series.str.slice`. -- Added support for `DataFrame.pivot_table` with no `index` parameter, as well as for `margins` parameter. +- Added partial support for `DataFrame.pivot_table` with no `index` parameter, as well as for `margins` parameter. ## 1.15.0a1 (2024-05-03) From a963f13aba57fa3a87654d5d6c1fa19bf32f7c3b Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Fri, 17 May 2024 15:28:33 -0700 Subject: [PATCH 17/18] Fix docs --- src/snowflake/snowpark/modin/pandas/general.py | 10 +++++----- .../snowpark/modin/plugin/docstrings/dataframe.py | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/snowflake/snowpark/modin/pandas/general.py b/src/snowflake/snowpark/modin/pandas/general.py index df590e6c99a..34bbea9506d 100644 --- a/src/snowflake/snowpark/modin/pandas/general.py +++ b/src/snowflake/snowpark/modin/pandas/general.py @@ -535,17 +535,17 @@ def pivot_table( ----- - Raise NotImplementedError if - * observed, or sort is given; + * observed or sort is given; * or index, columns, or values is not str, a list of str, or None; * or DataFrame contains MultiIndex; * or any aggfunc is not "count", "mean", "min", "max", or "sum" * index is None, and aggfunc is a dictionary containing lists. - - `margins`=True with no index has limited support: + - Computing margins with no index has limited support: * when aggfunc is "count" or "mean" the result has discrepancies with pandas - - Snowpark pandas computes the aggfunc over the data grouped by the first pivot - column, while pandas computes the aggfunc over the result of the aggfunc from - the initial pivot. + Snowpark pandas computes the aggfunc over the data grouped by the first pivot + column, while pandas computes the aggfunc over the result of the aggfunc from + the initial pivot. * aggfunc as a dictionary is not supported. 
See Also diff --git a/src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py b/src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py index 8aceacb10ae..6da7beede87 100644 --- a/src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py +++ b/src/snowflake/snowpark/modin/plugin/docstrings/dataframe.py @@ -2000,11 +2000,11 @@ def pivot_table(): * or any aggfunc is not "count", "mean", "min", "max", or "sum" * index is None, and aggfunc is a dictionary containing lists. - - `margins`=True with no index has limited support: + - Computing margins with no index has limited support: * when aggfunc is "count" or "mean" the result has discrepancies with pandas - - Snowpark pandas computes the aggfunc over the data grouped by the first pivot - column, while pandas computes the aggfunc over the result of the aggfunc from - the initial pivot. + Snowpark pandas computes the aggfunc over the data grouped by the first pivot + column, while pandas computes the aggfunc over the result of the aggfunc from + the initial pivot. * aggfunc as a dictionary is not supported. See Also From 0400ab11f1f3f19c4dcd62e90433178321e8e9e1 Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Tue, 21 May 2024 13:52:29 -0700 Subject: [PATCH 18/18] Move to snowpark changelog --- CHANGELOG.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9489afbb3d0..538ec78812f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,21 @@ # Release History +## 1.18.0 (TBD) + +### Snowpark Python API Updates + +#### New Features + +#### Improvements + +### Snowpark pandas API Updates + +#### New Features + +#### Improvements + +- Added partial support for `DataFrame.pivot_table` with no `index` parameter, as well as for `margins` parameter. + ## 1.17.0 (2024-05-21) ### Snowpark Python API Updates
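
For reference, the no-`index` and `margins` semantics targeted by the patches above can be sketched with native pandas (used here instead of Snowpark pandas only to avoid Snowflake session setup; the frame contents and column names are illustrative and not taken from the test fixtures). As the comments and docstring changes above describe, with no `index` the value labels become the row index, and `margins=True` interleaves one margin column per value of the first pivot column, computed by re-applying `aggfunc` to the already-pivoted piece (which is why the updated docstrings call out a discrepancy for "count"/"mean").

# Illustrative sketch only; not part of the patch series.
import pandas as pd

df = pd.DataFrame(
    {
        "B": ["x", "x", "y", "y"],
        "D": [1, 2, 3, 4],
        "E": [5, 6, 7, 8],
    }
)

# No `index`: value labels ("D", "E") form the row index, pivot values form
# the columns. Roughly:
# B   x   y
# D   3   7
# E  11  15
print(df.pivot_table(columns="B", values=["D", "E"], aggfunc="sum"))

# margins=True with no `index`: one "All" column is appended per pivot value,
# holding aggfunc re-applied to that pivoted piece (equal to the piece itself
# for "sum"/"min"/"max"). Roughly:
# B   x  All   y  All
# D   3    3   7    7
# E  11   11  15   15
print(df.pivot_table(columns="B", values=["D", "E"], aggfunc="sum", margins=True))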