Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SNOW-1013917]: Add support for pivot_table without index parameter #1488

Merged
merged 22 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4ec3164
Add test for empty table pivot_table
sfc-gh-rdurrani May 2, 2024
2d08c30
[SNOW-1013917]: Add support for `pivot_table` without `index` parameter
sfc-gh-rdurrani May 2, 2024
d2f621c
Merge branch 'main' into rdurrani-SNOW-1013917
sfc-gh-rdurrani May 7, 2024
861a28d
Add fix for test of multiple values + multiple aggr funcs
sfc-gh-rdurrani May 7, 2024
db83f84
Add changelog
sfc-gh-rdurrani May 7, 2024
fc19f4e
Add test to cover combinatorials
sfc-gh-rdurrani May 7, 2024
3d5da36
Remove outdated unsupported error tests
sfc-gh-rdurrani May 8, 2024
c52a368
Remove commented code
sfc-gh-rdurrani May 8, 2024
27dcc31
Address review comments, fix margins partially
sfc-gh-rdurrani May 16, 2024
baebd77
Remove commented code
sfc-gh-rdurrani May 16, 2024
1cd0d68
Merge branch 'main' into rdurrani-SNOW-1013917
sfc-gh-rdurrani May 16, 2024
b565974
Add support for margins fully
sfc-gh-rdurrani May 16, 2024
814f1e6
Add more rigorous testing
sfc-gh-rdurrani May 16, 2024
f57fd10
Address review comments
sfc-gh-rdurrani May 16, 2024
4282c90
Address review comments
sfc-gh-rdurrani May 16, 2024
8767ee9
Unskip supported tests, add strict=True for xfailing test
sfc-gh-rdurrani May 16, 2024
72fe431
Add some more tests
sfc-gh-rdurrani May 17, 2024
d93f119
Added support -> Added partial support in changelog
sfc-gh-rdurrani May 17, 2024
a963f13
Fix docs
sfc-gh-rdurrani May 17, 2024
2659ca1
Merge branch 'main' into rdurrani-SNOW-1013917
sfc-gh-rdurrani May 21, 2024
0400ab1
Move to snowpark changelog
sfc-gh-rdurrani May 21, 2024
bf214f5
Merge branch 'main' into rdurrani-SNOW-1013917
sfc-gh-rdurrani May 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

### New Features
- Added partial support for `SeriesGroupBy.apply` (where the `SeriesGrouBy` is obtained through `DataFrameGroupBy.__getitem__`).
- Added support for `DataFrame.pivot_table` with no `index` parameter.

## 1.14.0a2 (2024-04-18)

Expand Down
180 changes: 143 additions & 37 deletions src/snowflake/snowpark/modin/plugin/_internal/pivot_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from collections.abc import Generator, Hashable
from functools import reduce
from itertools import product
from typing import Any, Callable, Optional, Union
from typing import Any, Callable, NamedTuple, Optional, Union

from pandas._typing import AggFuncType, AggFuncTypeBase, Scalar

Expand Down Expand Up @@ -61,6 +61,84 @@
)


class PivottedOrderedDataFrameResult(NamedTuple):
sfc-gh-rdurrani marked this conversation as resolved.
Show resolved Hide resolved
# The OrderedDataFrame representation for the join or align result
ordered_dataframe: OrderedDataFrame
# The data column pandas labels of the new frame.
data_column_pandas_labels: list[Hashable]
# The data column snowflake quoted identifiers of the new frame.
data_column_snowflake_quoted_identifiers: list[str]


def perform_pivot_and_concatenate(
ordered_dataframe: OrderedDataFrame,
pivot_aggr_groupings: list[PivotAggrGrouping],
groupby_snowflake_quoted_identifiers: list[str],
pivot_snowflake_quoted_identifiers: list[str],
should_join_along_columns: bool,
) -> tuple[OrderedDataFrame, list[Hashable], list[str]]:
sfc-gh-rdurrani marked this conversation as resolved.
Show resolved Hide resolved
last_ordered_dataframe = None
data_column_pandas_labels: list[Hashable] = []
data_column_snowflake_quoted_identifiers: list[str] = []
for pivot_aggr_grouping in pivot_aggr_groupings:
existing_snowflake_quoted_identifiers = groupby_snowflake_quoted_identifiers
if last_ordered_dataframe is not None and should_join_along_columns:
# If there are no index columns, then we append the OrderedDataFrame's vertically, rather
# than horizontally, so we do not need to dedupe the columns (and in fact we want the columns
# to have the same name since we want them to match up during the union.
existing_snowflake_quoted_identifiers = (
last_ordered_dataframe.projected_column_snowflake_quoted_identifiers
)

