Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1489371: Implement GroupBy.value_counts #1986

Merged
merged 21 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
- Added support for `Index.is_boolean`, `Index.is_integer`, `Index.is_floating`, `Index.is_numeric`, and `Index.is_object`.
- Added support for `DatetimeIndex.round`, `DatetimeIndex.floor` and `DatetimeIndex.ceil`.
- Added support for `Series.dt.days_in_month` and `Series.dt.daysinmonth`.
- Added support for `DataFrameGroupBy.value_counts` and `SeriesGroupBy.value_counts`.
sfc-gh-joshi marked this conversation as resolved.
Show resolved Hide resolved

#### Improvements

Expand Down
2 changes: 2 additions & 0 deletions docs/source/modin/groupby.rst
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ GroupBy
DataFrameGroupBy.std
DataFrameGroupBy.sum
DataFrameGroupBy.tail
DataFrameGroupBy.value_counts
DataFrameGroupBy.var

.. rubric:: `SeriesGroupBy` computations / descriptive stats
Expand Down Expand Up @@ -90,4 +91,5 @@ GroupBy
SeriesGroupBy.std
SeriesGroupBy.sum
SeriesGroupBy.tail
SeriesGroupBy.value_counts
SeriesGroupBy.var
2 changes: 1 addition & 1 deletion docs/source/modin/supported/groupby_supported.rst
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ Computations/descriptive stats
+-----------------------------+---------------------------------+----------------------------------------------------+
| ``take`` | N | |
+-----------------------------+---------------------------------+----------------------------------------------------+
| ``value_counts`` | N | |
| ``value_counts`` | P | ``N`` if ``bins`` is given for SeriesGroupBy |
+-----------------------------+---------------------------------+----------------------------------------------------+
| ``var`` | P | See ``std`` |
+-----------------------------+---------------------------------+----------------------------------------------------+
Expand Down
61 changes: 59 additions & 2 deletions src/snowflake/snowpark/modin/pandas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
create_groupby_transform_func,
)
from snowflake.snowpark.modin.plugin._internal.telemetry import TelemetryMeta
from snowflake.snowpark.modin.plugin._internal.utils import INDEX_LABEL
from snowflake.snowpark.modin.plugin.compiler.snowflake_query_compiler import (
SnowflakeQueryCompiler,
)
Expand Down Expand Up @@ -188,13 +189,28 @@ def sem(self, ddof=1):

def value_counts(
self,
subset=None,
subset: Optional[list[str]] = None,
normalize: bool = False,
sort: bool = True,
ascending: bool = False,
dropna: bool = True,
):
ErrorMessage.method_not_implemented_error(name="value_counts", class_="GroupBy")
query_compiler = self._query_compiler.groupby_value_counts(
by=self._by,
axis=self._axis,
groupby_kwargs=self._kwargs,
subset=subset,
normalize=normalize,
sort=sort,
ascending=ascending,
dropna=dropna,
)
if self._as_index:
sfc-gh-joshi marked this conversation as resolved.
Show resolved Hide resolved
return pd.Series(
query_compiler=query_compiler,
name="proportion" if normalize else "count",
)
return pd.DataFrame(query_compiler=query_compiler)

def mean(
self,
Expand Down Expand Up @@ -1314,6 +1330,47 @@ def get_group(self, name, obj=None):
name="get_group", class_="SeriesGroupBy"
)

def value_counts(
self,
subset: Optional[list[str]] = None,
normalize: bool = False,
sort: bool = True,
ascending: bool = False,
bins: Optional[int] = None,
dropna: bool = True,
):
# TODO: SNOW-1063349: Modin upgrade - modin.pandas.groupby.SeriesGroupBy functions
# Modin upstream defaults to pandas for this method, so we need to either override this or
# rewrite this logic to be friendlier to other backends.
#
# Unlike DataFrameGroupBy, SeriesGroupBy has an additional `bins` parameter.
qc = self._query_compiler
# The "by" list becomes the new index, which we then perform the group by on. We call
# reset_index to let the query compiler treat it as a data column so it can be grouped on.
if self._by is not None:
qc = (
qc.set_index_from_series(pd.Series(self._by)._query_compiler)
.set_index_names([INDEX_LABEL])
.reset_index()
)
result_qc = qc.groupby_value_counts(
by=[INDEX_LABEL],
axis=self._axis,
groupby_kwargs=self._kwargs,
subset=subset,
normalize=normalize,
sort=sort,
ascending=ascending,
bins=bins,
dropna=dropna,
)
# Reset the names in the MultiIndex
result_qc = result_qc.set_index_names([None] * result_qc.nlevels())
return pd.Series(
query_compiler=result_qc,
name="proportion" if normalize else "count",
)


