diff --git a/CHANGELOG.md b/CHANGELOG.md
index 87bb88b77d5..8437923ba2f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@
 #### Improvements
 - Added support server side string size limitations.
+- Added support for passing `INFER_SCHEMA` options to `DataFrameReader` via `INFER_SCHEMA_OPTIONS`.
 
 #### Bug Fixes
 - Fixed a bug where SQL generated for selecting `*` column has an incorrect subquery.
@@ -30,6 +31,7 @@
 - Added support for `Series.dt.microsecond` and `Series.dt.nanosecond`.
 - Added support for `Index.is_unique` and `Index.has_duplicates`.
 - Added support for `Index.equals`.
+- Added support for `Index.value_counts`.
 - Added support for `DataFrameGroupBy.value_counts` and `SeriesGroupBy.value_counts`.
 
 #### Improvements
@@ -40,6 +42,7 @@
 - Made passing an unsupported aggregation function to `pivot_table` raise `NotImplementedError` instead of `KeyError`.
 - Removed axis labels and callable names from error messages and telemetry about unsupported aggregations.
 - Fixed AssertionError in `Series.drop_duplicates` and `DataFrame.drop_duplicates` when called after `sort_values`.
+- Fixed a bug in `Index.to_frame` where the result frame's column name could be incorrect when `name` is unspecified.
 
 ## 1.20.0 (2024-07-17)
 
diff --git a/docs/source/modin/supported/index_supported.rst b/docs/source/modin/supported/index_supported.rst
index 86cd4184ce6..eb498ed97a3 100644
--- a/docs/source/modin/supported/index_supported.rst
+++ b/docs/source/modin/supported/index_supported.rst
@@ -5,6 +5,9 @@ The following table is structured as follows: The first column contains the meth
 The second column is a flag for whether or not there is an implementation in Snowpark for the method
 in the left column.
 
+Currently, there is no lazy MultiIndex support; the lazy Index object always represents a single-level Index.
+However, existing Snowpark pandas DataFrame and Series APIs may support native pandas MultiIndex objects.
+
 .. note:: ``Y`` stands for yes, i.e., supports distributed implementation, ``N`` stands for no and API simply errors out,
    ``P`` stands for partial (meaning some parameters may not be supported yet), and ``D`` stands for defaults to single
@@ -124,7 +127,7 @@ Methods
 +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
 | ``nunique``                 | Y                               |                                  |                                                    |
 +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
-| ``value_counts``            | N                               |                                  |                                                    |
+| ``value_counts``            | P                               | ``bins``                         |                                                    |
 +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
 | ``set_names``               | N                               |                                  |                                                    |
 +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
diff --git a/src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py b/src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py
index 4a90de59ab9..363221b97c7 100644
--- a/src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py
+++ b/src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py
@@ -895,7 +895,9 @@ def create_file_format_statement(
     )
 
 
-def infer_schema_statement(path: str, file_format_name: str) -> str:
+def infer_schema_statement(
+    path: str, file_format_name: str, options: Optional[Dict[str, str]] = None
+) -> str:
     return (
         SELECT
         + STAR
@@ -913,6 +915,11 @@ def infer_schema_statement(path: str, file_format_name: str) -> str:
         + SINGLE_QUOTE
         + file_format_name
         + SINGLE_QUOTE
+        + (
+            ", " + ", ".join(f"{k} => {v}" for k, v in options.items())
+            if options
+            else ""
+        )
         + RIGHT_PARENTHESIS
         + RIGHT_PARENTHESIS
     )
diff --git a/src/snowflake/snowpark/_internal/utils.py b/src/snowflake/snowpark/_internal/utils.py
index 9ddf53da927..184b0a81446 100644
--- a/src/snowflake/snowpark/_internal/utils.py
+++ b/src/snowflake/snowpark/_internal/utils.py
@@ -169,6 +169,7 @@
     "FILES",
     # The following are not copy into SQL command options but client side options.