(
new_pivot_ordered_dataframe,
new_data_column_snowflake_quoted_identifiers,
new_data_column_pandas_labels,
) = single_pivot_helper(
ordered_dataframe,
existing_snowflake_quoted_identifiers,
groupby_snowflake_quoted_identifiers,
pivot_snowflake_quoted_identifiers,
pivot_aggr_grouping.aggr_label_identifier_pair,
pivot_aggr_grouping.aggfunc,
pivot_aggr_grouping.prefix_label,
)

if last_ordered_dataframe:
# If there are index columns, then we join the two OrderedDataFrames
# (horizontally), while if there are no index columns, we concatenate
# them vertically, and have the index be the value column each row
# corresponds to.
# We also join vertically if there are multiple columns and multiple
# pivot values.
if should_join_along_columns:
last_ordered_dataframe = last_ordered_dataframe.join(
right=new_pivot_ordered_dataframe,
left_on_cols=groupby_snowflake_quoted_identifiers,
right_on_cols=groupby_snowflake_quoted_identifiers,
how="left",
)
data_column_snowflake_quoted_identifiers.extend(
new_data_column_snowflake_quoted_identifiers
)
data_column_pandas_labels.extend(new_data_column_pandas_labels)
else:
last_ordered_dataframe = last_ordered_dataframe.union_all(
new_pivot_ordered_dataframe
)
else:
last_ordered_dataframe = new_pivot_ordered_dataframe
data_column_snowflake_quoted_identifiers.extend(
new_data_column_snowflake_quoted_identifiers
)
data_column_pandas_labels.extend(new_data_column_pandas_labels)
return PivottedOrderedDataFrameResult(
sfc-gh-rdurrani marked this conversation as resolved.
Show resolved Hide resolved
last_ordered_dataframe,
data_column_pandas_labels,
data_column_snowflake_quoted_identifiers,
)


