Merge branch 'main' into joshi-SNOW-1489371-groupby-value_count
sfc-gh-joshi authored Jul 26, 2024
2 parents ae2690c + c88209f commit 52086f6
Showing 10 changed files with 375 additions and 38 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@

#### Improvements
- Added support for server-side string size limitations.
- Added support for passing `INFER_SCHEMA` options to `DataFrameReader` via `INFER_SCHEMA_OPTIONS`.

#### Bug Fixes
- Fixed a bug where the SQL generated for selecting the `*` column had an incorrect subquery.
@@ -30,6 +31,7 @@
- Added support for `Series.dt.microsecond` and `Series.dt.nanosecond`.
- Added support for `Index.is_unique` and `Index.has_duplicates`.
- Added support for `Index.equals`.
- Added support for `Index.value_counts`.
- Added support for `DataFrameGroupBy.value_counts` and `SeriesGroupBy.value_counts`.

#### Improvements
@@ -40,6 +42,7 @@
- Made passing an unsupported aggregation function to `pivot_table` raise `NotImplementedError` instead of `KeyError`.
- Removed axis labels and callable names from error messages and telemetry about unsupported aggregations.
- Fixed AssertionError in `Series.drop_duplicates` and `DataFrame.drop_duplicates` when called after `sort_values`.
- Fixed a bug in `Index.to_frame` where the result frame's column name could be wrong when `name` is unspecified.


## 1.20.0 (2024-07-17)
5 changes: 4 additions & 1 deletion docs/source/modin/supported/index_supported.rst
@@ -5,6 +5,9 @@ The following table is structured as follows: The first column contains the method name.
The second column is a flag for whether or not there is an implementation in Snowpark for
the method in the left column.

Currently, there is no lazy MultiIndex support: the lazy Index object always represents a single-level index.
However, existing Snowpark pandas DataFrame and Series APIs may still support native pandas MultiIndex objects.

.. note::
    ``Y`` stands for yes, i.e., supports distributed implementation, ``N`` stands for no and API simply errors out,
    ``P`` stands for partial (meaning some parameters may not be supported yet), and ``D`` stands for defaults to single
@@ -124,7 +127,7 @@ Methods
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``nunique`` | Y | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``value_counts`` | N | | |
| ``value_counts`` | P | ``bins`` | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``set_names`` | N | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
9 changes: 8 additions & 1 deletion src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py
@@ -895,7 +895,9 @@ def create_file_format_statement(
)


def infer_schema_statement(path: str, file_format_name: str) -> str:
def infer_schema_statement(
    path: str, file_format_name: str, options: Optional[Dict[str, str]] = None
) -> str:
    return (
        SELECT
        + STAR
@@ -913,6 +915,11 @@ def infer_schema_statement(path: str, file_format_name: str) -> str:
        + SINGLE_QUOTE
        + file_format_name
        + SINGLE_QUOTE
        + (
            ", " + ", ".join(f"{k} => {v}" for k, v in options.items())
            if options
            else ""
        )
        + RIGHT_PARENTHESIS
        + RIGHT_PARENTHESIS
    )
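For orientation, with an options dict supplied the builder above emits SQL roughly like the following (a sketch: the stage path, format name, and option values are hypothetical; `MAX_RECORDS_PER_FILE` and `IGNORE_CASE` are standard Snowflake `INFER_SCHEMA` parameters):

```python
# Hypothetical inputs; option values are spliced into the SQL verbatim,
# so they must already be valid SQL tokens (e.g. "10", "TRUE").
options = {"MAX_RECORDS_PER_FILE": "10", "IGNORE_CASE": "TRUE"}
sql = infer_schema_statement("@mystage/data.csv", "my_csv_format", options)
# Roughly:
# SELECT * FROM TABLE(INFER_SCHEMA(LOCATION => '@mystage/data.csv',
#     FILE_FORMAT => 'my_csv_format', MAX_RECORDS_PER_FILE => 10, IGNORE_CASE => TRUE))
```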
1 change: 1 addition & 0 deletions src/snowflake/snowpark/_internal/utils.py
@@ -169,6 +169,7 @@
"FILES",
# The following are not copy into SQL command options but client side options.
"INFER_SCHEMA",
"INFER_SCHEMA_OPTIONS",
"FORMAT_TYPE_OPTIONS",
"TARGET_COLUMNS",
"TRANSFORMATIONS",
5 changes: 4 additions & 1 deletion src/snowflake/snowpark/dataframe_reader.py
@@ -596,7 +596,10 @@ def _infer_schema_for_file_format(
    drop_tmp_file_format_if_exists_query: Optional[str] = None
    use_temp_file_format = "FORMAT_NAME" not in self._cur_options
    file_format_name = self._cur_options.get("FORMAT_NAME", temp_file_format_name)
    infer_schema_query = infer_schema_statement(path, file_format_name)
    infer_schema_options = self._cur_options.get("INFER_SCHEMA_OPTIONS", None)
    infer_schema_query = infer_schema_statement(
        path, file_format_name, infer_schema_options
    )
    try:
        if use_temp_file_format:
            self._session._conn.run_query(
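End to end, the new option flows from the reader into the generated `INFER_SCHEMA` call. A minimal usage sketch, assuming an existing Snowpark `session` and a staged file (the path and option value here are illustrative):

```python
df = (
    session.read.option("INFER_SCHEMA", True)
    .option("INFER_SCHEMA_OPTIONS", {"MAX_RECORDS_PER_FILE": "10"})
    .csv("@mystage/data.csv")
)
```

Because each value is interpolated directly into the statement (`{k} => {v}`), values should be passed as ready-to-use SQL literals.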
@@ -10,7 +10,7 @@
import uuid
from collections.abc import Hashable, Iterable, Mapping, Sequence
from datetime import timedelta, tzinfo
from typing import Any, Callable, List, Literal, Optional, Union, get_args
from typing import Any, Callable, List, Literal, Optional, Tuple, Union, get_args

import numpy as np
import numpy.typing as npt
@@ -10744,6 +10744,45 @@ def resample(

    return SnowflakeQueryCompiler(frame)

def value_counts_index(
    self,
    normalize: bool = False,
    sort: bool = True,
    ascending: bool = False,
    bins: Optional[int] = None,
    dropna: bool = True,
) -> "SnowflakeQueryCompiler":
    """
    Counts the frequency or number of unique values of an Index SnowflakeQueryCompiler.

    The resulting object will be in descending order so that the
    first element is the most frequently occurring element.
    Excludes NA values by default.

    Args:
        normalize : bool, default False
            If True then the object returned will contain the relative
            frequencies of the unique values.
        sort : bool, default True
            Sort by frequencies when True. Preserve the order of the data when False.
        ascending : bool, default False
            Sort in ascending order.
        bins : int, optional
            Rather than count values, group them into half-open bins,
            a convenience for ``pd.cut``; only works with numeric data.
            This argument is not supported yet.
        dropna : bool, default True
            Don't include counts of NaN.
    """
    if bins is not None:
        raise ErrorMessage.not_implemented("bins argument is not yet supported")

    assert (
        not self.is_multiindex()
    ), "value_counts_index only supports single index objects"
    by = self._modin_frame.index_column_pandas_labels
    return self._value_counts_groupby(by, normalize, sort, ascending, dropna)

def value_counts(
    self,
    subset: Optional[Sequence[Hashable]] = None,
@@ -10756,10 +10795,10 @@ def value_counts(
    normalize_within_groups: Optional[list[str]] = None,
) -> "SnowflakeQueryCompiler":
    """
    Counts the number of unique values (frequency) of SnowflakeQueryCompiler.
    Counts the frequency or number of unique values of SnowflakeQueryCompiler.

    The resulting object will be in descending order so that the
    first element is the most frequently-occurring element.
    first element is the most frequently occurring element.
    Excludes NA values by default.

    Args:
@@ -10794,6 +10833,37 @@
    else:
        by = self._modin_frame.data_column_pandas_labels

    return self._value_counts_groupby(by, normalize, sort, ascending, dropna)

def _value_counts_groupby(
    self,
    by: Union[List[Hashable], Tuple[Hashable, ...]],
    normalize: bool,
    sort: bool,
    ascending: bool,
    dropna: bool,
) -> "SnowflakeQueryCompiler":
    """
    Helper method to obtain the frequency or number of unique values
    within a group.

    The resulting object will be in descending order so that the
    first element is the most frequently occurring element.
    Excludes NA values by default.

    Args:
        by : list
            Columns to perform value_counts on.
        normalize : bool
            If True then the object returned will contain the relative
            frequencies of the unique values.
        sort : bool
            Sort by frequencies when True. Preserve the order of the data when False.
        ascending : bool
            Sort in ascending order.
        dropna : bool
            Don't include counts of NaN.
    """
    # validate whether by is valid (e.g., contains duplicates or non-existing labels)
    self.validate_groupby(by=by, axis=0, level=None)
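Semantically, the shared `_value_counts_groupby` helper mirrors the following native pandas sequence (a simplified reference sketch of the intended semantics, not the distributed implementation; pandas' tie-breaking by order of appearance is glossed over):

```python
import pandas as pd

def value_counts_reference(
    df: pd.DataFrame, by: list, normalize: bool, sort: bool, ascending: bool, dropna: bool
) -> pd.Series:
    # Count rows per group of the target labels.
    counts = df.groupby(by, dropna=dropna).size()
    if normalize:
        # Relative frequencies instead of raw counts.
        counts = counts / counts.sum()
    # Sort by frequency when requested; otherwise keep the groupby order.
    return counts.sort_values(ascending=ascending) if sort else counts
```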
77 changes: 56 additions & 21 deletions src/snowflake/snowpark/modin/plugin/extensions/index.py
@@ -27,6 +27,7 @@

import numpy as np
import pandas as native_pd
from pandas._libs import lib
from pandas._typing import ArrayLike, DtypeObj, NaPosition
from pandas.core.arrays import ExtensionArray
from pandas.core.dtypes.base import ExtensionDtype
@@ -1450,21 +1451,20 @@ def value_counts(
    normalize: bool = False,
    sort: bool = True,
    ascending: bool = False,
    bins: Any = None,
    bins: int | None = None,
    dropna: bool = True,
) -> native_pd.Series:
    # how to change the above return type to modin pandas series?
) -> Series:
"""
Return a Series containing counts of unique values.
The resulting object will be in descending order so that the
first element is the most frequently-occurring element.
first element is the most frequently occurring element.
Excludes NA values by default.
Parameters
----------
normalize : bool, default False
If True then the object returned will contain the relative
If True, then the object returned will contain the relative
frequencies of the unique values.
sort : bool, default True
Sort by frequencies when True. Preserve the order of the data when False.
@@ -1473,13 +1473,14 @@
    bins : int, optional
        Rather than count values, group them into half-open bins,
        a convenience for ``pd.cut``, only works with numeric data.
        `bins` is not yet supported.
    dropna : bool, default True
        Don't include counts of NaN.

    Returns
    -------
    Series
        A series containing counts of unique values.
        A Series containing counts of unique values.

    See Also
    --------
@@ -1515,14 +1516,15 @@
    apparitions of values, divide the index in the specified
    number of half-open bins.
    """
    # TODO: SNOW-1458133 implement value_counts
    WarningMessage.index_to_pandas_warning("value_counts")
    return self.to_pandas().value_counts(
        normalize=normalize,
        sort=sort,
        ascending=ascending,
        bins=bins,
        dropna=dropna,
    return Series(
        query_compiler=self._query_compiler.value_counts_index(
            normalize=normalize,
            sort=sort,
            ascending=ascending,
            bins=bins,
            dropna=dropna,
        ).set_index_names([self.name]),
        name="proportion" if normalize else "count",
    )
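With this change, `Index.value_counts` stays lazy instead of falling back to `to_pandas()`. A usage sketch (illustrative data; result naming follows the `count`/`proportion` rule above):

```python
import modin.pandas as pd
import snowflake.snowpark.modin.plugin  # noqa: F401

idx = pd.Index([3, 1, 2, 3, 4, 3])
idx.value_counts()                # counts, Series named "count", most frequent first
idx.value_counts(normalize=True)  # relative frequencies, Series named "proportion"
```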

@is_lazy_check
@@ -1597,7 +1599,9 @@ def to_series(
    return ser

@is_lazy_check
def to_frame(self, index: bool = True, name: Hashable | None = None) -> DataFrame:
def to_frame(
    self, index: bool = True, name: Hashable | None = lib.no_default
) -> DataFrame:
    """
    Create a DataFrame with a column containing the Index.
@@ -1619,6 +1623,32 @@ def to_frame(self, index: bool = True, name: Hashable | None = None) -> DataFrame:
    --------
    Index.to_series : Convert an Index to a Series.
    Series.to_frame : Convert Series to DataFrame.

    Examples
    --------
    >>> idx = pd.Index(['Ant', 'Bear', 'Cow'], name='animal')
    >>> idx.to_frame()
           animal
    animal
    Ant       Ant
    Bear     Bear
    Cow       Cow

    By default, the original Index is reused. To enforce a new Index:

    >>> idx.to_frame(index=False)
        animal
    0      Ant
    1     Bear
    2      Cow

    To override the name of the resulting column, specify `name`:

    >>> idx.to_frame(index=False, name='zoo')
        zoo
    0   Ant
    1  Bear
    2   Cow
    """
    # Do a reset index to convert the index column to a data column,
    # the index column becomes the pandas default index of row position
@@ -1633,17 +1663,22 @@ def to_frame(self, index: bool = True, name: Hashable | None = None) -> DataFrame:
    #        0 100
    #        1 200
    #        2 300
    new_qc = self._query_compiler.reset_index()
    # if index is true, we want self to be in the index and data columns of the df,
    # so set the index as the data column and set the name of the index
    if index:
        new_qc = self._query_compiler.reset_index()
        new_qc = (
            new_qc.set_index([new_qc.columns[0]], drop=False)
            .set_columns([name])
            .set_index_names([self.name])
        new_qc = new_qc.set_index([new_qc.columns[0]], drop=False).set_index_names(
            [self.name]
        )
    # If `name` is specified, use it as new column name; otherwise, set new column name to the original index name.
    # Note there is one exception case: when the original index name is None, the new column name should be 0.
    if name != lib.no_default:
        new_col_name = name
    else:
        new_qc = self._query_compiler.reset_index(names=[name])
        new_col_name = self.name
        if new_col_name is None:
            new_col_name = 0
    new_qc = new_qc.set_columns([new_col_name])

    return DataFrame(query_compiler=new_qc)

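Taken together, the `lib.no_default` sentinel and the name-resolution block give `to_frame` the column-naming behavior sketched below (inferred from the docstring examples and the code comments, not copied from the test suite):

```python
idx = pd.Index(["Ant", "Bear", "Cow"], name="animal")
list(idx.to_frame().columns)            # ["animal"] -- name omitted, index name reused
list(idx.to_frame(name="zoo").columns)  # ["zoo"]    -- explicit name wins
unnamed = pd.Index([1, 2, 3])
list(unnamed.to_frame().columns)        # [0]        -- unnamed index falls back to 0
```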
16 changes: 5 additions & 11 deletions tests/integ/modin/index/test_index_methods.py
@@ -5,6 +5,7 @@
import modin.pandas as pd
import pytest
from numpy.testing import assert_equal
from pandas._libs import lib

import snowflake.snowpark.modin.plugin # noqa: F401
from tests.integ.modin.index.conftest import (
@@ -66,13 +67,6 @@ def test_df_index_equals(native_df):
    assert snow_df.index.equals(native_df.index)


@sql_count_checker(query_count=1)
@pytest.mark.parametrize("native_index", NATIVE_INDEX_TEST_DATA)
def test_index_value_counts(native_index):
    snow_index = pd.Index(native_index)
    assert_series_equal(snow_index.value_counts(), native_index.value_counts())


@sql_count_checker(query_count=8)
def test_index_union():
    idx1 = pd.Index([1, 2, 3, 4])
@@ -239,7 +233,7 @@ def test_df_index_columns_to_list(native_df):
    assert_equal(native_df.columns.to_list(), snow_df.columns.to_list())


@pytest.mark.parametrize("name", [None, "name", True, 1])
@pytest.mark.parametrize("name", [None, "name", True, 1, lib.no_default])
@pytest.mark.parametrize("generate_extra_index", [True, False])
@pytest.mark.parametrize("native_index", NATIVE_INDEX_TEST_DATA)
def test_index_to_series(native_index, generate_extra_index, name):
@@ -256,7 +250,7 @@ def test_index_to_series(native_index, generate_extra_index, name):
    )


@pytest.mark.parametrize("name", [None, "name", True, 1])
@pytest.mark.parametrize("name", [None, "name", True, 1, lib.no_default])
@pytest.mark.parametrize("generate_extra_index", [True, False])
@pytest.mark.parametrize("native_df", TEST_DFS)
def test_df_index_columns_to_series(native_df, generate_extra_index, name):
@@ -291,7 +285,7 @@ def test_df_index_columns_to_series(native_df, generate_extra_index, name):


@sql_count_checker(query_count=1)
@pytest.mark.parametrize("name", [None, "name", True, 1])
@pytest.mark.parametrize("name", [None, "name", True, 1, lib.no_default])
@pytest.mark.parametrize("index", [True, False])
@pytest.mark.parametrize("native_index", NATIVE_INDEX_TEST_DATA)
def test_index_to_frame(native_index, name, index):
@@ -305,7 +299,7 @@ def test_index_to_frame(native_index, name, index):


@sql_count_checker(query_count=2)
@pytest.mark.parametrize("name", [None, "name", True, 1])
@pytest.mark.parametrize("name", [None, "name", True, 1, lib.no_default])
@pytest.mark.parametrize("index", [True, False])
@pytest.mark.parametrize("native_df", TEST_DFS)
def test_df_index_columns_to_frame(native_df, index, name):