def validate_groupby_args(
by: Any,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import uuid
from collections.abc import Hashable, Iterable, Mapping, Sequence
from datetime import timedelta, tzinfo
from typing import Any, Callable, List, Literal, Optional, Tuple, Union, get_args
from typing import Any, Callable, List, Literal, Optional, Union, get_args

import numpy as np
import numpy.typing as npt
Expand Down Expand Up @@ -5034,6 +5034,161 @@ def groupby_all(
drop=drop,
)

def groupby_value_counts(
self,
by: Any,
axis: int,
groupby_kwargs: dict[str, Any],
subset: Optional[list[str]],
normalize: bool = False,
sort: bool = True,
ascending: bool = False,
bins: Optional[int] = None,
dropna: bool = True,
) -> "SnowflakeQueryCompiler":
level = groupby_kwargs.get("level", None)
as_index = groupby_kwargs.get("as_index", True)
groupby_sort = groupby_kwargs.get("sort", True)
is_supported = check_is_groupby_supported_by_snowflake(by, level, axis)
if not is_supported:
ErrorMessage.not_implemented(
f"Snowpark pandas GroupBy.value_counts {_GROUPBY_UNSUPPORTED_GROUPING_MESSAGE}"
)
if bins is not None:
raise ErrorMessage.not_implemented("bins argument is not yet supported")
if not is_list_like(by):
by = [by]
if len(set(by) & set(subset or [])):
# Check for overlap between by and subset. Since column names may contain customer data,
# unlike pandas, we do not include the offending labels in the error message.
raise ValueError("Keys in subset cannot be in the groupby column keys")
if subset is not None:
subset_list = subset
else:
# If subset is unspecified, then all columns should be included.
subset_list = self._modin_frame.data_column_pandas_labels
# The grouping columns are always included in the subset.
# Furthermore, the columns of the output must have the grouping columns first, in the order
# that they were specified.
subset_list = by + list(filter(lambda label: label not in by, subset_list))

if as_index:
# When as_index=True, the result is a Series with a MultiIndex index.
result = self._value_counts_groupby(
by=subset_list,
# Use sort=False to preserve the original order
sort=False,
normalize=normalize,
ascending=False,
dropna=dropna,
normalize_within_groups=by,
)
else:
# When as_index=False, the result is a DataFrame where count/proportion is appended as a new named column.
result = self._value_counts_groupby(
by=subset_list,
# Use sort=False to preserve the original order
sort=False,
normalize=normalize,
ascending=False,
dropna=dropna,
normalize_within_groups=by,
).reset_index()
result = result.set_columns(
result._modin_frame.data_column_pandas_labels[:-1]
+ ["proportion" if normalize else "count"]
)
# pandas currently provides the following behaviors based on the different sort flags.
# These behaviors are not entirely consistent with documentation; see this issue for discussion:
# https://github.com/pandas-dev/pandas/issues/59307
#
# Example data (using pandas 2.2.1 behavior):
# >>> df = pd.DataFrame({"X": ["B", "A", "A", "B", "B", "B"], "Y": [4, 1, 3, -2, -1, -1]})
#
# 1. groupby(sort=True).value_counts(sort=True)
# Sort on non-grouping columns, then sort on frequencies, then sort on grouping columns.
# >>> df.groupby("X", sort=True).value_counts(sort=True)
# X Y
# A 1 1
# 3 1
# B -1 2
# -2 1
# 4 1
# Name: count, dtype: int64
#
# 2. groupby(sort=True).value_counts(sort=False)
# Sort on non-grouping columns, then sort on grouping columns.
# >>> df.groupby("X", sort=True).value_counts(sort=True)
# X Y
# X Y
# A 1 1
# 3 1
# B -2 1
# -1 2
# 4 1
# Name: count, dtype: int64
#
# 3. groupby(sort=False).value_counts(sort=True)
# Sort on frequencies.
# >>> df.groupby("X", sort=False).value_counts(sort=True)
# X Y
# B -1 2
# 4 1
# A 1 1
# 3 1
# B -2 1
# Name: count, dtype: int64
#
# 4. groupby(sort=False).value_counts(sort=False)
# Sort on nothing (entries match the order of the original frame).
# X Y
# B 4 1
# A 1 1
# 3 1
# B -2 1
# -1 2
# Name: count, dtype: int64
#
# Lastly, when `normalize` is set with groupby(sort=False).value_counts(sort=True, normalize=True),
# pandas will sort by the pre-normalization counts rather than the resulting proportions. As this
# is an uncommon edge case, we cannot handle this using existing QC methods efficiently, so we just
# update our testing code to account for this.
# See comment on issue: https://github.com/pandas-dev/pandas/issues/59307#issuecomment-2313767856
sort_cols = []
if groupby_sort:
# When groupby(sort=True), sort the result on the grouping columns
sort_cols = by
ascending_cols = [True] * len(sort_cols)
if sort:
# When sort=True, also sort on the count/proportion column (always the last)
sort_cols.append(
result._modin_frame.data_column_pandas_labels[-1],
)
ascending_cols.append(ascending)
if groupby_sort:
# When groupby_sort=True, also sort by the non-grouping columns before sorting by
# the count/proportion column. The left-most column (nearest to the grouping columns
# is sorted on last).
# Exclude the grouping columns (always the first) from the sort.
if as_index:
# When as_index is true, the non-grouping columns are part of the index columns
columns_to_filter = result._modin_frame.index_column_pandas_labels
else:
# When as_index is false, the non-grouping columns are part of the data columns
columns_to_filter = result._modin_frame.data_column_pandas_labels
non_grouping_cols = [
col_label for col_label in columns_to_filter if col_label not in by
]
sort_cols.extend(non_grouping_cols)
ascending_cols.extend([True] * len(non_grouping_cols))
return result.sort_rows_by_column_values(
columns=sort_cols,
ascending=ascending_cols,
kind="stable",
na_position="last",
ignore_index=not as_index, # When as_index=False, take the default positional index
)

def _get_dummies_helper(
self,
column: Hashable,
Expand Down Expand Up @@ -11518,11 +11673,13 @@ def value_counts(

def _value_counts_groupby(
self,
by: Union[List[Hashable], Tuple[Hashable, ...]],
by: Sequence[Hashable],
normalize: bool,
sort: bool,
ascending: bool,
dropna: bool,
*,
normalize_within_groups: Optional[list[str]] = None,
) -> "SnowflakeQueryCompiler":
"""
Helper method to obtain the frequency or number of unique values
Expand All @@ -11544,6 +11701,10 @@ def _value_counts_groupby(
Sort in ascending order.
dropna : bool
Don't include counts of NaN.
normalize_within_groups : list[str], optional
If set, the normalize parameter will normalize based on the specified groups
rather than the entire dataset. This parameter is exclusive to the Snowpark pandas
query compiler and is only used internally to implement groupby_value_counts.
"""
self._raise_not_implemented_error_for_timedelta()

Expand Down Expand Up @@ -11573,9 +11734,21 @@ def _value_counts_groupby(
# they are normalized to percentages as [2/(2+1+1), 1/(2+1+1), 1/(2+1+1)] = [0.5, 0.25, 0.25]
# by default, ratio_to_report returns a decimal column, whereas pandas returns a float column
if normalize:
if normalize_within_groups:
# If normalize_within_groups is set, then the denominator for ratio_to_report should
# be the size of each group instead.
normalize_snowflake_quoted_identifiers = [
entry[0]
for entry in internal_frame.get_snowflake_quoted_identifiers_group_by_pandas_labels(
normalize_within_groups
)
]
window = Window.partition_by(normalize_snowflake_quoted_identifiers)
else:
window = None
internal_frame = query_compiler._modin_frame.project_columns(
[COUNT_LABEL],
builtin("ratio_to_report")(col(count_identifier)).over(),
builtin("ratio_to_report")(col(count_identifier)).over(window),
)
count_identifier = internal_frame.data_column_snowflake_quoted_identifiers[
0
Expand Down
Loading
Loading