Skip to content

Commit

Permalink
SNOW-1347393: Remove fallback and raise NotImplementedError for Group…
Browse files Browse the repository at this point in the history
…By APIs (#1460)

Please answer these questions before submitting your pull request.
Thanks!

1. What GitHub issue is this PR addressing? Make sure that there is an
accompanying issue to your PR.

   SNOW-1347393

2. Fill out the following pre-review checklist:

- [x] I am adding new automated test(s) to verify correctness of my
new code
   - [ ] I am adding new logging messages
   - [ ] I am adding a new telemetry message
   - [ ] I am adding new credentials
   - [ ] I am adding a new dependency

3. Please describe how your code solves the related issue.

This PR removes fallback and raises NotImplementedError for GroupBy
APIs.

---------

Signed-off-by: Naren Krishna <[email protected]>
  • Loading branch information
sfc-gh-nkrishna authored Apr 30, 2024
1 parent 2a236b6 commit 8620f85
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 229 deletions.
1 change: 1 addition & 0 deletions src/snowflake/snowpark/modin/plugin/PANDAS_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- `DataFrame/Series.fillna` if given the `limit` or `downcast` parameter.
- `dot` binary operation between `DataFrame/Series`.
- `xor` binary operation between `DataFrame/Series`.
- All `DataFrame/Series.groupby` operations if either `axis == 1`, both `by` and `level` are configured, or `by` contains any non-pandas hashable labels.

## 1.14.0a2 (2024-04-18)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,6 @@
when,
year,
)
from snowflake.snowpark.modin.core.dataframe.algebra.default2pandas import (
GroupByDefault,
)
from snowflake.snowpark.modin.plugin._internal import (
concat_utils,
generator_utils,
Expand Down Expand Up @@ -2471,36 +2468,20 @@ def groupby_ngroups(
level = groupby_kwargs.get("level", None)
dropna = groupby_kwargs.get("dropna", True)

can_be_distributed = check_is_groupby_supported_by_snowflake(by, level, axis)

def fallback_ngroups() -> int:
"""
Creates a SnowflakeQueryCompiler through a fallback operation,
whose snowpark dataframe holds the result of the ngroups operation.
The snowpark dataframe has the form of [Row('0'=<ngroups_value>, ...)]
and we call collect to return this result. Please note that this will
trigger an eager evaluation.
"""
query_compiler: SnowflakeQueryCompiler = GroupByDefault.register(
native_pd.core.groupby.DataFrameGroupBy.ngroups
)(
self,
by=by,
axis=axis,
groupby_kwargs=groupby_kwargs,
is_supported = check_is_groupby_supported_by_snowflake(by, level, axis)
if not is_supported:
ErrorMessage.not_implemented(
"Snowpark pandas GroupBy.ngroups does not yet support axis == 1, by != None and level != None, or by containing any non-pandas hashable labels."
)
ngroups_result = query_compiler._modin_frame.ordered_dataframe.collect()
return ngroups_result[0]["0"]

if not can_be_distributed:
return fallback_ngroups()

query_compiler = get_frame_with_groupby_columns_as_index(
self, by, level, dropna
)

if query_compiler is None:
return fallback_ngroups()
ErrorMessage.not_implemented(
"Snowpark pandas GroupBy.ngroups does not yet support axis == 1, by != None and level != None, or by containing any non-pandas hashable labels."
)

internal_frame = query_compiler._modin_frame

Expand Down Expand Up @@ -2571,22 +2552,11 @@ def groupby_agg(
"""

level = groupby_kwargs.get("level", None)
can_be_distributed = check_is_groupby_supported_by_snowflake(
is_supported = check_is_groupby_supported_by_snowflake(
by, level, axis
) and check_is_aggregation_supported_in_snowflake(agg_func, agg_kwargs, axis)

def register_default_to_pandas() -> SnowflakeQueryCompiler:
return GroupByDefault.register(GroupByDefault.get_aggregation_method(how))(
self,
by=by,
agg_func=agg_func,
axis=axis,
groupby_kwargs=groupby_kwargs,
agg_args=agg_args,
agg_kwargs=agg_kwargs,
)

if not can_be_distributed:
if not is_supported:
if agg_func in ["head", "tail"]:
# head and tail cannot be run per column - it is run on the
# whole table at once.
Expand All @@ -2598,7 +2568,9 @@ def register_default_to_pandas() -> SnowflakeQueryCompiler:
dropna=agg_kwargs.get("dropna", True),
)
else:
return register_default_to_pandas()
ErrorMessage.not_implemented(
f"Snowpark pandas GroupBy.{agg_func} does not yet support pd.Grouper, axis == 1, by != None and level != None, by containing any non-pandas hashable labels, or unsupported aggregation parameters."
)

sort = groupby_kwargs.get("sort", True)
as_index = groupby_kwargs.get("as_index", True)
Expand All @@ -2611,7 +2583,9 @@ def register_default_to_pandas() -> SnowflakeQueryCompiler:
)

if query_compiler is None:
return register_default_to_pandas()
ErrorMessage.not_implemented(
f"Snowpark pandas GroupBy.{agg_func} does not yet support pd.Grouper, axis == 1, by != None and level != None, by containing any non-pandas hashable labels, or unsupported aggregation parameters."
)

by_list = query_compiler._modin_frame.index_column_pandas_labels

Expand Down
Loading

0 comments on commit 8620f85

Please sign in to comment.