Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SNOW-1013917]: Add support for pivot_table without index parameter #1488

Merged
merged 22 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4ec3164
Add test for empty table pivot_table
sfc-gh-rdurrani May 2, 2024
2d08c30
[SNOW-1013917]: Add support for `pivot_table` without `index` parameter
sfc-gh-rdurrani May 2, 2024
d2f621c
Merge branch 'main' into rdurrani-SNOW-1013917
sfc-gh-rdurrani May 7, 2024
861a28d
Add fix for test of multiple values + multiple aggr funcs
sfc-gh-rdurrani May 7, 2024
db83f84
Add changelog
sfc-gh-rdurrani May 7, 2024
fc19f4e
Add test to cover combinatorials
sfc-gh-rdurrani May 7, 2024
3d5da36
Remove outdated unsupported error tests
sfc-gh-rdurrani May 8, 2024
c52a368
Remove commented code
sfc-gh-rdurrani May 8, 2024
27dcc31
Address review comments, fix margins partially
sfc-gh-rdurrani May 16, 2024
baebd77
Remove commented code
sfc-gh-rdurrani May 16, 2024
1cd0d68
Merge branch 'main' into rdurrani-SNOW-1013917
sfc-gh-rdurrani May 16, 2024
b565974
Add support for margins fully
sfc-gh-rdurrani May 16, 2024
814f1e6
Add more rigorous testing
sfc-gh-rdurrani May 16, 2024
f57fd10
Address review comments
sfc-gh-rdurrani May 16, 2024
4282c90
Address review comments
sfc-gh-rdurrani May 16, 2024
8767ee9
Unskip supported tests, add strict=True for xfailing test
sfc-gh-rdurrani May 16, 2024
72fe431
Add some more tests
sfc-gh-rdurrani May 17, 2024
d93f119
Added support -> Added partial support in changelog
sfc-gh-rdurrani May 17, 2024
a963f13
Fix docs
sfc-gh-rdurrani May 17, 2024
2659ca1
Merge branch 'main' into rdurrani-SNOW-1013917
sfc-gh-rdurrani May 21, 2024
0400ab1
Move to snowpark changelog
sfc-gh-rdurrani May 21, 2024
bf214f5
Merge branch 'main' into rdurrani-SNOW-1013917
sfc-gh-rdurrani May 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions docs/source/modin/supported/dataframe_supported.rst
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,10 @@ Methods
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``pivot`` | N | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``pivot_table`` | P | ``observed``, ``margins``, | ``N`` if ``index``, ``columns``, or ``values`` is |
| | | ``sort`` | not str; or MultiIndex; or any ``argfunc`` is not |
| | | | "count", "mean", "min", "max", or "sum" |
| ``pivot_table`` | P | ``observed``, ``sort`` | ``N`` if ``index``, ``columns``, or ``values`` is |
| | | | not str, list of str, or None; or MultiIndex; or |
| | | | any ``aggfunc`` is not "count", "mean", "min", |
| | | | "max", or "sum" |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``pop`` | N | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
Expand Down
1 change: 1 addition & 0 deletions src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
- Added partial support for `SeriesGroupBy.apply` (where the `SeriesGroupBy` is obtained through `DataFrameGroupBy.__getitem__`).
- Added support for `pd.NamedAgg` in `DataFrameGroupBy.agg` and `SeriesGroupBy.agg`.
- Added support for `Series.str.slice`.
- Added support for `DataFrame.pivot_table` with no `index` parameter, as well as for `margins` parameter.

## 1.15.0a1 (2024-05-03)

Expand Down
207 changes: 170 additions & 37 deletions src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from collections.abc import Generator, Hashable
from functools import reduce
from itertools import product
from typing import Any, Callable, Optional, Union
from typing import Any, Callable, NamedTuple, Optional, Union

from pandas._typing import AggFuncType, AggFuncTypeBase, Scalar

Expand Down Expand Up @@ -61,6 +61,94 @@
)


class PivotedOrderedDataFrameResult(NamedTuple):
    """Bundle returned by the pivot helpers: the pivoted frame plus its data column metadata."""

    # The OrderedDataFrame representation of the pivot result.
    ordered_dataframe: OrderedDataFrame
    # pandas labels of the data columns of the pivoted frame.
    data_column_pandas_labels: list[Hashable]
    # Snowflake quoted identifiers of the data columns of the pivoted frame.
    data_column_snowflake_quoted_identifiers: list[str]