def pivot_helper(
pivot_frame: InternalFrame,
pivot_aggr_groupings: list[PivotAggrGrouping],
Expand All @@ -69,6 +147,8 @@ def pivot_helper(
columns: Any,
groupby_snowflake_quoted_identifiers: list[str],
pivot_snowflake_quoted_identifiers: list[str],
multiple_aggr_funcs: bool,
multiple_values: bool,
index: Optional[list],
) -> InternalFrame:
"""
Expand All @@ -82,6 +162,8 @@ def pivot_helper(
columns: The columns argument passed to `pivot_table`. Will become the pandas labels for the data column index.
groupby_snowflake_quoted_identifiers: Group by identifiers
pivot_snowflake_quoted_identifiers: Pivot identifiers
multiple_aggr_funcs: Whether or not multiple aggregation functions have been passed in.
multiple_values: Whether or not multiple values columns have been passed in.
sfc-gh-rdurrani marked this conversation as resolved.
Show resolved Hide resolved
index: The index argument passed to `pivot_table` if specified. Will become the pandas labels for the index column.
Returns:
InternalFrame
Expand All @@ -100,7 +182,6 @@ def pivot_helper(
if ordered_dataframe.queries.get("post_actions"):
ordered_dataframe = cache_result(ordered_dataframe)

last_ordered_dataframe = None
data_column_pandas_labels: list[Hashable] = []
data_column_snowflake_quoted_identifiers: list[str] = []

Expand Down Expand Up @@ -157,47 +238,74 @@ def pivot_helper(
#
# The multi-level pandas prefix label that includes the aggregation value and function labels is also
# constructed and passed into the single pivot operation to prepend the remaining of the pandas labels.
for pivot_aggr_grouping in pivot_aggr_groupings:
existing_snowflake_quoted_identifiers = groupby_snowflake_quoted_identifiers
if last_ordered_dataframe is not None:
existing_snowflake_quoted_identifiers = (
last_ordered_dataframe.projected_column_snowflake_quoted_identifiers
if (
len(groupby_snowflake_quoted_identifiers) == 0
and multiple_aggr_funcs
and multiple_values
):
values_pandas_labels = {
pair.aggr_label_identifier_pair.pandas_label
for pair in pivot_aggr_groupings
}
grouped_pivot_aggr_groupings = {
v: list(
filter(
lambda pair: pair.aggr_label_identifier_pair.pandas_label == v,
pivot_aggr_groupings,
)
)

for v in values_pandas_labels
}
last_ordered_dataframe = None
for value_column in values_pandas_labels:
(
pivot_ordered_dataframe,
new_data_column_pandas_labels,
new_data_column_snowflake_quoted_identifiers,
) = perform_pivot_and_concatenate(
ordered_dataframe,
grouped_pivot_aggr_groupings[value_column],
groupby_snowflake_quoted_identifiers,
pivot_snowflake_quoted_identifiers,
True,
)
if last_ordered_dataframe is None:
last_ordered_dataframe = pivot_ordered_dataframe
data_column_pandas_labels = new_data_column_pandas_labels
data_column_snowflake_quoted_identifiers = (
new_data_column_snowflake_quoted_identifiers
)
else:
last_ordered_dataframe = last_ordered_dataframe.union_all(
pivot_ordered_dataframe
)
assert (
new_data_column_pandas_labels == data_column_pandas_labels
), "Labels should match when doing multiple values and multiple aggregation functions and no index."
ordered_dataframe = last_ordered_dataframe
else:
should_join_along_columns = len(groupby_snowflake_quoted_identifiers) > 0 or (
multiple_aggr_funcs and not multiple_values
)
(
new_pivot_ordered_dataframe,
new_data_column_snowflake_quoted_identifiers,
new_data_column_pandas_labels,
) = single_pivot_helper(
ordered_dataframe,
existing_snowflake_quoted_identifiers,
data_column_pandas_labels,
data_column_snowflake_quoted_identifiers,
) = perform_pivot_and_concatenate(
ordered_dataframe,
pivot_aggr_groupings,
groupby_snowflake_quoted_identifiers,
pivot_snowflake_quoted_identifiers,
pivot_aggr_grouping.aggr_label_identifier_pair,
pivot_aggr_grouping.aggfunc,
pivot_aggr_grouping.prefix_label,
should_join_along_columns,
)

if last_ordered_dataframe:
last_ordered_dataframe = last_ordered_dataframe.join(
right=new_pivot_ordered_dataframe,
left_on_cols=groupby_snowflake_quoted_identifiers,
right_on_cols=groupby_snowflake_quoted_identifiers,
how="left",
)
else:
last_ordered_dataframe = new_pivot_ordered_dataframe

data_column_snowflake_quoted_identifiers.extend(
new_data_column_snowflake_quoted_identifiers
)
data_column_pandas_labels.extend(new_data_column_pandas_labels)

ordered_dataframe = last_ordered_dataframe
# When there are no groupby columns, the index is the first column in the OrderedDataFrame.
# Otherwise, the index is the groupby columns.
length_of_index_columns = max(1, len(groupby_snowflake_quoted_identifiers))

index_column_snowflake_quoted_identifiers = (
ordered_dataframe.projected_column_snowflake_quoted_identifiers[
0 : len(groupby_snowflake_quoted_identifiers)
0:length_of_index_columns
]
)
index = index or [None] * len(index_column_snowflake_quoted_identifiers)
Expand Down Expand Up @@ -299,9 +407,7 @@ def single_pivot_helper(
project_snowflake_quoted_identifiers
)

index_snowflake_quoted_identifiers = (
groupby_snowflake_quoted_identifiers or pivot_snowflake_quoted_identifiers or []
)
index_snowflake_quoted_identifiers = groupby_snowflake_quoted_identifiers or []

if not pivot_snowflake_quoted_identifiers or not aggr_snowflake_quoted_identifier:
if not groupby_snowflake_quoted_identifiers:
Expand Down Expand Up @@ -400,6 +506,7 @@ def single_pivot_helper(
),
"*",
)
index_snowflake_quoted_identifiers = [pivot_snowflake_quoted_identifiers[0]]

# Go through each of the non-group by columns and
# 1. Generate corresponding pandas label (without prefix)
Expand Down Expand Up @@ -686,7 +793,6 @@ def generate_single_pivot_labels(

if not pandas_aggfunc_list:
continue

# 2. Loop through all aggregation functions for this aggregation value.
for pandas_single_aggr_func in pandas_aggfunc_list:
# pandas only adds aggregation value as label if provided as a list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6326,11 +6326,6 @@ def pivot_table(
else []
)

if len(groupby_snowflake_quoted_identifiers) == 0:
raise NotImplementedError(
"pivot_table with no index configuration is currently not supported"
)

if values is None:
# If no values (aggregation columns) are specified, then we use all data columns that are neither
# groupby (index) nor pivot columns as the aggregation columns. For example, a dataframe with
Expand All @@ -6351,13 +6346,19 @@ def pivot_table(
values, self._modin_frame
)
)

multiple_agg_funcs_single_values = (
isinstance(aggfunc, list) and len(aggfunc) > 1
) and not (isinstance(values, list) and len(values) > 1)
include_pivot_columns_in_label = (
len(groupby_snowflake_quoted_identifiers) != 0
or multiple_agg_funcs_single_values
)
pivot_aggr_groupings = list(
generate_single_pivot_labels(
values_label_to_identifier_pairs_list,
aggfunc,
len(pivot_snowflake_quoted_identifiers) > 0,
isinstance(values, list),
isinstance(values, list) and include_pivot_columns_in_label,
sort,
)
)
Expand All @@ -6371,6 +6372,8 @@ def pivot_table(
columns,
groupby_snowflake_quoted_identifiers,
pivot_snowflake_quoted_identifiers,
(isinstance(aggfunc, list) and len(aggfunc) > 1),
(isinstance(values, list) and len(values) > 1),
index,
)

Expand Down
Loading
Loading