"INFER_SCHEMA", + "INFER_SCHEMA_OPTIONS", "FORMAT_TYPE_OPTIONS", "TARGET_COLUMNS", "TRANSFORMATIONS", diff --git a/src/snowflake/snowpark/dataframe_reader.py b/src/snowflake/snowpark/dataframe_reader.py index 5cf4b0c388e..a3203e278fc 100644 --- a/src/snowflake/snowpark/dataframe_reader.py +++ b/src/snowflake/snowpark/dataframe_reader.py @@ -596,7 +596,10 @@ def _infer_schema_for_file_format( drop_tmp_file_format_if_exists_query: Optional[str] = None use_temp_file_format = "FORMAT_NAME" not in self._cur_options file_format_name = self._cur_options.get("FORMAT_NAME", temp_file_format_name) - infer_schema_query = infer_schema_statement(path, file_format_name) + infer_schema_options = self._cur_options.get("INFER_SCHEMA_OPTIONS", None) + infer_schema_query = infer_schema_statement( + path, file_format_name, infer_schema_options + ) try: if use_temp_file_format: self._session._conn.run_query( diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index ac5e2a2e984..6679415aace 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -10,7 +10,7 @@ import uuid from collections.abc import Hashable, Iterable, Mapping, Sequence from datetime import timedelta, tzinfo -from typing import Any, Callable, List, Literal, Optional, Union, get_args +from typing import Any, Callable, List, Literal, Optional, Tuple, Union, get_args import numpy as np import numpy.typing as npt @@ -10744,6 +10744,45 @@ def resample( return SnowflakeQueryCompiler(frame) + def value_counts_index( + self, + normalize: bool = False, + sort: bool = True, + ascending: bool = False, + bins: Optional[int] = None, + dropna: bool = True, + ) -> "SnowflakeQueryCompiler": + """ + Counts the frequency or number of unique values of Index SnowflakeQueryCompiler. + + The resulting object will be in descending order so that the + first element is the most frequently occurring element. + Excludes NA values by default. + + Args: + normalize : bool, default False + If True then the object returned will contain the relative + frequencies of the unique values. + sort : bool, default True + Sort by frequencies when True. Preserve the order of the data when False. + ascending : bool, default False + Sort in ascending order. + bins : int, optional + Rather than count values, group them into half-open bins, + a convenience for ``pd.cut``, only works with numeric data. + This argument is not supported yet. + dropna : bool, default True + Don't include counts of NaN. + """ + if bins is not None: + raise ErrorMessage.not_implemented("bins argument is not yet supported") + + assert ( + not self.is_multiindex() + ), "value_counts_index only supports single index objects" + by = self._modin_frame.index_column_pandas_labels + return self._value_counts_groupby(by, normalize, sort, ascending, dropna) + def value_counts( self, subset: Optional[Sequence[Hashable]] = None, @@ -10756,10 +10795,10 @@ def value_counts( normalize_within_groups: Optional[list[str]] = None, ) -> "SnowflakeQueryCompiler": """ - Counts the number of unique values (frequency) of SnowflakeQueryCompiler. + Counts the frequency or number of unique values of SnowflakeQueryCompiler. The resulting object will be in descending order so that the - first element is the most frequently-occurring element. + first element is the most frequently occurring element. 
         Excludes NA values by default.
 
         Args:
@@ -10794,6 +10833,37 @@ def value_counts(
         else:
             by = self._modin_frame.data_column_pandas_labels
 
+        return self._value_counts_groupby(by, normalize, sort, ascending, dropna)
+
+    def _value_counts_groupby(
+        self,
+        by: Union[List[Hashable], Tuple[Hashable, ...]],
+        normalize: bool,
+        sort: bool,
+        ascending: bool,
+        dropna: bool,
+    ) -> "SnowflakeQueryCompiler":
+        """
+        Helper method to obtain the frequency or number of unique values
+        within a group.
+
+        The resulting object will be in descending order so that the
+        first element is the most frequently occurring element.
+        Excludes NA values by default.
+
+        Args:
+            by : list
+                Columns to perform value_counts on.
+            normalize : bool
+                If True then the object returned will contain the relative
+                frequencies of the unique values.
+            sort : bool
+                Sort by frequencies when True. Preserve the order of the data when False.
+            ascending : bool
+                Sort in ascending order.
+            dropna : bool
+                Don't include counts of NaN.
+        """
         # validate whether by is valid (e.g., contains duplicates or non-existing labels)
         self.validate_groupby(by=by, axis=0, level=None)
diff --git a/src/snowflake/snowpark/modin/plugin/extensions/index.py b/src/snowflake/snowpark/modin/plugin/extensions/index.py
index 97d16ed5a9a..65da5c6c1f1 100644
--- a/src/snowflake/snowpark/modin/plugin/extensions/index.py
+++ b/src/snowflake/snowpark/modin/plugin/extensions/index.py
@@ -27,6 +27,7 @@
 import numpy as np
 import pandas as native_pd
+from pandas._libs import lib
 from pandas._typing import ArrayLike, DtypeObj, NaPosition
 from pandas.core.arrays import ExtensionArray
 from pandas.core.dtypes.base import ExtensionDtype
@@ -1450,21 +1451,20 @@ def value_counts(
         normalize: bool = False,
         sort: bool = True,
         ascending: bool = False,
-        bins: Any = None,
+        bins: int | None = None,
         dropna: bool = True,
-    ) -> native_pd.Series:
-        # how to change the above return type to modin pandas series?
+    ) -> Series:
         """
         Return a Series containing counts of unique values.
 
         The resulting object will be in descending order so that the
-        first element is the most frequently-occurring element.
+        first element is the most frequently occurring element.
         Excludes NA values by default.
 
         Parameters
         ----------
         normalize : bool, default False
-            If True then the object returned will contain the relative
+            If True, then the object returned will contain the relative
             frequencies of the unique values.
         sort : bool, default True
             Sort by frequencies when True. Preserve the order of the data when False.
@@ -1473,13 +1473,14 @@ def value_counts(
         bins : int, optional
             Rather than count values, group them into half-open bins,
             a convenience for ``pd.cut``, only works with numeric data.
+            `bins` is not yet supported.
         dropna : bool, default True
             Don't include counts of NaN.
 
         Returns
         -------
         Series
-            A series containing counts of unique values.
+            A Series containing counts of unique values.
 
         See Also
         --------
@@ -1515,14 +1516,15 @@ def value_counts(
         apparitions of values, divide the index in the specified number
         of half-open bins.
""" - # TODO: SNOW-1458133 implement value_counts - WarningMessage.index_to_pandas_warning("value_counts") - return self.to_pandas().value_counts( - normalize=normalize, - sort=sort, - ascending=ascending, - bins=bins, - dropna=dropna, + return Series( + query_compiler=self._query_compiler.value_counts_index( + normalize=normalize, + sort=sort, + ascending=ascending, + bins=bins, + dropna=dropna, + ).set_index_names([self.name]), + name="proportion" if normalize else "count", ) @is_lazy_check @@ -1597,7 +1599,9 @@ def to_series( return ser @is_lazy_check - def to_frame(self, index: bool = True, name: Hashable | None = None) -> DataFrame: + def to_frame( + self, index: bool = True, name: Hashable | None = lib.no_default + ) -> DataFrame: """ Create a DataFrame with a column containing the Index. @@ -1619,6 +1623,32 @@ def to_frame(self, index: bool = True, name: Hashable | None = None) -> DataFram -------- Index.to_series : Convert an Index to a Series. Series.to_frame : Convert Series to DataFrame. + + Examples + -------- + >>> idx = pd.Index(['Ant', 'Bear', 'Cow'], name='animal') + >>> idx.to_frame() + animal + animal + Ant Ant + Bear Bear + Cow Cow + + By default, the original Index is reused. To enforce a new Index: + + >>> idx.to_frame(index=False) + animal + 0 Ant + 1 Bear + 2 Cow + + To override the name of the resulting column, specify `name`: + + >>> idx.to_frame(index=False, name='zoo') + zoo + 0 Ant + 1 Bear + 2 Cow """ # Do a reset index to convert the index column to a data column, # the index column becomes the pandas default index of row position @@ -1633,17 +1663,22 @@ def to_frame(self, index: bool = True, name: Hashable | None = None) -> DataFram # 0 100 # 1 200 # 2 300 + new_qc = self._query_compiler.reset_index() # if index is true, we want self to be in the index and data columns of the df, # so set the index as the data column and set the name of the index if index: - new_qc = self._query_compiler.reset_index() - new_qc = ( - new_qc.set_index([new_qc.columns[0]], drop=False) - .set_columns([name]) - .set_index_names([self.name]) + new_qc = new_qc.set_index([new_qc.columns[0]], drop=False).set_index_names( + [self.name] ) + # If `name` is specified, use it as new column name; otherwise, set new column name to the original index name. + # Note there is one exception case: when the original index name is None, the new column name should be 0. 
+            if name != lib.no_default:
+                new_col_name = name
         else:
-            new_qc = self._query_compiler.reset_index(names=[name])
+            new_col_name = self.name
+            if new_col_name is None:
+                new_col_name = 0
+        new_qc = new_qc.set_columns([new_col_name])
         return DataFrame(query_compiler=new_qc)
diff --git a/tests/integ/modin/index/test_index_methods.py b/tests/integ/modin/index/test_index_methods.py
index e350e83ac5d..417089f13df 100644
--- a/tests/integ/modin/index/test_index_methods.py
+++ b/tests/integ/modin/index/test_index_methods.py
@@ -5,6 +5,7 @@
 import modin.pandas as pd
 import pytest
 from numpy.testing import assert_equal
+from pandas._libs import lib
 
 import snowflake.snowpark.modin.plugin  # noqa: F401
 from tests.integ.modin.index.conftest import (
@@ -66,13 +67,6 @@ def test_df_index_equals(native_df):
     assert snow_df.index.equals(native_df.index)
 
 
-@sql_count_checker(query_count=1)
-@pytest.mark.parametrize("native_index", NATIVE_INDEX_TEST_DATA)
-def test_index_value_counts(native_index):
-    snow_index = pd.Index(native_index)
-    assert_series_equal(snow_index.value_counts(), native_index.value_counts())
-
-
 @sql_count_checker(query_count=8)
 def test_index_union():
     idx1 = pd.Index([1, 2, 3, 4])
@@ -239,7 +233,7 @@ def test_df_index_columns_to_list(native_df):
     assert_equal(native_df.columns.to_list(), snow_df.columns.to_list())
 
 
-@pytest.mark.parametrize("name", [None, "name", True, 1])
+@pytest.mark.parametrize("name", [None, "name", True, 1, lib.no_default])
 @pytest.mark.parametrize("generate_extra_index", [True, False])
 @pytest.mark.parametrize("native_index", NATIVE_INDEX_TEST_DATA)
 def test_index_to_series(native_index, generate_extra_index, name):
@@ -256,7 +250,7 @@ def test_index_to_series(native_index, generate_extra_index, name):
     )
 
 
-@pytest.mark.parametrize("name", [None, "name", True, 1])
+@pytest.mark.parametrize("name", [None, "name", True, 1, lib.no_default])
 @pytest.mark.parametrize("generate_extra_index", [True, False])
 @pytest.mark.parametrize("native_df", TEST_DFS)
 def test_df_index_columns_to_series(native_df, generate_extra_index, name):
@@ -291,7 +285,7 @@ def test_df_index_columns_to_series(native_df, generate_extra_index, name):
 
 
 @sql_count_checker(query_count=1)
-@pytest.mark.parametrize("name", [None, "name", True, 1])
+@pytest.mark.parametrize("name", [None, "name", True, 1, lib.no_default])
 @pytest.mark.parametrize("index", [True, False])
 @pytest.mark.parametrize("native_index", NATIVE_INDEX_TEST_DATA)
 def test_index_to_frame(native_index, name, index):
@@ -305,7 +299,7 @@ def test_index_to_frame(native_index, name, index):
 
 
 @sql_count_checker(query_count=2)
-@pytest.mark.parametrize("name", [None, "name", True, 1])
+@pytest.mark.parametrize("name", [None, "name", True, 1, lib.no_default])
 @pytest.mark.parametrize("index", [True, False])
 @pytest.mark.parametrize("native_df", TEST_DFS)
 def test_df_index_columns_to_frame(native_df, index, name):
diff --git a/tests/integ/modin/index/test_value_counts.py b/tests/integ/modin/index/test_value_counts.py
new file mode 100644
index 00000000000..aea87bb50d4
--- /dev/null
+++ b/tests/integ/modin/index/test_value_counts.py
@@ -0,0 +1,198 @@
+#
+# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
+#
+
+import modin.pandas as pd
+import pandas as native_pd
+import pytest
+
+import snowflake.snowpark.modin.plugin  # noqa: F401
+from tests.integ.modin.sql_counter import sql_count_checker
+from tests.integ.modin.utils import assert_series_equal, eval_snowpark_pandas_result
+
+NATIVE_NUMERIC_INDEX_TEST_DATA = [
+    native_pd.Index([], dtype="int64"),
+    native_pd.Index([1], dtype="float64"),
+    native_pd.Index([1.1, 2.2, 1.0, 1, 1.1, 2.2, 1, 1, 1, 2, 2, 2, 2.2]),
+    native_pd.Index([1, 3, 1, 1, 1, 3, 1, 1, 1, 2, 2, 2, 3], name="random name"),
+    native_pd.Index(
+        [True, False, True, False, True, False, True, False, True, True], dtype=bool
+    ),
+]
+
+INDEX1 = native_pd.Series(
+    [1, 2, 3, 2, 3, 5, 6, 7, 8, 4, 4, 5, 6, 7, 1, 2, 1, 2, 3, 4, 3, 4, 5, 6, 7]
+)
+INDEX2 = native_pd.Series(
+    [
+        "a",
+        "b",
+        "c",
+        "b",
+        "c",
+        "e",
+        "f",
+        "g",
+        "h",
+        "d",
+        "d",
+        "e",
+        "f",
+        "g",
+        "a",
+        "b",
+        "a",
+        "b",
+        "c",
+        "d",
+        "c",
+        "d",
+        "e",
+        "f",
+        "g",
+    ]
+)
+
+
+@pytest.mark.parametrize(
+    "native_series, expected, normalize, sort, ascending, dropna",
+    [
+        (
+            INDEX1,
+            native_pd.Series(
+                [0.04, 0.12, 0.12, 0.12, 0.12, 0.16, 0.16, 0.16],
+                index=[8, 1, 5, 6, 7, 2, 3, 4],
+                name="proportion",
+            ),
+            True,
+            True,
+            True,
+            True,
+        ),
+        (
+            INDEX1,
+            native_pd.Series(
+                [0.04, 0.12, 0.12, 0.12, 0.12, 0.16, 0.16, 0.16],
+                index=[8, 1, 5, 6, 7, 2, 3, 4],
+                name="proportion",
+            ),
+            True,
+            True,
+            True,
+            False,
+        ),
+        (
+            INDEX1,
+            native_pd.Series(
+                [1, 3, 3, 3, 3, 4, 4, 4], index=[8, 1, 5, 6, 7, 2, 3, 4], name="count"
+            ),
+            False,
+            True,
+            True,
+            True,
+        ),
+        (
+            INDEX1,
+            native_pd.Series(
+                [1, 3, 3, 3, 3, 4, 4, 4], index=[8, 1, 5, 6, 7, 2, 3, 4], name="count"
+            ),
+            False,
+            True,
+            True,
+            False,
+        ),
+        (
+            INDEX2,
+            native_pd.Series(
+                [0.04, 0.12, 0.12, 0.12, 0.12, 0.16, 0.16, 0.16],
+                index=["h", "a", "e", "f", "g", "b", "c", "d"],
+                name="proportion",
+            ),
+            True,
+            True,
+            True,
+            True,
+        ),
+        (
+            INDEX2,
+            native_pd.Series(
+                [0.04, 0.12, 0.12, 0.12, 0.12, 0.16, 0.16, 0.16],
+                index=["h", "a", "e", "f", "g", "b", "c", "d"],
+                name="proportion",
+            ),
+            True,
+            True,
+            True,
+            False,
+        ),
+        (
+            INDEX2,
+            native_pd.Series(
+                [1, 3, 3, 3, 3, 4, 4, 4],
+                index=["h", "a", "e", "f", "g", "b", "c", "d"],
+                name="count",
+            ),
+            False,
+            True,
+            True,
+            True,
+        ),
+        (
+            INDEX2,
+            native_pd.Series(
+                [1, 3, 3, 3, 3, 4, 4, 4],
+                index=["h", "a", "e", "f", "g", "b", "c", "d"],
+                name="count",
+            ),
+            False,
+            True,
+            True,
+            False,
+        ),
+    ],
+)
+@sql_count_checker(query_count=1)
+def test_series_value_counts_non_deterministic_pandas_behavior(
+    native_series, expected, normalize, sort, ascending, dropna
+):
+    # Native pandas produces different results locally and on Jenkins when the above data is used.
+    # Therefore, explicitly compare the actual and expected results.
+
+    # If `sort=True`, sort by the frequencies. If `sort=False`, maintain the original ordering.
+    # `ascending` controls whether to sort the count in ascending or descending order.
+
+    # When there is a tie between frequencies, the order is still deterministic, but
+    # may be different from the result from native pandas. Snowpark pandas will
+    # always respect the order of insertion during ties. Native pandas is not
+    # deterministic since the original order/order of insertion is based on the
+    # Python hashmap which may produce different results on different versions.
+    # Refer to: https://github.com/pandas-dev/pandas/issues/15833
+    snow_index = pd.Index(native_series)
+    actual = snow_index.value_counts(
+        normalize=normalize, sort=sort, ascending=ascending, dropna=dropna
+    )
+    assert_series_equal(actual, expected)
+
+
+@pytest.mark.parametrize("native_index", NATIVE_NUMERIC_INDEX_TEST_DATA)
+@pytest.mark.parametrize("normalize", [True, False])
+@pytest.mark.parametrize("sort", [True, False])
+@pytest.mark.parametrize("ascending", [True, False])
+@pytest.mark.parametrize("dropna", [True, False])
+@sql_count_checker(query_count=1)
+def test_index_value_counts(native_index, normalize, sort, ascending, dropna):
+    snow_index = pd.Index(native_index)
+    eval_snowpark_pandas_result(
+        snow_index,
+        native_index,
+        lambda idx: idx.value_counts(
+            normalize=normalize, sort=sort, ascending=ascending, dropna=dropna
+        ),
+    )
+
+
+@sql_count_checker(query_count=0)
+def test_index_value_counts_bins_negative():
+    snow_index = pd.Index([1, 2, 3, 4])
+    with pytest.raises(NotImplementedError, match="bins argument is not yet supported"):
+        snow_index.value_counts(bins=2)
diff --git a/tests/integ/scala/test_dataframe_reader_suite.py b/tests/integ/scala/test_dataframe_reader_suite.py
index 9007056d179..2fe0d5f2a13 100644
--- a/tests/integ/scala/test_dataframe_reader_suite.py
+++ b/tests/integ/scala/test_dataframe_reader_suite.py
@@ -395,6 +395,29 @@ def test_read_csv_with_infer_schema(session, mode, parse_header):
     Utils.check_answer(df, [Row(1, "one", 1.2), Row(2, "two", 2.2)])
 
 
+@pytest.mark.skipif(
+    "config.getoption('local_testing_mode', default=False)",
+    reason="SNOW-1435112: csv infer schema option is not supported",
+)
+@pytest.mark.parametrize("mode", ["select", "copy"])
+@pytest.mark.parametrize("ignore_case", [True, False])
+def test_read_csv_with_infer_schema_options(session, mode, ignore_case):
+    reader = get_reader(session, mode)
+    df = (
+        reader.option("INFER_SCHEMA", True)
+        .option("PARSE_HEADER", True)
+        .option("INFER_SCHEMA_OPTIONS", {"IGNORE_CASE": ignore_case})
+        .csv(f"@{tmp_stage_name1}/{test_file_csv_header}")
+    )
+    Utils.check_answer(df, [Row(1, "one", 1.2), Row(2, "two", 2.2)])
+    headers = ["id", "name", "rating"]
+    if ignore_case:
+        expected_cols = [c.upper() for c in headers]
+    else:
+        expected_cols = [f'"{c}"' for c in headers]
+    assert df.columns == expected_cols
+
+
 @pytest.mark.skipif(
     "config.getoption('local_testing_mode', default=False)",
     reason="SNOW-1435112: csv infer schema option is not supported",