def perform_pivot_and_concatenate(
    ordered_dataframe: OrderedDataFrame,
    pivot_aggr_groupings: list[PivotAggrGrouping],
    groupby_snowflake_quoted_identifiers: list[str],
    pivot_snowflake_quoted_identifiers: list[str],
    should_join_along_columns: bool,
) -> PivotedOrderedDataFrameResult:
    """
    Run one pivot per aggregation grouping on an OrderedDataFrame and combine the pieces into a single frame.

    Args:
        ordered_dataframe: The ordered dataframe to perform pivot on.
        pivot_aggr_groupings: A list of PivotAggrGroupings that define the aggregations to apply.
        groupby_snowflake_quoted_identifiers: Group by identifiers
        pivot_snowflake_quoted_identifiers: Pivot identifiers
        should_join_along_columns: Whether to join along columns, or use union to join along rows instead.

    Returns:
        A PivotedOrderedDataFrameResult holding the combined OrderedDataFrame together with the pandas
        labels and snowflake quoted identifiers of its data columns. When the pieces are unioned along
        rows, the labels and identifiers are those of the first pivot only. The frame is None when
        `pivot_aggr_groupings` is empty.
    """
    combined_frame = None
    result_pandas_labels: list[Hashable] = []
    result_quoted_identifiers: list[str] = []

    for grouping in pivot_aggr_groupings:
        if combined_frame is not None and should_join_along_columns:
            # Joining horizontally: hand over every column accumulated so far, so the columns of the
            # new pivot can be de-duplicated against them.
            identifiers_in_use = (
                combined_frame.projected_column_snowflake_quoted_identifiers
            )
        else:
            # First pivot, or stacking vertically: no de-duplication against earlier pivots is done.
            # For a union the column names are in fact expected to be the same so that they match up.
            identifiers_in_use = groupby_snowflake_quoted_identifiers

        (
            pivoted_frame,
            pivoted_quoted_identifiers,
            pivoted_pandas_labels,
        ) = single_pivot_helper(
            ordered_dataframe,
            identifiers_in_use,
            groupby_snowflake_quoted_identifiers,
            pivot_snowflake_quoted_identifiers,
            grouping.aggr_label_identifier_pair,
            grouping.aggfunc,
            grouping.prefix_label,
        )

        if not combined_frame:
            # Nothing accumulated yet: the first pivot seeds the result.
            combined_frame = pivoted_frame
        elif should_join_along_columns:
            # Concatenate horizontally by left-joining the new pivot on the group by columns.
            combined_frame = combined_frame.join(
                right=pivoted_frame,
                left_on_cols=groupby_snowflake_quoted_identifiers,
                right_on_cols=groupby_snowflake_quoted_identifiers,
                how="left",
            )
        else:
            # Concatenate vertically. Only rows are added, so the data column bookkeeping
            # recorded for the first pivot stays as it is.
            combined_frame = combined_frame.union_all(pivoted_frame)
            continue

        # Reached for the first pivot and for every horizontal join: record the new data columns.
        result_quoted_identifiers.extend(pivoted_quoted_identifiers)
        result_pandas_labels.extend(pivoted_pandas_labels)

    return PivotedOrderedDataFrameResult(
        ordered_dataframe=combined_frame,
        data_column_pandas_labels=result_pandas_labels,
        data_column_snowflake_quoted_identifiers=result_quoted_identifiers,
    )


