SNOW-1489371: Implement GroupBy.value_counts (#1986)

1. Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR.

   Fixes SNOW-1489371

2. Fill out the following pre-review checklist:

   - [x] I am adding a new automated test(s) to verify correctness of my new code
   - [ ] If this test skips Local Testing mode, I'm requesting review from @snowflakedb/local-testing
   - [ ] I am adding new logging messages
   - [ ] I am adding a new telemetry message
   - [ ] I am adding new credentials
   - [ ] I am adding a new dependency
   - [ ] If this is a new feature/behavior, I'm adding the Local Testing parity changes.

3. Please describe how your code solves the related issue.

This PR adds support for GroupBy.value_counts, accepting all parameters
except `bins`, which we also do not support for DataFrame/Series.value_counts.
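
As a quick illustration of the new API, here is a minimal usage sketch (hypothetical data; Snowpark pandas requires an active Snowflake session to actually execute):

```
# Minimal usage sketch (hypothetical data; requires an active Snowflake
# session, since Snowpark pandas pushes execution down to Snowflake).
import modin.pandas as pd
import snowflake.snowpark.modin.plugin  # noqa: F401  # registers the Snowpark pandas backend

df = pd.DataFrame({"X": ["B", "A", "A", "B", "B", "B"], "Y": [4, 1, 3, -2, -1, -1]})

# DataFrameGroupBy.value_counts: frequency of each (X, Y) combination.
print(df.groupby("X").value_counts())

# normalize=True returns within-group proportions, named "proportion".
print(df.groupby("X").value_counts(normalize=True))

# SeriesGroupBy.value_counts is also supported; passing bins raises NotImplementedError.
print(df.groupby("X")["Y"].value_counts())
```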

Upstream modin defaults to pandas for both
DataFrameGroupBy.value_counts and SeriesGroupBy.value_counts, so some of
these changes should eventually be upstreamed.

pandas behaves differently from what its documentation suggests; this PR
tries to match the existing behavior as closely as possible. This is
documented in this pandas issue:
pandas-dev/pandas#59307

1. When `normalize=True`, pandas sorts by the pre-normalization counts,
leading to counterintuitive results. This only matters when `groupby` is
called with `sort=False` and `value_counts` with `sort=True`. See the test
cases, and the reproduction sketch after the comment block below, for an
example.
2. pandas does not always respect the original order of data, depending
on the configuration of sort flags in `groupby` and the `value_counts`
call itself. The behaviors are as follows (copied from query compiler
comments):
```
# pandas currently provides the following behaviors based on the different sort flags.
# These behaviors are not entirely consistent with documentation; see this issue for discussion:
# https://github.com/pandas-dev/pandas/issues/59307
#
# Example data (using pandas 2.2.1 behavior):
# >>> df = pd.DataFrame({"X": ["B", "A", "A", "B", "B", "B"], "Y": [4, 1, 3, -2, -1, -1]})
#
# 1. groupby(sort=True).value_counts(sort=True)
#   Sort on non-grouping columns, then sort on frequencies, then sort on grouping columns.
# >>> df.groupby("X", sort=True).value_counts(sort=True)
# X  Y
# A   1    1
#     3    1
# B  -1    2
#    -2    1
#     4    1
# Name: count, dtype: int64
#
# 2. groupby(sort=True).value_counts(sort=False)
#   Sort on non-grouping columns, then sort on grouping columns.
# >>> df.groupby("X", sort=True).value_counts(sort=False)
# X  Y
# A   1    1
#     3    1
# B  -2    1
#    -1    2
#     4    1
# Name: count, dtype: int64
#
# 3. groupby(sort=False).value_counts(sort=True)
#   Sort on frequencies.
# >>> df.groupby("X", sort=False).value_counts(sort=True)
# X  Y
# B  -1    2
#     4    1
# A   1    1
#     3    1
# B  -2    1
# Name: count, dtype: int64
#
# 4. groupby(sort=False).value_counts(sort=False)
#   Sort on nothing (entries match the order of the original frame).
# >>> df.groupby("X", sort=False).value_counts(sort=False)
# X  Y
# B   4    1
# A   1    1
#     3    1
# B  -2    1
#    -1    2
# Name: count, dtype: int64
#
# Lastly, when `normalize` is set with groupby(sort=False).value_counts(sort=True, normalize=True),
# pandas will sort by the pre-normalization counts rather than the resulting proportions. As this
# is an uncommon edge case, we cannot handle this using existing QC methods efficiently, so we just
# update our testing code to account for this.
# See comment on issue: https://github.com/pandas-dev/pandas/issues/59307#issuecomment-2313767856
```
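
The `normalize` edge case from point 1 can be reproduced in stock pandas; a small sketch of the behavior described above (observed on pandas 2.2.x, per the linked issue):

```
# Stock-pandas reproduction of the normalize=True ordering quirk: with
# groupby(sort=False) and value_counts(sort=True, normalize=True), rows are
# ordered by their raw counts, not by the reported proportions.
import pandas as pd

df = pd.DataFrame({"X": ["B", "A", "A", "B", "B", "B"], "Y": [4, 1, 3, -2, -1, -1]})
result = df.groupby("X", sort=False).value_counts(sort=True, normalize=True)
print(result)
# ("B", -1) sorts first because its raw count (2) is the largest, even though its
# proportion (2/4 = 0.5) ties with ("A", 1) and ("A", 3) (1/2 = 0.5 each), so the
# proportion column is not monotonically decreasing.
print(result.is_monotonic_decreasing)  # expected False on pandas 2.2.x
```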

---------

Co-authored-by: Andong Zhan <[email protected]>
sfc-gh-joshi and sfc-gh-azhan authored Aug 30, 2024
1 parent 19538ef commit 20837fc
Showing 8 changed files with 573 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -77,6 +77,7 @@
- Added support for `Index.is_boolean`, `Index.is_integer`, `Index.is_floating`, `Index.is_numeric`, and `Index.is_object`.
- Added support for `DatetimeIndex.round`, `DatetimeIndex.floor` and `DatetimeIndex.ceil`.
- Added support for `Series.dt.days_in_month` and `Series.dt.daysinmonth`.
+- Added support for `DataFrameGroupBy.value_counts` and `SeriesGroupBy.value_counts`.

#### Improvements

Expand Down
2 changes: 2 additions & 0 deletions docs/source/modin/groupby.rst
@@ -59,6 +59,7 @@ GroupBy
DataFrameGroupBy.std
DataFrameGroupBy.sum
DataFrameGroupBy.tail
+DataFrameGroupBy.value_counts
DataFrameGroupBy.var

.. rubric:: `SeriesGroupBy` computations / descriptive stats
@@ -90,4 +91,5 @@ GroupBy
SeriesGroupBy.std
SeriesGroupBy.sum
SeriesGroupBy.tail
+SeriesGroupBy.value_counts
SeriesGroupBy.var
2 changes: 1 addition & 1 deletion docs/source/modin/supported/groupby_supported.rst
@@ -166,7 +166,7 @@ Computations/descriptive stats
+-----------------------------+---------------------------------+----------------------------------------------------+
| ``take``                    | N                               |                                                    |
+-----------------------------+---------------------------------+----------------------------------------------------+
-| ``value_counts``            | N                               |                                                    |
+| ``value_counts``            | P                               | ``N`` if ``bins`` is given for SeriesGroupBy       |
+-----------------------------+---------------------------------+----------------------------------------------------+
| ``var``                     | P                               | See ``std``                                        |
+-----------------------------+---------------------------------+----------------------------------------------------+
61 changes: 59 additions & 2 deletions src/snowflake/snowpark/modin/pandas/groupby.py
@@ -49,6 +49,7 @@
    create_groupby_transform_func,
)
from snowflake.snowpark.modin.plugin._internal.telemetry import TelemetryMeta
+from snowflake.snowpark.modin.plugin._internal.utils import INDEX_LABEL
from snowflake.snowpark.modin.plugin.compiler.snowflake_query_compiler import (
    SnowflakeQueryCompiler,
)
@@ -188,13 +189,28 @@ def sem(self, ddof=1):

    def value_counts(
        self,
-        subset=None,
+        subset: Optional[list[str]] = None,
        normalize: bool = False,
        sort: bool = True,
        ascending: bool = False,
        dropna: bool = True,
    ):
-        ErrorMessage.method_not_implemented_error(name="value_counts", class_="GroupBy")
+        query_compiler = self._query_compiler.groupby_value_counts(
+            by=self._by,
+            axis=self._axis,
+            groupby_kwargs=self._kwargs,
+            subset=subset,
+            normalize=normalize,
+            sort=sort,
+            ascending=ascending,
+            dropna=dropna,
+        )
+        if self._as_index:
+            return pd.Series(
+                query_compiler=query_compiler,
+                name="proportion" if normalize else "count",
+            )
+        return pd.DataFrame(query_compiler=query_compiler)

    def mean(
        self,
@@ -1314,6 +1330,47 @@ def get_group(self, name, obj=None):
name="get_group", class_="SeriesGroupBy"
)

+    def value_counts(
+        self,
+        subset: Optional[list[str]] = None,
+        normalize: bool = False,
+        sort: bool = True,
+        ascending: bool = False,
+        bins: Optional[int] = None,
+        dropna: bool = True,
+    ):
+        # TODO: SNOW-1063349: Modin upgrade - modin.pandas.groupby.SeriesGroupBy functions
+        # Modin upstream defaults to pandas for this method, so we need to either override this or
+        # rewrite this logic to be friendlier to other backends.
+        #
+        # Unlike DataFrameGroupBy, SeriesGroupBy has an additional `bins` parameter.
+        qc = self._query_compiler
+        # The "by" list becomes the new index, which we then perform the group by on. We call
+        # reset_index to let the query compiler treat it as a data column so it can be grouped on.
+        if self._by is not None:
+            qc = (
+                qc.set_index_from_series(pd.Series(self._by)._query_compiler)
+                .set_index_names([INDEX_LABEL])
+                .reset_index()
+            )
+        result_qc = qc.groupby_value_counts(
+            by=[INDEX_LABEL],
+            axis=self._axis,
+            groupby_kwargs=self._kwargs,
+            subset=subset,
+            normalize=normalize,
+            sort=sort,
+            ascending=ascending,
+            bins=bins,
+            dropna=dropna,
+        )
+        # Reset the names in the MultiIndex
+        result_qc = result_qc.set_index_names([None] * result_qc.nlevels())
+        return pd.Series(
+            query_compiler=result_qc,
+            name="proportion" if normalize else "count",
+        )


def validate_groupby_args(
    by: Any,
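
The `SeriesGroupBy.value_counts` implementation above reshapes the frame so the grouping key becomes a groupable data column. A rough stock-pandas analogue of that reshaping, where `"__index__"` is a hypothetical stand-in for the internal `INDEX_LABEL` constant:

```
# Stock-pandas sketch of the SeriesGroupBy path: relabel the grouping key as
# the index, then reset_index() so it becomes a data column to group on.
# "__index__" is a hypothetical stand-in for the internal INDEX_LABEL constant.
import pandas as pd

values = pd.Series([4, 1, 3, -2, -1, -1], name="Y")
by_key = pd.Series(["B", "A", "A", "B", "B", "B"])

frame = values.to_frame()
frame.index = pd.Index(by_key, name="__index__")  # set_index_from_series + set_index_names
frame = frame.reset_index()                       # the key is now a regular column
print(frame.groupby("__index__").value_counts())
```
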
src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py
@@ -12,7 +12,7 @@
import uuid
from collections.abc import Hashable, Iterable, Mapping, Sequence
from datetime import timedelta, tzinfo
-from typing import Any, Callable, List, Literal, Optional, Tuple, Union, get_args
+from typing import Any, Callable, List, Literal, Optional, Union, get_args

import numpy as np
import numpy.typing as npt
@@ -5041,6 +5041,161 @@ def groupby_all(
            drop=drop,
        )

+    def groupby_value_counts(
+        self,
+        by: Any,
+        axis: int,
+        groupby_kwargs: dict[str, Any],
+        subset: Optional[list[str]],
+        normalize: bool = False,
+        sort: bool = True,
+        ascending: bool = False,
+        bins: Optional[int] = None,
+        dropna: bool = True,
+    ) -> "SnowflakeQueryCompiler":
+        level = groupby_kwargs.get("level", None)
+        as_index = groupby_kwargs.get("as_index", True)
+        groupby_sort = groupby_kwargs.get("sort", True)
+        is_supported = check_is_groupby_supported_by_snowflake(by, level, axis)
+        if not is_supported:
+            ErrorMessage.not_implemented(
+                f"Snowpark pandas GroupBy.value_counts {_GROUPBY_UNSUPPORTED_GROUPING_MESSAGE}"
+            )
+        if bins is not None:
+            raise ErrorMessage.not_implemented("bins argument is not yet supported")
+        if not is_list_like(by):
+            by = [by]
+        if len(set(by) & set(subset or [])):
+            # Check for overlap between by and subset. Since column names may contain customer data,
+            # unlike pandas, we do not include the offending labels in the error message.
+            raise ValueError("Keys in subset cannot be in the groupby column keys")
+        if subset is not None:
+            subset_list = subset
+        else:
+            # If subset is unspecified, then all columns should be included.
+            subset_list = self._modin_frame.data_column_pandas_labels
+        # The grouping columns are always included in the subset.
+        # Furthermore, the columns of the output must have the grouping columns first, in the order
+        # that they were specified.
+        subset_list = by + list(filter(lambda label: label not in by, subset_list))
+
+        if as_index:
+            # When as_index=True, the result is a Series with a MultiIndex index.
+            result = self._value_counts_groupby(
+                by=subset_list,
+                # Use sort=False to preserve the original order
+                sort=False,
+                normalize=normalize,
+                ascending=False,
+                dropna=dropna,
+                normalize_within_groups=by,
+            )
+        else:
+            # When as_index=False, the result is a DataFrame where count/proportion is appended as a new named column.
+            result = self._value_counts_groupby(
+                by=subset_list,
+                # Use sort=False to preserve the original order
+                sort=False,
+                normalize=normalize,
+                ascending=False,
+                dropna=dropna,
+                normalize_within_groups=by,
+            ).reset_index()
+            result = result.set_columns(
+                result._modin_frame.data_column_pandas_labels[:-1]
+                + ["proportion" if normalize else "count"]
+            )
+        # pandas currently provides the following behaviors based on the different sort flags.
+        # These behaviors are not entirely consistent with documentation; see this issue for discussion:
+        # https://github.com/pandas-dev/pandas/issues/59307
+        #
+        # Example data (using pandas 2.2.1 behavior):
+        # >>> df = pd.DataFrame({"X": ["B", "A", "A", "B", "B", "B"], "Y": [4, 1, 3, -2, -1, -1]})
+        #
+        # 1. groupby(sort=True).value_counts(sort=True)
+        #   Sort on non-grouping columns, then sort on frequencies, then sort on grouping columns.
+        # >>> df.groupby("X", sort=True).value_counts(sort=True)
+        # X  Y
+        # A   1    1
+        #     3    1
+        # B  -1    2
+        #    -2    1
+        #     4    1
+        # Name: count, dtype: int64
+        #
+        # 2. groupby(sort=True).value_counts(sort=False)
+        #   Sort on non-grouping columns, then sort on grouping columns.
+        # >>> df.groupby("X", sort=True).value_counts(sort=False)
+        # X  Y
+        # A   1    1
+        #     3    1
+        # B  -2    1
+        #    -1    2
+        #     4    1
+        # Name: count, dtype: int64
+        #
+        # 3. groupby(sort=False).value_counts(sort=True)
+        #   Sort on frequencies.
+        # >>> df.groupby("X", sort=False).value_counts(sort=True)
+        # X  Y
+        # B  -1    2
+        #     4    1
+        # A   1    1
+        #     3    1
+        # B  -2    1
+        # Name: count, dtype: int64
+        #
+        # 4. groupby(sort=False).value_counts(sort=False)
+        #   Sort on nothing (entries match the order of the original frame).
+        # >>> df.groupby("X", sort=False).value_counts(sort=False)
+        # X  Y
+        # B   4    1
+        # A   1    1
+        #     3    1
+        # B  -2    1
+        #    -1    2
+        # Name: count, dtype: int64
+        #
+        # Lastly, when `normalize` is set with groupby(sort=False).value_counts(sort=True, normalize=True),
+        # pandas will sort by the pre-normalization counts rather than the resulting proportions. As this
+        # is an uncommon edge case, we cannot handle this using existing QC methods efficiently, so we just
+        # update our testing code to account for this.
+        # See comment on issue: https://github.com/pandas-dev/pandas/issues/59307#issuecomment-2313767856
+        sort_cols = []
+        if groupby_sort:
+            # When groupby(sort=True), sort the result on the grouping columns
+            sort_cols = by
+        ascending_cols = [True] * len(sort_cols)
+        if sort:
+            # When sort=True, also sort on the count/proportion column (always the last)
+            sort_cols.append(
+                result._modin_frame.data_column_pandas_labels[-1],
+            )
+            ascending_cols.append(ascending)
+        if groupby_sort:
+            # When groupby_sort=True, also sort by the non-grouping columns before sorting by
+            # the count/proportion column. The left-most column (nearest to the grouping columns)
+            # is sorted on last.
+            # Exclude the grouping columns (always the first) from the sort.
+            if as_index:
+                # When as_index is true, the non-grouping columns are part of the index columns
+                columns_to_filter = result._modin_frame.index_column_pandas_labels
+            else:
+                # When as_index is false, the non-grouping columns are part of the data columns
+                columns_to_filter = result._modin_frame.data_column_pandas_labels
+            non_grouping_cols = [
+                col_label for col_label in columns_to_filter if col_label not in by
+            ]
+            sort_cols.extend(non_grouping_cols)
+            ascending_cols.extend([True] * len(non_grouping_cols))
+        return result.sort_rows_by_column_values(
+            columns=sort_cols,
+            ascending=ascending_cols,
+            kind="stable",
+            na_position="last",
+            ignore_index=not as_index,  # When as_index=False, take the default positional index
+        )

    def _get_dummies_helper(
        self,
        column: Hashable,
@@ -11525,11 +11680,13 @@ def value_counts(

    def _value_counts_groupby(
        self,
-        by: Union[List[Hashable], Tuple[Hashable, ...]],
+        by: Sequence[Hashable],
        normalize: bool,
        sort: bool,
        ascending: bool,
        dropna: bool,
+        *,
+        normalize_within_groups: Optional[list[str]] = None,
    ) -> "SnowflakeQueryCompiler":
"""
Helper method to obtain the frequency or number of unique values
@@ -11551,6 +11708,10 @@ def _value_counts_groupby(
            Sort in ascending order.
        dropna : bool
            Don't include counts of NaN.
+        normalize_within_groups : list[str], optional
+            If set, the normalize parameter will normalize based on the specified groups
+            rather than the entire dataset. This parameter is exclusive to the Snowpark pandas
+            query compiler and is only used internally to implement groupby_value_counts.
"""
self._raise_not_implemented_error_for_timedelta()

@@ -11580,9 +11741,21 @@
        # they are normalized to percentages as [2/(2+1+1), 1/(2+1+1), 1/(2+1+1)] = [0.5, 0.25, 0.25]
        # by default, ratio_to_report returns a decimal column, whereas pandas returns a float column
        if normalize:
+            if normalize_within_groups:
+                # If normalize_within_groups is set, then the denominator for ratio_to_report should
+                # be the size of each group instead.
+                normalize_snowflake_quoted_identifiers = [
+                    entry[0]
+                    for entry in internal_frame.get_snowflake_quoted_identifiers_group_by_pandas_labels(
+                        normalize_within_groups
+                    )
+                ]
+                window = Window.partition_by(normalize_snowflake_quoted_identifiers)
+            else:
+                window = None
            internal_frame = query_compiler._modin_frame.project_columns(
                [COUNT_LABEL],
-                builtin("ratio_to_report")(col(count_identifier)).over(),
+                builtin("ratio_to_report")(col(count_identifier)).over(window),
            )
            count_identifier = internal_frame.data_column_snowflake_quoted_identifiers[
                0
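
Conceptually, the partitioned `ratio_to_report` above divides each group's count by that group's total. A stock-pandas sketch of the same within-group normalization (column names hypothetical):

```
# Stock-pandas equivalent of ratio_to_report(count) OVER (PARTITION BY X):
# divide each row's count by the total count within its partition (group).
import pandas as pd

counts = pd.DataFrame(
    {"X": ["A", "A", "B", "B", "B"], "Y": [1, 3, -1, -2, 4], "count": [1, 1, 2, 1, 1]}
)
counts["proportion"] = counts["count"] / counts.groupby("X")["count"].transform("sum")
print(counts)  # group A rows get 1/2 each; group B rows get 2/4, 1/4, 1/4
```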