From 19538ef8a979dccd016c68e56059eeb35a8ff570 Mon Sep 17 00:00:00 2001 From: Naresh Kumar <113932371+sfc-gh-nkumar@users.noreply.github.com> Date: Fri, 30 Aug 2024 13:44:44 -0700 Subject: [PATCH 1/2] SNOW-1637945: Add support for TimedeltaIndex attributes (#2193) Fixes SNOW-1637945 Add support for TimedeltaIndex attributes `days`, `seconds`, `microseconds`, and `nanoseconds`. --- CHANGELOG.md | 1 + .../supported/timedelta_index_supported.rst | 8 +-- .../modin/plugin/_internal/timestamp_utils.py | 4 +- .../compiler/snowflake_query_compiler.py | 49 +++++++++++++++ .../plugin/extensions/timedelta_index.py | 60 ++++++++++++------- .../index/test_timedelta_index_methods.py | 17 +++++- 6 files changed, 107 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0767d5d3a0a..7baea604bd9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -63,6 +63,7 @@ - support for lazy `TimedeltaIndex`. - support for `pd.to_timedelta`. - support for `GroupBy` aggregations `min`, `max`, `mean`, `idxmax`, `idxmin`, `std`, `sum`, `median`, `count`, `any`, `all`, `size`, `nunique`. + - support for `TimedeltaIndex` attributes: `days`, `seconds`, `microseconds` and `nanoseconds`. - Added support for index's arithmetic and comparison operators. - Added support for `Series.dt.round`. - Added documentation pages for `DatetimeIndex`. diff --git a/docs/source/modin/supported/timedelta_index_supported.rst b/docs/source/modin/supported/timedelta_index_supported.rst index 73abe530fd7..cd5e64b8c98 100644 --- a/docs/source/modin/supported/timedelta_index_supported.rst +++ b/docs/source/modin/supported/timedelta_index_supported.rst @@ -15,13 +15,13 @@ Attributes +-----------------------------+---------------------------------+----------------------------------------------------+ | TimedeltaIndex attribute | Snowpark implemented? (Y/N/P/D) | Notes for current implementation | +-----------------------------+---------------------------------+----------------------------------------------------+ -| ``days`` | N | | +| ``days`` | Y | | +-----------------------------+---------------------------------+----------------------------------------------------+ -| ``seconds`` | N | | +| ``seconds`` | Y | | +-----------------------------+---------------------------------+----------------------------------------------------+ -| ``microseconds`` | N | | +| ``microseconds`` | Y | | +-----------------------------+---------------------------------+----------------------------------------------------+ -| ``nanoseconds`` | N | | +| ``nanoseconds`` | Y | | +-----------------------------+---------------------------------+----------------------------------------------------+ | ``components`` | N | | +-----------------------------+---------------------------------+----------------------------------------------------+ diff --git a/src/snowflake/snowpark/modin/plugin/_internal/timestamp_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/timestamp_utils.py index 380fe965b4d..c4873724789 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/timestamp_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/timestamp_utils.py @@ -21,9 +21,9 @@ cast, convert_timezone, date_part, - floor, iff, to_decimal, + trunc, ) from snowflake.snowpark.modin.plugin._internal.utils import pandas_lit from snowflake.snowpark.modin.plugin.utils.error_message import ErrorMessage @@ -176,7 +176,7 @@ def col_to_timedelta(col: Column, unit: str) -> Column: if not td_unit: # Same error as native pandas. 
raise ValueError(f"invalid unit abbreviation: {unit}") - return cast(floor(col * TIMEDELTA_UNIT_MULTIPLIER[td_unit]), LongType()) + return trunc(col * TIMEDELTA_UNIT_MULTIPLIER[td_unit]) PANDAS_DATETIME_FORMAT_TO_SNOWFLAKE_MAPPING = { diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index 50ce5e71310..079f132f372 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -135,6 +135,7 @@ to_variant, translate, trim, + trunc, uniform, upper, when, @@ -382,6 +383,12 @@ SUPPORTED_DT_FLOOR_CEIL_FREQS = ["day", "hour", "minute", "second"] +SECONDS_PER_DAY = 86400 +NANOSECONDS_PER_SECOND = 10**9 +NANOSECONDS_PER_MICROSECOND = 10**3 +MICROSECONDS_PER_SECOND = 10**6 +NANOSECONDS_PER_DAY = SECONDS_PER_DAY * NANOSECONDS_PER_SECOND + class SnowflakeQueryCompiler(BaseQueryCompiler): """based on: https://modin.readthedocs.io/en/0.11.0/flow/modin/backends/base/query_compiler.html @@ -17514,3 +17521,45 @@ def tz_convert(self, *args: Any, **kwargs: Any) -> None: def tz_localize(self, *args: Any, **kwargs: Any) -> None: ErrorMessage.method_not_implemented_error("tz_convert", "BasePandasDataset") + + def timedelta_property( + self, property_name: str, include_index: bool = False + ) -> "SnowflakeQueryCompiler": + """ + Extract a specified component of from Timedelta. + + Parameters + ---------- + property : {'days', 'seconds', 'microseconds', 'nanoseconds'} + The component to extract. + include_index: Whether to include the index columns in the operation. + + Returns + ------- + A new SnowflakeQueryCompiler with the extracted component. + """ + if not include_index: + assert ( + len(self.columns) == 1 + ), "dt only works for series" # pragma: no cover + + # mapping from the property name to the corresponding snowpark function + property_to_func_map = { + "days": lambda column: trunc(column / NANOSECONDS_PER_DAY), + "seconds": lambda column: trunc(column / NANOSECONDS_PER_SECOND) + % SECONDS_PER_DAY, + "microseconds": lambda column: trunc(column / NANOSECONDS_PER_MICROSECOND) + % MICROSECONDS_PER_SECOND, + "nanoseconds": lambda column: column % NANOSECONDS_PER_MICROSECOND, + } + func = property_to_func_map.get(property_name) + if not func: + class_prefix = ( + "TimedeltaIndex" if include_index else "Series.dt" + ) # pragma: no cover + raise ErrorMessage.not_implemented( + f"Snowpark pandas doesn't yet support the property '{class_prefix}.{property_name}'" + ) # pragma: no cover + return SnowflakeQueryCompiler( + self._modin_frame.apply_snowpark_function_to_columns(func, include_index) + ) diff --git a/src/snowflake/snowpark/modin/plugin/extensions/timedelta_index.py b/src/snowflake/snowpark/modin/plugin/extensions/timedelta_index.py index 86ed2a5ded4..dac1a78f740 100644 --- a/src/snowflake/snowpark/modin/plugin/extensions/timedelta_index.py +++ b/src/snowflake/snowpark/modin/plugin/extensions/timedelta_index.py @@ -130,7 +130,6 @@ def __init__( } self._init_index(data, _CONSTRUCTOR_DEFAULTS, query_compiler, **kwargs) - @timedelta_index_not_implemented() @property def days(self) -> Index: """ @@ -142,15 +141,18 @@ def days(self) -> Index: Examples -------- - >>> idx = pd.to_timedelta(["0 days", "10 days", "20 days"]) # doctest: +SKIP - >>> idx # doctest: +SKIP - TimedeltaIndex(['0 days', '10 days', '20 days'], - dtype='timedelta64[ns]', freq=None) - >>> idx.days # doctest: +SKIP + >>> idx = 
pd.to_timedelta(["0 days", "10 days", "20 days"]) + >>> idx + TimedeltaIndex(['0 days', '10 days', '20 days'], dtype='timedelta64[ns]', freq=None) + >>> idx.days Index([0, 10, 20], dtype='int64') """ + return Index( + query_compiler=self._query_compiler.timedelta_property( + "days", include_index=True + ) + ) - @timedelta_index_not_implemented() @property def seconds(self) -> Index: """ @@ -162,15 +164,18 @@ def seconds(self) -> Index: Examples -------- - >>> idx = pd.to_timedelta([1, 2, 3], unit='s') # doctest: +SKIP - >>> idx # doctest: +SKIP - TimedeltaIndex(['0 days 00:00:01', '0 days 00:00:02', '0 days 00:00:03'], - dtype='timedelta64[ns]', freq=None) - >>> idx.seconds # doctest: +SKIP - Index([1, 2, 3], dtype='int32') + >>> idx = pd.to_timedelta([1, 2, 3], unit='s') + >>> idx + TimedeltaIndex(['0 days 00:00:01', '0 days 00:00:02', '0 days 00:00:03'], dtype='timedelta64[ns]', freq=None) + >>> idx.seconds + Index([1, 2, 3], dtype='int64') """ + return Index( + query_compiler=self._query_compiler.timedelta_property( + "seconds", include_index=True + ) + ) - @timedelta_index_not_implemented() @property def microseconds(self) -> Index: """ @@ -182,16 +187,20 @@ def microseconds(self) -> Index: Examples -------- - >>> idx = pd.to_timedelta([1, 2, 3], unit='us') # doctest: +SKIP - >>> idx # doctest: +SKIP + >>> idx = pd.to_timedelta([1, 2, 3], unit='us') + >>> idx TimedeltaIndex(['0 days 00:00:00.000001', '0 days 00:00:00.000002', '0 days 00:00:00.000003'], dtype='timedelta64[ns]', freq=None) - >>> idx.microseconds # doctest: +SKIP - Index([1, 2, 3], dtype='int32') + >>> idx.microseconds + Index([1, 2, 3], dtype='int64') """ + return Index( + query_compiler=self._query_compiler.timedelta_property( + "microseconds", include_index=True + ) + ) - @timedelta_index_not_implemented() @property def nanoseconds(self) -> Index: """ @@ -203,14 +212,19 @@ def nanoseconds(self) -> Index: Examples -------- - >>> idx = pd.to_timedelta([1, 2, 3], unit='ns') # doctest: +SKIP - >>> idx # doctest: +SKIP + >>> idx = pd.to_timedelta([1, 2, 3], unit='ns') + >>> idx TimedeltaIndex(['0 days 00:00:00.000000001', '0 days 00:00:00.000000002', '0 days 00:00:00.000000003'], dtype='timedelta64[ns]', freq=None) - >>> idx.nanoseconds # doctest: +SKIP - Index([1, 2, 3], dtype='int32') + >>> idx.nanoseconds + Index([1, 2, 3], dtype='int64') """ + return Index( + query_compiler=self._query_compiler.timedelta_property( + "nanoseconds", include_index=True + ) + ) @timedelta_index_not_implemented() @property diff --git a/tests/integ/modin/index/test_timedelta_index_methods.py b/tests/integ/modin/index/test_timedelta_index_methods.py index 1baafed24d2..646bd5ee983 100644 --- a/tests/integ/modin/index/test_timedelta_index_methods.py +++ b/tests/integ/modin/index/test_timedelta_index_methods.py @@ -8,6 +8,7 @@ import snowflake.snowpark.modin.plugin # noqa: F401 from tests.integ.modin.sql_counter import sql_count_checker +from tests.integ.modin.utils import assert_index_equal @sql_count_checker(query_count=3) @@ -54,12 +55,22 @@ def test_non_default_args(kwargs): pd.TimedeltaIndex(query_compiler=idx._query_compiler, **kwargs) -@pytest.mark.parametrize( - "property", ["days", "seconds", "microseconds", "nanoseconds", "inferred_freq"] -) +@pytest.mark.parametrize("property", ["components", "inferred_freq"]) @sql_count_checker(query_count=0) def test_property_not_implemented(property): snow_index = pd.TimedeltaIndex(["1 days", "2 days"]) msg = f"Snowpark pandas does not yet support the property TimedeltaIndex.{property}" with 
pytest.raises(NotImplementedError, match=msg):
        getattr(snow_index, property)
+
+
+@pytest.mark.parametrize("attr", ["days", "seconds", "microseconds", "nanoseconds"])
+@sql_count_checker(query_count=1)
+def test_timedelta_index_properties(attr):
+    native_index = native_pd.TimedeltaIndex(
+        ["1d", "1h", "60s", "1s", "800ms", "5us", "6ns", "1d 3s", "9m 15s 8us", None]
+    )
+    snow_index = pd.Index(native_index)
+    assert_index_equal(
+        getattr(snow_index, attr), getattr(native_index, attr), exact=False
+    )

From 20837fc517766f0bc34222f056a5e1948ae7a16b Mon Sep 17 00:00:00 2001
From: Jonathan Shi <149419494+sfc-gh-joshi@users.noreply.github.com>
Date: Fri, 30 Aug 2024 14:23:42 -0700
Subject: [PATCH 2/2] SNOW-1489371: Implement GroupBy.value_counts (#1986)

1. Which Jira issue is this PR addressing? Make sure that there is an
   accompanying issue to your PR.

   Fixes SNOW-1489371

2. Fill out the following pre-review checklist:

   - [x] I am adding a new automated test(s) to verify correctness of my new code
   - [ ] If this test skips Local Testing mode, I'm requesting review from @snowflakedb/local-testing
   - [ ] I am adding new logging messages
   - [ ] I am adding a new telemetry message
   - [ ] I am adding new credentials
   - [ ] I am adding a new dependency
   - [ ] If this is a new feature/behavior, I'm adding the Local Testing parity changes.

3. Please describe how your code solves the related issue.

This PR adds support for GroupBy.value_counts, accepting all parameters except
`bins`, which we do not support for DataFrame/Series.value_counts. Upstream modin
defaults to pandas for both DataFrameGroupBy/SeriesGroupBy.value_counts, so some
of these changes should eventually be upstreamed.

pandas has different behavior than what might be expected from the documentation;
this PR tries to align with existing behavior as much as possible. This is
documented in this pandas issue: https://github.com/pandas-dev/pandas/issues/59307

1. When `normalize=True`, pandas sorts by the pre-normalization counts, leading to
   counterintuitive results. This only matters when `groupby` is called with
   `sort=False` and `value_counts` with `sort=True`. See test cases for an example.

2. pandas does not always respect the original order of data, depending on the
   configuration of sort flags in `groupby` and the `value_counts` call itself. The
   behaviors are as follows (copied from query compiler comments):

```
# pandas currently provides the following behaviors based on the different sort flags.
# These behaviors are not entirely consistent with documentation; see this issue for discussion:
# https://github.com/pandas-dev/pandas/issues/59307
#
# Example data (using pandas 2.2.1 behavior):
# >>> df = pd.DataFrame({"X": ["B", "A", "A", "B", "B", "B"], "Y": [4, 1, 3, -2, -1, -1]})
#
# 1. groupby(sort=True).value_counts(sort=True)
#    Sort on non-grouping columns, then sort on frequencies, then sort on grouping columns.
# >>> df.groupby("X", sort=True).value_counts(sort=True)
# X  Y
# A  1     1
#    3     1
# B -1     2
#    -2    1
#     4    1
# Name: count, dtype: int64
#
# 2. groupby(sort=True).value_counts(sort=False)
#    Sort on non-grouping columns, then sort on grouping columns.
# >>> df.groupby("X", sort=True).value_counts(sort=False)
# X  Y
# A  1     1
#    3     1
# B -2     1
#    -1    2
#     4    1
# Name: count, dtype: int64
#
# 3. groupby(sort=False).value_counts(sort=True)
#    Sort on frequencies.
# >>> df.groupby("X", sort=False).value_counts(sort=True)
# X  Y
# B -1     2
#     4    1
# A  1     1
#     3    1
# B -2     1
# Name: count, dtype: int64
#
# 4.
groupby(sort=False).value_counts(sort=False) # Sort on nothing (entries match the order of the original frame). # X Y # B 4 1 # A 1 1 # 3 1 # B -2 1 # -1 2 # Name: count, dtype: int64 # # Lastly, when `normalize` is set with groupby(sort=False).value_counts(sort=True, normalize=True), # pandas will sort by the pre-normalization counts rather than the resulting proportions. As this # is an uncommon edge case, we cannot handle this using existing QC methods efficiently, so we just # update our testing code to account for this. # See comment ``` --------- Co-authored-by: Andong Zhan --- CHANGELOG.md | 1 + docs/source/modin/groupby.rst | 2 + .../modin/supported/groupby_supported.rst | 2 +- .../snowpark/modin/pandas/groupby.py | 61 +++++- .../compiler/snowflake_query_compiler.py | 179 +++++++++++++++- .../modin/plugin/docstrings/groupby.py | 142 ++++++++++++- .../integ/modin/groupby/test_value_counts.py | 194 ++++++++++++++++++ tests/unit/modin/test_groupby_unsupported.py | 2 - 8 files changed, 573 insertions(+), 10 deletions(-) create mode 100644 tests/integ/modin/groupby/test_value_counts.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 7baea604bd9..8aab4250764 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -77,6 +77,7 @@ - Added support for `Index.is_boolean`, `Index.is_integer`, `Index.is_floating`, `Index.is_numeric`, and `Index.is_object`. - Added support for `DatetimeIndex.round`, `DatetimeIndex.floor` and `DatetimeIndex.ceil`. - Added support for `Series.dt.days_in_month` and `Series.dt.daysinmonth`. +- Added support for `DataFrameGroupBy.value_counts` and `SeriesGroupBy.value_counts`. #### Improvements diff --git a/docs/source/modin/groupby.rst b/docs/source/modin/groupby.rst index 97c99ce383d..e27a3bcf547 100644 --- a/docs/source/modin/groupby.rst +++ b/docs/source/modin/groupby.rst @@ -59,6 +59,7 @@ GroupBy DataFrameGroupBy.std DataFrameGroupBy.sum DataFrameGroupBy.tail + DataFrameGroupBy.value_counts DataFrameGroupBy.var .. 
rubric:: `SeriesGroupBy` computations / descriptive stats @@ -90,4 +91,5 @@ GroupBy SeriesGroupBy.std SeriesGroupBy.sum SeriesGroupBy.tail + SeriesGroupBy.value_counts SeriesGroupBy.var diff --git a/docs/source/modin/supported/groupby_supported.rst b/docs/source/modin/supported/groupby_supported.rst index f9ef001af29..3bcf3538216 100644 --- a/docs/source/modin/supported/groupby_supported.rst +++ b/docs/source/modin/supported/groupby_supported.rst @@ -166,7 +166,7 @@ Computations/descriptive stats +-----------------------------+---------------------------------+----------------------------------------------------+ | ``take`` | N | | +-----------------------------+---------------------------------+----------------------------------------------------+ -| ``value_counts`` | N | | +| ``value_counts`` | P | ``N`` if ``bins`` is given for SeriesGroupBy | +-----------------------------+---------------------------------+----------------------------------------------------+ | ``var`` | P | See ``std`` | +-----------------------------+---------------------------------+----------------------------------------------------+ diff --git a/src/snowflake/snowpark/modin/pandas/groupby.py b/src/snowflake/snowpark/modin/pandas/groupby.py index a373883317a..de89a48331b 100644 --- a/src/snowflake/snowpark/modin/pandas/groupby.py +++ b/src/snowflake/snowpark/modin/pandas/groupby.py @@ -49,6 +49,7 @@ create_groupby_transform_func, ) from snowflake.snowpark.modin.plugin._internal.telemetry import TelemetryMeta +from snowflake.snowpark.modin.plugin._internal.utils import INDEX_LABEL from snowflake.snowpark.modin.plugin.compiler.snowflake_query_compiler import ( SnowflakeQueryCompiler, ) @@ -188,13 +189,28 @@ def sem(self, ddof=1): def value_counts( self, - subset=None, + subset: Optional[list[str]] = None, normalize: bool = False, sort: bool = True, ascending: bool = False, dropna: bool = True, ): - ErrorMessage.method_not_implemented_error(name="value_counts", class_="GroupBy") + query_compiler = self._query_compiler.groupby_value_counts( + by=self._by, + axis=self._axis, + groupby_kwargs=self._kwargs, + subset=subset, + normalize=normalize, + sort=sort, + ascending=ascending, + dropna=dropna, + ) + if self._as_index: + return pd.Series( + query_compiler=query_compiler, + name="proportion" if normalize else "count", + ) + return pd.DataFrame(query_compiler=query_compiler) def mean( self, @@ -1314,6 +1330,47 @@ def get_group(self, name, obj=None): name="get_group", class_="SeriesGroupBy" ) + def value_counts( + self, + subset: Optional[list[str]] = None, + normalize: bool = False, + sort: bool = True, + ascending: bool = False, + bins: Optional[int] = None, + dropna: bool = True, + ): + # TODO: SNOW-1063349: Modin upgrade - modin.pandas.groupby.SeriesGroupBy functions + # Modin upstream defaults to pandas for this method, so we need to either override this or + # rewrite this logic to be friendlier to other backends. + # + # Unlike DataFrameGroupBy, SeriesGroupBy has an additional `bins` parameter. + qc = self._query_compiler + # The "by" list becomes the new index, which we then perform the group by on. We call + # reset_index to let the query compiler treat it as a data column so it can be grouped on. 
+ if self._by is not None: + qc = ( + qc.set_index_from_series(pd.Series(self._by)._query_compiler) + .set_index_names([INDEX_LABEL]) + .reset_index() + ) + result_qc = qc.groupby_value_counts( + by=[INDEX_LABEL], + axis=self._axis, + groupby_kwargs=self._kwargs, + subset=subset, + normalize=normalize, + sort=sort, + ascending=ascending, + bins=bins, + dropna=dropna, + ) + # Reset the names in the MultiIndex + result_qc = result_qc.set_index_names([None] * result_qc.nlevels()) + return pd.Series( + query_compiler=result_qc, + name="proportion" if normalize else "count", + ) + def validate_groupby_args( by: Any, diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index 079f132f372..a803eb332e7 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -12,7 +12,7 @@ import uuid from collections.abc import Hashable, Iterable, Mapping, Sequence from datetime import timedelta, tzinfo -from typing import Any, Callable, List, Literal, Optional, Tuple, Union, get_args +from typing import Any, Callable, List, Literal, Optional, Union, get_args import numpy as np import numpy.typing as npt @@ -5041,6 +5041,161 @@ def groupby_all( drop=drop, ) + def groupby_value_counts( + self, + by: Any, + axis: int, + groupby_kwargs: dict[str, Any], + subset: Optional[list[str]], + normalize: bool = False, + sort: bool = True, + ascending: bool = False, + bins: Optional[int] = None, + dropna: bool = True, + ) -> "SnowflakeQueryCompiler": + level = groupby_kwargs.get("level", None) + as_index = groupby_kwargs.get("as_index", True) + groupby_sort = groupby_kwargs.get("sort", True) + is_supported = check_is_groupby_supported_by_snowflake(by, level, axis) + if not is_supported: + ErrorMessage.not_implemented( + f"Snowpark pandas GroupBy.value_counts {_GROUPBY_UNSUPPORTED_GROUPING_MESSAGE}" + ) + if bins is not None: + raise ErrorMessage.not_implemented("bins argument is not yet supported") + if not is_list_like(by): + by = [by] + if len(set(by) & set(subset or [])): + # Check for overlap between by and subset. Since column names may contain customer data, + # unlike pandas, we do not include the offending labels in the error message. + raise ValueError("Keys in subset cannot be in the groupby column keys") + if subset is not None: + subset_list = subset + else: + # If subset is unspecified, then all columns should be included. + subset_list = self._modin_frame.data_column_pandas_labels + # The grouping columns are always included in the subset. + # Furthermore, the columns of the output must have the grouping columns first, in the order + # that they were specified. + subset_list = by + list(filter(lambda label: label not in by, subset_list)) + + if as_index: + # When as_index=True, the result is a Series with a MultiIndex index. + result = self._value_counts_groupby( + by=subset_list, + # Use sort=False to preserve the original order + sort=False, + normalize=normalize, + ascending=False, + dropna=dropna, + normalize_within_groups=by, + ) + else: + # When as_index=False, the result is a DataFrame where count/proportion is appended as a new named column. 
+ result = self._value_counts_groupby( + by=subset_list, + # Use sort=False to preserve the original order + sort=False, + normalize=normalize, + ascending=False, + dropna=dropna, + normalize_within_groups=by, + ).reset_index() + result = result.set_columns( + result._modin_frame.data_column_pandas_labels[:-1] + + ["proportion" if normalize else "count"] + ) + # pandas currently provides the following behaviors based on the different sort flags. + # These behaviors are not entirely consistent with documentation; see this issue for discussion: + # https://github.com/pandas-dev/pandas/issues/59307 + # + # Example data (using pandas 2.2.1 behavior): + # >>> df = pd.DataFrame({"X": ["B", "A", "A", "B", "B", "B"], "Y": [4, 1, 3, -2, -1, -1]}) + # + # 1. groupby(sort=True).value_counts(sort=True) + # Sort on non-grouping columns, then sort on frequencies, then sort on grouping columns. + # >>> df.groupby("X", sort=True).value_counts(sort=True) + # X Y + # A 1 1 + # 3 1 + # B -1 2 + # -2 1 + # 4 1 + # Name: count, dtype: int64 + # + # 2. groupby(sort=True).value_counts(sort=False) + # Sort on non-grouping columns, then sort on grouping columns. + # >>> df.groupby("X", sort=True).value_counts(sort=True) + # X Y + # X Y + # A 1 1 + # 3 1 + # B -2 1 + # -1 2 + # 4 1 + # Name: count, dtype: int64 + # + # 3. groupby(sort=False).value_counts(sort=True) + # Sort on frequencies. + # >>> df.groupby("X", sort=False).value_counts(sort=True) + # X Y + # B -1 2 + # 4 1 + # A 1 1 + # 3 1 + # B -2 1 + # Name: count, dtype: int64 + # + # 4. groupby(sort=False).value_counts(sort=False) + # Sort on nothing (entries match the order of the original frame). + # X Y + # B 4 1 + # A 1 1 + # 3 1 + # B -2 1 + # -1 2 + # Name: count, dtype: int64 + # + # Lastly, when `normalize` is set with groupby(sort=False).value_counts(sort=True, normalize=True), + # pandas will sort by the pre-normalization counts rather than the resulting proportions. As this + # is an uncommon edge case, we cannot handle this using existing QC methods efficiently, so we just + # update our testing code to account for this. + # See comment on issue: https://github.com/pandas-dev/pandas/issues/59307#issuecomment-2313767856 + sort_cols = [] + if groupby_sort: + # When groupby(sort=True), sort the result on the grouping columns + sort_cols = by + ascending_cols = [True] * len(sort_cols) + if sort: + # When sort=True, also sort on the count/proportion column (always the last) + sort_cols.append( + result._modin_frame.data_column_pandas_labels[-1], + ) + ascending_cols.append(ascending) + if groupby_sort: + # When groupby_sort=True, also sort by the non-grouping columns before sorting by + # the count/proportion column. The left-most column (nearest to the grouping columns + # is sorted on last). + # Exclude the grouping columns (always the first) from the sort. 
+ if as_index: + # When as_index is true, the non-grouping columns are part of the index columns + columns_to_filter = result._modin_frame.index_column_pandas_labels + else: + # When as_index is false, the non-grouping columns are part of the data columns + columns_to_filter = result._modin_frame.data_column_pandas_labels + non_grouping_cols = [ + col_label for col_label in columns_to_filter if col_label not in by + ] + sort_cols.extend(non_grouping_cols) + ascending_cols.extend([True] * len(non_grouping_cols)) + return result.sort_rows_by_column_values( + columns=sort_cols, + ascending=ascending_cols, + kind="stable", + na_position="last", + ignore_index=not as_index, # When as_index=False, take the default positional index + ) + def _get_dummies_helper( self, column: Hashable, @@ -11525,11 +11680,13 @@ def value_counts( def _value_counts_groupby( self, - by: Union[List[Hashable], Tuple[Hashable, ...]], + by: Sequence[Hashable], normalize: bool, sort: bool, ascending: bool, dropna: bool, + *, + normalize_within_groups: Optional[list[str]] = None, ) -> "SnowflakeQueryCompiler": """ Helper method to obtain the frequency or number of unique values @@ -11551,6 +11708,10 @@ def _value_counts_groupby( Sort in ascending order. dropna : bool Don't include counts of NaN. + normalize_within_groups : list[str], optional + If set, the normalize parameter will normalize based on the specified groups + rather than the entire dataset. This parameter is exclusive to the Snowpark pandas + query compiler and is only used internally to implement groupby_value_counts. """ self._raise_not_implemented_error_for_timedelta() @@ -11580,9 +11741,21 @@ def _value_counts_groupby( # they are normalized to percentages as [2/(2+1+1), 1/(2+1+1), 1/(2+1+1)] = [0.5, 0.25, 0.25] # by default, ratio_to_report returns a decimal column, whereas pandas returns a float column if normalize: + if normalize_within_groups: + # If normalize_within_groups is set, then the denominator for ratio_to_report should + # be the size of each group instead. + normalize_snowflake_quoted_identifiers = [ + entry[0] + for entry in internal_frame.get_snowflake_quoted_identifiers_group_by_pandas_labels( + normalize_within_groups + ) + ] + window = Window.partition_by(normalize_snowflake_quoted_identifiers) + else: + window = None internal_frame = query_compiler._modin_frame.project_columns( [COUNT_LABEL], - builtin("ratio_to_report")(col(count_identifier)).over(), + builtin("ratio_to_report")(col(count_identifier)).over(window), ) count_identifier = internal_frame.data_column_snowflake_quoted_identifiers[ 0 diff --git a/src/snowflake/snowpark/modin/plugin/docstrings/groupby.py b/src/snowflake/snowpark/modin/plugin/docstrings/groupby.py index 05d29f64850..0692647b3f7 100644 --- a/src/snowflake/snowpark/modin/plugin/docstrings/groupby.py +++ b/src/snowflake/snowpark/modin/plugin/docstrings/groupby.py @@ -203,7 +203,108 @@ def sem(): pass def value_counts(): - pass + """ + Return a Series or DataFrame containing counts of unique rows. + + Parameters + ---------- + subset : list-like, optional + Columns to use when counting unique combinations. + + normalize : bool, default False + Return proportions rather than frequencies. + + Note that when `normalize=True`, `groupby` is called with `sort=False`, and `value_counts` + is called with `sort=True`, Snowpark pandas will order results differently from + native pandas. 
This occurs because native pandas sorts on frequencies before converting + them to proportions, while Snowpark pandas computes proportions within groups before sorting. + + See issue for details: https://github.com/pandas-dev/pandas/issues/59307 + + sort : bool, default True + Sort by frequencies. + + ascending : bool, default False + Sort in ascending order. + + dropna : bool, default True + Don't include counts of rows that contain NA values. + + Returns + ------- + :class:`~snowflake.snowpark.modin.pandas.Series` or :class:`~snowflake.snowpark.modin.pandas.DataFrame` + Series if the groupby as_index is True, otherwise DataFrame. + + Notes + ----- + - If the groupby as_index is True then the returned Series will have a MultiIndex with one level per input column. + - If the groupby as_index is False then the returned DataFrame will have an additional column with the value_counts. + The column is labelled 'count' or 'proportion', depending on the normalize parameter. + + By default, rows that contain any NA values are omitted from the result. + + By default, the result will be in descending order so that the first element of each group is the most frequently-occurring row. + + Examples + -------- + >>> df = pd.DataFrame({ + ... 'gender': ['male', 'male', 'female', 'male', 'female', 'male'], + ... 'education': ['low', 'medium', 'high', 'low', 'high', 'low'], + ... 'country': ['US', 'FR', 'US', 'FR', 'FR', 'FR'] + ... }) + + >>> df # doctest: +NORMALIZE_WHITESPACE + gender education country + 0 male low US + 1 male medium FR + 2 female high US + 3 male low FR + 4 female high FR + 5 male low FR + + >>> df.groupby('gender').value_counts() # doctest: +NORMALIZE_WHITESPACE + gender education country + female high FR 1 + US 1 + male low FR 2 + US 1 + medium FR 1 + Name: count, dtype: int64 + + >>> df.groupby('gender').value_counts(ascending=True) # doctest: +NORMALIZE_WHITESPACE + gender education country + female high FR 1 + US 1 + male low US 1 + medium FR 1 + low FR 2 + Name: count, dtype: int64 + + >>> df.groupby('gender').value_counts(normalize=True) # doctest: +NORMALIZE_WHITESPACE + gender education country + female high FR 0.50 + US 0.50 + male low FR 0.50 + US 0.25 + medium FR 0.25 + Name: proportion, dtype: float64 + + >>> df.groupby('gender', as_index=False).value_counts() # doctest: +NORMALIZE_WHITESPACE + gender education country count + 0 female high FR 1 + 1 female high US 1 + 2 male low FR 2 + 3 male low US 1 + 4 male medium FR 1 + + >>> df.groupby('gender', as_index=False).value_counts(normalize=True) # doctest: +NORMALIZE_WHITESPACE + gender education country proportion + 0 female high FR 0.50 + 1 female high US 0.50 + 2 male low FR 0.50 + 3 male low US 0.25 + 4 male medium FR 0.25 + """ def mean(): """ @@ -2103,8 +2204,45 @@ def size(): """ pass - def unique(self): + def unique(): pass def apply(): pass + + def value_counts(): + """ + Return a Series or DataFrame containing counts of unique rows. + + Parameters + ---------- + subset : list-like, optional + Columns to use when counting unique combinations. + + normalize : bool, default False + Return proportions rather than frequencies. + + Note that when `normalize=True`, `groupby` is called with `sort=False`, and `value_counts` + is called with `sort=True`, Snowpark pandas will order results differently from + native pandas. This occurs because native pandas sorts on frequencies before converting + them to proportions, while Snowpark pandas computes proportions within groups before sorting. 
+ + See issue for details: https://github.com/pandas-dev/pandas/issues/59307 + + sort : bool, default True + Sort by frequencies. + + ascending : bool, default False + Sort in ascending order. + + bins : int, optional + Rather than count values, group them into half-open bins, a convenience for `pd.cut`, only works with numeric data. + This parameter is not yet supported in Snowpark pandas. + + dropna : bool, default True + Don't include counts of rows that contain NA values. + + Returns + ------- + :class:`~snowflake.snowpark.modin.pandas.Series` + """ diff --git a/tests/integ/modin/groupby/test_value_counts.py b/tests/integ/modin/groupby/test_value_counts.py new file mode 100644 index 00000000000..1f1b2f5c052 --- /dev/null +++ b/tests/integ/modin/groupby/test_value_counts.py @@ -0,0 +1,194 @@ +# +# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. +# + +import modin.pandas as pd +import pandas as native_pd +import pytest + +import snowflake.snowpark.modin.plugin # noqa: F401 +from tests.integ.modin.sql_counter import SqlCounter, sql_count_checker +from tests.integ.modin.utils import ( + assert_snowpark_pandas_equal_to_pandas, + create_test_dfs, + eval_snowpark_pandas_result, +) + +TEST_DATA = [ + { + "by": ["c", "b", "a", "a", "b", "b", "c", "a"], + "value1": ["ee", "aa", "bb", "aa", "bb", "cc", "dd", "aa"], + "value2": [1, 2, 3, 1, 1, 3, 2, 1], + }, + { + "by": ["key 1", None, None, "key 1", "key 2", "key 1"], + "value1": [None, "value", None, None, None, "value"], + "value2": ["value", None, None, None, "value", None], + }, + # Copied from pandas docs + { + "by": ["male", "male", "female", "male", "female", "male"], + "value1": ["low", "medium", "high", "low", "high", "low"], + "value2": ["US", "FR", "US", "FR", "FR", "FR"], + }, +] + + +@pytest.mark.parametrize("test_data", TEST_DATA) +@pytest.mark.parametrize("by", ["by", ["value1", "by"], ["by", "value2"]]) +@pytest.mark.parametrize("groupby_sort", [True, False]) +@pytest.mark.parametrize("sort", [True, False]) +@pytest.mark.parametrize("ascending", [True, False]) +@pytest.mark.parametrize( + "subset", + [None, ["value1"], ["value2"], ["value1", "value2"]], +) +@pytest.mark.parametrize("dropna", [True, False]) +def test_value_counts_basic( + test_data, by, groupby_sort, sort, ascending, subset, dropna +): + by_list = by if isinstance(by, list) else [by] + value_counts_kwargs = { + "sort": sort, + "ascending": ascending, + "subset": subset, + "dropna": dropna, + } + if len(set(by_list) & set(subset or [])): + # If subset and by overlap, check for ValueError + # Unlike pandas, we do not surface label names in the error message + with SqlCounter(query_count=0): + eval_snowpark_pandas_result( + *create_test_dfs(test_data), + lambda df: df.groupby(by=by, sort=groupby_sort).value_counts( + **value_counts_kwargs + ), + expect_exception=True, + expect_exception_type=ValueError, + expect_exception_match="in subset cannot be in the groupby column keys", + assert_exception_equal=False, + ) + return + with SqlCounter(query_count=1): + none_in_by_col = any(None in test_data[col] for col in by_list) + if not dropna and none_in_by_col: + # when dropna is False, pandas gives a different result because it drops all NaN + # keys in the multiindex + # https://github.com/pandas-dev/pandas/issues/56366 + # as a workaround, replace all Nones in the pandas frame with a sentinel value + # since NaNs are sorted last, we want the sentinel to sort to the end as well + VALUE_COUNTS_TEST_SENTINEL = "zzzzzz" + snow_df, native_df = 
create_test_dfs(test_data) + snow_result = snow_df.groupby(by=by, sort=groupby_sort).value_counts( + **value_counts_kwargs + ) + native_df = native_df.fillna(value=VALUE_COUNTS_TEST_SENTINEL) + native_result = native_df.groupby(by=by, sort=groupby_sort).value_counts( + **value_counts_kwargs + ) + native_result.index = native_result.index.map( + lambda x: tuple( + None if i == VALUE_COUNTS_TEST_SENTINEL else i for i in x + ) + ) + assert_snowpark_pandas_equal_to_pandas(snow_result, native_result) + else: + eval_snowpark_pandas_result( + *create_test_dfs(test_data), + lambda df: df.groupby(by=by, sort=groupby_sort).value_counts( + **value_counts_kwargs + ), + ) + + +@pytest.mark.parametrize("test_data", TEST_DATA) +@pytest.mark.parametrize("by", ["by", ["value1", "by"], ["by", "value2"]]) +@pytest.mark.parametrize("groupby_sort", [True, False]) +@pytest.mark.parametrize("sort", [True, False]) +@pytest.mark.parametrize("ascending", [True, False]) +@pytest.mark.parametrize("normalize", [True, False]) +@sql_count_checker(query_count=1) +def test_value_counts_normalize( + test_data, by, groupby_sort, sort, ascending, normalize +): + value_counts_kwargs = { + "sort": sort, + "ascending": ascending, + "normalize": normalize, + } + # When normalize is set, pandas will (counter-intuitively) sort by the pre-normalization + # counts rather than the result proportions. This only matters if groupby_sort is False + # and sort is True. + # We work around this by using check_like=True + # See https://github.com/pandas-dev/pandas/issues/59307#issuecomment-2313767856 + check_like = not groupby_sort and sort and normalize + eval_snowpark_pandas_result( + *create_test_dfs(test_data), + lambda df: df.groupby(by=by, sort=groupby_sort).value_counts( + **value_counts_kwargs + ), + check_like=check_like, + ) + + +@pytest.mark.parametrize("test_data", TEST_DATA) +@pytest.mark.parametrize("by", ["by", ["value1", "by"], ["by", "value2"]]) +@pytest.mark.parametrize("groupby_sort", [True, False]) +@pytest.mark.parametrize("sort", [True, False]) +@pytest.mark.parametrize("as_index", [True, False]) +@sql_count_checker(query_count=1) +def test_value_counts_as_index(test_data, by, groupby_sort, sort, as_index): + eval_snowpark_pandas_result( + *create_test_dfs(test_data), + lambda df: df.groupby(by=by, sort=groupby_sort, as_index=as_index).value_counts( + sort=sort + ), + ) + + +@pytest.mark.parametrize( + "subset, exception_cls", + [ + (["bad_key"], KeyError), # key not in frame + (["by"], ValueError), # subset cannot overlap with grouping columns + (["by", "bad_key"], ValueError), # subset cannot overlap with grouping columns + ], +) +def test_value_counts_bad_subset(subset, exception_cls): + # for KeyError, 1 query always runs to validate the length of the by list + with SqlCounter(query_count=1 if exception_cls is KeyError else 0): + eval_snowpark_pandas_result( + *create_test_dfs(TEST_DATA[0]), + lambda x: x.groupby(by=["by"]).value_counts(subset=subset), + expect_exception=True, + expect_exception_type=exception_cls, + assert_exception_equal=False, + ) + + +# An additional query is needed to validate the length of the by list +# A JOIN is needed to set the index to the by list +@sql_count_checker(query_count=2, join_count=1) +def test_value_counts_series(): + by = ["a", "a", "b", "b", "a", "c"] + native_ser = native_pd.Series( + [0, 0, None, 1, None, 3], + ) + snow_ser = pd.Series(native_ser) + eval_snowpark_pandas_result( + snow_ser, native_ser, lambda ser: ser.groupby(by=by).value_counts() + ) + + +# 1 query always 
runs to validate the length of the by list +@sql_count_checker(query_count=1) +def test_value_counts_bins_unimplemented(): + by = ["a", "a", "b", "b", "a", "c"] + native_ser = native_pd.Series( + [0, 0, None, 1, None, 3], + ) + snow_ser = pd.Series(native_ser) + with pytest.raises(NotImplementedError): + eval_snowpark_pandas_result( + snow_ser, native_ser, lambda ser: ser.groupby(by=by).value_counts(bins=3) + ) diff --git a/tests/unit/modin/test_groupby_unsupported.py b/tests/unit/modin/test_groupby_unsupported.py index efc48724055..6bb27db446f 100644 --- a/tests/unit/modin/test_groupby_unsupported.py +++ b/tests/unit/modin/test_groupby_unsupported.py @@ -39,7 +39,6 @@ (lambda se: se.groupby("A").skew(), "skew"), (lambda se: se.groupby("A").take(2), "take"), (lambda se: se.groupby("A").expanding(), "expanding"), - (lambda se: se.groupby("A").value_counts(), "value_counts"), (lambda se: se.groupby("A").hist(), "hist"), (lambda se: se.groupby("A").plot(), "plot"), (lambda se: se.groupby("A").boxplot("test_group"), "boxplot"), @@ -83,7 +82,6 @@ def test_series_groupby_unsupported_methods_raises( (lambda df: df.groupby("A").skew(), "skew"), (lambda df: df.groupby("A").take(2), "take"), (lambda df: df.groupby("A").expanding(), "expanding"), - (lambda df: df.groupby("A").value_counts(), "value_counts"), (lambda df: df.groupby("A").hist(), "hist"), (lambda df: df.groupby("A").plot(), "plot"), (lambda df: df.groupby("A").boxplot("test_group"), "boxplot"),
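
As an illustrative aside (not part of either patch above): the nanosecond arithmetic that `timedelta_property` maps onto Snowflake functions can be sanity-checked locally against native pandas. The constants mirror the ones added to `snowflake_query_compiler.py`; Snowflake's `TRUNC` truncates toward zero, which coincides with Python's integer floor division for the non-negative value used in this sketch.

```python
import pandas as pd  # native pandas, used only to cross-check the arithmetic

SECONDS_PER_DAY = 86400
NANOSECONDS_PER_SECOND = 10**9
NANOSECONDS_PER_MICROSECOND = 10**3
MICROSECONDS_PER_SECOND = 10**6
NANOSECONDS_PER_DAY = SECONDS_PER_DAY * NANOSECONDS_PER_SECOND

td = pd.Timedelta(days=1, hours=2, milliseconds=3, microseconds=4, nanoseconds=5)
total_ns = td.value  # total nanoseconds as a plain int

# Same decomposition as property_to_func_map, expressed in Python.
days = total_ns // NANOSECONDS_PER_DAY
seconds = (total_ns // NANOSECONDS_PER_SECOND) % SECONDS_PER_DAY
microseconds = (total_ns // NANOSECONDS_PER_MICROSECOND) % MICROSECONDS_PER_SECOND
nanoseconds = total_ns % NANOSECONDS_PER_MICROSECOND

assert (days, seconds, microseconds, nanoseconds) == (
    td.components.days,
    # pandas splits hours/minutes/seconds; fold them back into whole seconds
    td.components.hours * 3600 + td.components.minutes * 60 + td.components.seconds,
    td.components.milliseconds * 1000 + td.components.microseconds,
    td.components.nanoseconds,
)
```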
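
A second illustrative sketch (again not part of the patches): `groupby_value_counts` normalizes within each group by passing `normalize_within_groups` to `_value_counts_groupby`, which becomes `ratio_to_report(...)` over a window partitioned by the grouping columns. The native-pandas equivalent below, dividing each combination count by the size of its owning group, reproduces `value_counts(normalize=True)` on the example frame from the docstrings; the variable names are made up for the demo.

```python
import pandas as pd  # native pandas, used only to illustrate the semantics

df = pd.DataFrame(
    {
        "gender": ["male", "male", "female", "male", "female", "male"],
        "education": ["low", "medium", "high", "low", "high", "low"],
        "country": ["US", "FR", "US", "FR", "FR", "FR"],
    }
)

# Raw per-combination counts within each 'gender' group.
counts = df.groupby("gender").value_counts()

# Dividing by the size of the owning group (the window partition) rather than by
# the total row count is what "normalize within groups" means here.
group_sizes = df.groupby("gender").size()
proportions = counts / counts.index.get_level_values("gender").map(group_sizes)

expected = df.groupby("gender").value_counts(normalize=True)
pd.testing.assert_series_equal(proportions, expected, check_names=False)
```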