def pivot_helper(
pivot_frame: InternalFrame,
pivot_aggr_groupings: list[PivotAggrGrouping],
Expand All @@ -69,6 +157,8 @@ def pivot_helper(
columns: Any,
groupby_snowflake_quoted_identifiers: list[str],
pivot_snowflake_quoted_identifiers: list[str],
multiple_aggr_funcs: bool,
multiple_values: bool,
index: Optional[list],
) -> InternalFrame:
"""
Expand All @@ -82,6 +172,8 @@ def pivot_helper(
columns: The columns argument passed to `pivot_table`. Will become the pandas labels for the data column index.
groupby_snowflake_quoted_identifiers: Group by identifiers
pivot_snowflake_quoted_identifiers: Pivot identifiers
multiple_aggr_funcs: Whether multiple aggregation functions have been passed in.
multiple_values: Whether multiple values columns have been passed in.
index: The index argument passed to `pivot_table` if specified. Will become the pandas labels for the index column.
Returns:
InternalFrame
Expand All @@ -100,7 +192,6 @@ def pivot_helper(
if ordered_dataframe.queries.get("post_actions"):
ordered_dataframe = cache_result(ordered_dataframe)

last_ordered_dataframe = None
data_column_pandas_labels: list[Hashable] = []
data_column_snowflake_quoted_identifiers: list[str] = []

Expand Down Expand Up @@ -157,47 +248,91 @@ def pivot_helper(
#
# The multi-level pandas prefix label that includes the aggregation value and function labels is also
# constructed and passed into the single pivot operation to prepend the remaining of the pandas labels.
for pivot_aggr_grouping in pivot_aggr_groupings:
existing_snowflake_quoted_identifiers = groupby_snowflake_quoted_identifiers
if last_ordered_dataframe is not None:
existing_snowflake_quoted_identifiers = (
last_ordered_dataframe.projected_column_snowflake_quoted_identifiers
if (
len(groupby_snowflake_quoted_identifiers) == 0
and multiple_aggr_funcs
and multiple_values
):
# When there are multiple aggregation functions, values, and `index=None`, we need
# to handle pivot a little differently. Rather than just joining horizontally or vertically,
# we need to join both horizontally and vertically - each value column gets its own row, so
# for every resulting OrderedDataFrame corresponding to the result of an aggregation on a single
# value, we need to join (concatenate horizontally) to get one row. For every value column,
# we then need to union (concatenate vertically) the resulting rows from the previous step.
# In order to handle this, we first group the aggregations by the column they act on, and run
# one pivot per group of aggregations. We then have multiple one row OrderedDataFrames, where each
# OrderedDataFrame is the result of pivot on a single value column, which we can union in order to
# get our final result.
# Step 1: Determine the values columns.
values_pandas_labels = {
pair.aggr_label_identifier_pair.pandas_label
for pair in pivot_aggr_groupings
}
# Step 2: Group aggregations by the values column they are on.
# Result: {"val_col1": [aggr1, aggr2], "val_col2": [aggr3, aggr4]}
grouped_pivot_aggr_groupings = {
v: list(
filter(
lambda pair: pair.aggr_label_identifier_pair.pandas_label == v,
pivot_aggr_groupings,
)
)

for v in values_pandas_labels
}
# Step 3: Perform pivot for every value column, and union together.
last_ordered_dataframe = None
for value_column in values_pandas_labels:
(
pivot_ordered_dataframe,
new_data_column_pandas_labels,
new_data_column_snowflake_quoted_identifiers,
) = perform_pivot_and_concatenate(
ordered_dataframe,
grouped_pivot_aggr_groupings[value_column],
groupby_snowflake_quoted_identifiers,
pivot_snowflake_quoted_identifiers,
True,
)
if last_ordered_dataframe is None:
last_ordered_dataframe = pivot_ordered_dataframe
data_column_pandas_labels = new_data_column_pandas_labels
data_column_snowflake_quoted_identifiers = (
new_data_column_snowflake_quoted_identifiers
)
else:
last_ordered_dataframe = last_ordered_dataframe.union_all(
pivot_ordered_dataframe
)
assert (
new_data_column_pandas_labels == data_column_pandas_labels
), "Labels should match when doing multiple values and multiple aggregation functions and no index."
ordered_dataframe = last_ordered_dataframe
else:
# If there are no index columns (groupby_snowflake_quoted_identifiers) and
# a single aggregation function or a single value, we should join vertically
# instead of horizontally.
should_join_along_columns = len(groupby_snowflake_quoted_identifiers) > 0 or (
multiple_aggr_funcs and not multiple_values
)
(
new_pivot_ordered_dataframe,
new_data_column_snowflake_quoted_identifiers,
new_data_column_pandas_labels,
) = single_pivot_helper(
ordered_dataframe,
existing_snowflake_quoted_identifiers,
data_column_pandas_labels,
data_column_snowflake_quoted_identifiers,
) = perform_pivot_and_concatenate(
ordered_dataframe,
pivot_aggr_groupings,
groupby_snowflake_quoted_identifiers,
pivot_snowflake_quoted_identifiers,
pivot_aggr_grouping.aggr_label_identifier_pair,
pivot_aggr_grouping.aggfunc,
pivot_aggr_grouping.prefix_label,
should_join_along_columns,
)

if last_ordered_dataframe:
last_ordered_dataframe = last_ordered_dataframe.join(
right=new_pivot_ordered_dataframe,
left_on_cols=groupby_snowflake_quoted_identifiers,
right_on_cols=groupby_snowflake_quoted_identifiers,
how="left",
)
else:
last_ordered_dataframe = new_pivot_ordered_dataframe

data_column_snowflake_quoted_identifiers.extend(
new_data_column_snowflake_quoted_identifiers
)
data_column_pandas_labels.extend(new_data_column_pandas_labels)

ordered_dataframe = last_ordered_dataframe
# When there are no groupby columns, the index is the first column in the OrderedDataFrame.
# Otherwise, the index is the groupby columns.
length_of_index_columns = max(1, len(groupby_snowflake_quoted_identifiers))

index_column_snowflake_quoted_identifiers = (
ordered_dataframe.projected_column_snowflake_quoted_identifiers[
0 : len(groupby_snowflake_quoted_identifiers)
0:length_of_index_columns
]
)
index = index or [None] * len(index_column_snowflake_quoted_identifiers)
Expand Down Expand Up @@ -299,9 +434,7 @@ def single_pivot_helper(
project_snowflake_quoted_identifiers
)

index_snowflake_quoted_identifiers = (
groupby_snowflake_quoted_identifiers or pivot_snowflake_quoted_identifiers or []
)
index_snowflake_quoted_identifiers = groupby_snowflake_quoted_identifiers or []

if not pivot_snowflake_quoted_identifiers or not aggr_snowflake_quoted_identifier:
if not groupby_snowflake_quoted_identifiers:
Expand Down Expand Up @@ -400,6 +533,7 @@ def single_pivot_helper(
),
"*",
)
index_snowflake_quoted_identifiers = [pivot_snowflake_quoted_identifiers[0]]

# Go through each of the non-group by columns and
# 1. Generate corresponding pandas label (without prefix)
Expand Down Expand Up @@ -686,7 +820,6 @@ def generate_single_pivot_labels(

if not pandas_aggfunc_list:
continue

# 2. Loop through all aggregation functions for this aggregation value.
for pandas_single_aggr_func in pandas_aggfunc_list:
# pandas only adds aggregation value as label if provided as a list
Expand Down
Loading
Loading