From 3c1db0722e233d951860ce44bbb6dae18f8e9852 Mon Sep 17 00:00:00 2001
From: Rehan Durrani
Date: Fri, 30 Aug 2024 17:15:03 -0700
Subject: [PATCH] [SNOW-1502893]: Add support for `pd.crosstab` (#1837)

1. Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR.

   Fixes SNOW-1502893

2. Fill out the following pre-review checklist:

   - [ ] I am adding new automated test(s) to verify correctness of my new code
   - [ ] If this test skips Local Testing mode, I'm requesting review from @snowflakedb/local-testing
   - [ ] I am adding new logging messages
   - [ ] I am adding a new telemetry message
   - [ ] I am adding new credentials
   - [ ] I am adding a new dependency
   - [ ] If this is a new feature/behavior, I'm adding the Local Testing parity changes.

3. Please describe how your code solves the related issue.

   Add support for `pd.crosstab`.
---
 CHANGELOG.md                                  |   1 +
 docs/source/modin/general_functions.rst       |   1 +
 .../modin/supported/general_supported.rst     |   5 +-
 .../snowpark/modin/pandas/general.py          | 330 ++++++++-
 .../compiler/snowflake_query_compiler.py      |  32 +-
 tests/integ/modin/crosstab/conftest.py        |  91 +++
 tests/integ/modin/crosstab/test_crosstab.py   | 639 ++++++++++++++++++
 tests/unit/modin/test_unsupported.py          |   1 -
 8 files changed, 1071 insertions(+), 29 deletions(-)
 create mode 100644 tests/integ/modin/crosstab/conftest.py
 create mode 100644 tests/integ/modin/crosstab/test_crosstab.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 473ce424248..005aaa3a8dd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -80,6 +80,7 @@
 - Added support for `DataFrameGroupBy.value_counts` and `SeriesGroupBy.value_counts`.
 - Added support for `Series.is_monotonic_increasing` and `Series.is_monotonic_decreasing`.
 - Added support for `Index.is_monotonic_increasing` and `Index.is_monotonic_decreasing`.
+- Added support for `pd.crosstab`.

 #### Improvements

diff --git a/docs/source/modin/general_functions.rst b/docs/source/modin/general_functions.rst
index 803a901ac15..858bc54003e 100644
--- a/docs/source/modin/general_functions.rst
+++ b/docs/source/modin/general_functions.rst
@@ -11,6 +11,7 @@ General functions
    :toctree: pandas_api/

    melt
+   crosstab
    pivot
    pivot_table
    cut

diff --git a/docs/source/modin/supported/general_supported.rst b/docs/source/modin/supported/general_supported.rst
index b055ed9dc6d..a12951d00f6 100644
--- a/docs/source/modin/supported/general_supported.rst
+++ b/docs/source/modin/supported/general_supported.rst
@@ -18,7 +18,10 @@ Data manipulations
 | ``concat``                  | P                               | ``levels`` is not supported,     |                                                    |
 |                             |                                 | ``copy`` is ignored              |                                                    |
 +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
-| ``crosstab``                | N                               |                                  |                                                    |
+| ``crosstab``                | P                               |                                  | ``N`` if ``aggfunc`` is not one of                 |
+|                             |                                 |                                  | "count", "mean", "min", "max", or "sum"; or if     |
+|                             |                                 |                                  | ``values`` is passed with ``margins=True`` and     |
+|                             |                                 |                                  | ``normalize`` set to ``True`` or ``"all"``.        |
 +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
 | ``cut``                     | P                               | ``retbins``, ``labels``          | ``N`` if ``retbins=True`` or ``labels!=False``     |
 +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
diff --git a/src/snowflake/snowpark/modin/pandas/general.py b/src/snowflake/snowpark/modin/pandas/general.py
index 07f0617d612..df19e9eac91 100644
--- a/src/snowflake/snowpark/modin/pandas/general.py
+++ b/src/snowflake/snowpark/modin/pandas/general.py
@@ -22,7 +22,7 @@
 """Implement pandas general API."""
 from __future__ import annotations

-from collections.abc import Hashable, Iterable, Mapping, Sequence
+from collections.abc import Callable, Hashable, Iterable, Mapping, Sequence
 from datetime import date, datetime, timedelta, tzinfo
 from logging import getLogger
 from typing import TYPE_CHECKING, Any, Literal, Union
@@ -49,7 +49,7 @@
     _infer_tz_from_endpoints,
     _maybe_normalize_endpoints,
 )
-from pandas.core.dtypes.common import is_list_like
+from pandas.core.dtypes.common import is_list_like, is_nested_list_like
 from pandas.core.dtypes.inference import is_array_like
 from pandas.core.tools.datetimes import (
     ArrayConvertible,
@@ -1982,8 +1982,6 @@ def melt(


 @snowpark_pandas_telemetry_standalone_function_decorator
-@pandas_module_level_function_not_implemented()
-@_inherit_docstrings(pandas.crosstab, apilink="pandas.crosstab")
 def crosstab(
     index,
     columns,
     values=None,
     rownames=None,
     colnames=None,
     aggfunc=None,
     margins=False,
     margins_name: str = "All",
     dropna: bool = True,
     normalize=False,
 ) -> DataFrame:  # noqa: PR01, RT01, D200
     """
     Compute a simple cross tabulation of two (or more) factors.
+
+    By default, computes a frequency table of the factors unless an array
+    of values and an aggregation function are passed.
+
+    Parameters
+    ----------
+    index : array-like, Series, or list of arrays/Series
+        Values to group by in the rows.
+    columns : array-like, Series, or list of arrays/Series
+        Values to group by in the columns.
+    values : array-like, optional
+        Array of values to aggregate according to the factors.
+        Requires `aggfunc` be specified.
+    rownames : sequence, default None
+        If passed, must match the number of row arrays passed.
+    colnames : sequence, default None
+        If passed, must match the number of column arrays passed.
+    aggfunc : function, optional
+        If specified, requires `values` be specified as well.
+    margins : bool, default False
+        Add row/column margins (subtotals).
+    margins_name : str, default 'All'
+        Name of the row/column that will contain the totals when margins is True.
+    dropna : bool, default True
+        Do not include columns whose entries are all NaN.
+    normalize : bool, {'all', 'index', 'columns'}, or {0, 1}, default False
+        Normalize by dividing all values by the sum of values.
+
+        * If passed 'all' or True, will normalize over all values.
+        * If passed 'index' will normalize over each row.
+        * If passed 'columns' will normalize over each column.
+        * If margins is True, will also normalize margin values.
+
+    Returns
+    -------
+    Snowpark pandas :class:`~snowflake.snowpark.modin.pandas.DataFrame`
+        Cross tabulation of the data.
+
+    Notes
+    -----
+    Raises NotImplementedError if ``aggfunc`` is not one of ``"count"``,
+    ``"mean"``, ``"min"``, ``"max"``, or ``"sum"``; or if ``values`` is passed
+    while ``margins`` is True and ``normalize`` is True or ``"all"``.
+
+    Examples
+    --------
+    >>> a = np.array(["foo", "foo", "foo", "foo", "bar", "bar",
+    ...               "bar", "bar", "foo", "foo", "foo"], dtype=object)
+    >>> b = np.array(["one", "one", "one", "two", "one", "one",
+    ...               "one", "two", "two", "two", "one"], dtype=object)
+    >>> c = np.array(["dull", "dull", "shiny", "dull", "dull", "shiny",
+    ...               "shiny", "dull", "shiny", "shiny", "shiny"],
+    ...              dtype=object)
+    >>> pd.crosstab(a, [b, c], rownames=['a'], colnames=['b', 'c'])  # doctest: +NORMALIZE_WHITESPACE
+    b    one        two
+    c   dull shiny dull shiny
+    a
+    bar    1     2    1     0
+    foo    2     2    1     2
     """
-    # TODO: SNOW-1063345: Modin upgrade - modin.pandas functions in general.py
-    pandas_crosstab = pandas.crosstab(
-        index,
-        columns,
-        values,
-        rownames,
-        colnames,
-        aggfunc,
-        margins,
-        margins_name,
-        dropna,
-        normalize,
+    if values is None and aggfunc is not None:
+        raise ValueError("aggfunc cannot be used without values.")
+
+    if values is not None and aggfunc is None:
+        raise ValueError("values cannot be used without an aggfunc.")
+
+    if not is_nested_list_like(index):
+        index = [index]
+    if not is_nested_list_like(columns):
+        columns = [columns]
+
+    if (
+        values is not None
+        and margins is True
+        and (normalize is True or normalize == "all")
+    ):
+        raise NotImplementedError(
+            'Snowpark pandas does not yet support passing in margins=True, normalize="all", and values.'
+        )
+
+    user_passed_rownames = rownames is not None
+    user_passed_colnames = colnames is not None
+
+    from pandas.core.reshape.pivot import _build_names_mapper, _get_names
+
+    def _get_names_wrapper(list_of_objs, names, prefix):
+        """
+        Helper method to expand DataFrame objects containing
+        multiple columns into Series, since `_get_names` expects
+        one column per entry.
+        """
+        expanded_list_of_objs = []
+        for obj in list_of_objs:
+            if isinstance(obj, DataFrame):
+                for col in obj.columns:
+                    expanded_list_of_objs.append(obj[col])
+            else:
+                expanded_list_of_objs.append(obj)
+        return _get_names(expanded_list_of_objs, names, prefix)
+
+    rownames = _get_names_wrapper(index, rownames, prefix="row")
+    colnames = _get_names_wrapper(columns, colnames, prefix="col")
+
+    (
+        rownames_mapper,
+        unique_rownames,
+        colnames_mapper,
+        unique_colnames,
+    ) = _build_names_mapper(rownames, colnames)
+
+    pass_objs = [x for x in index + columns if isinstance(x, (Series, DataFrame))]
+    row_idx_names = None
+    col_idx_names = None
+    if pass_objs:
+        # If any Snowpark pandas objects were passed in the index or columns, we
+        # need to take the intersection of their indexes and keep only the rows
+        # whose labels appear in that intersection. The non-Snowpark pandas
+        # objects are then appended using that intersection as the final index
+        # of the DataFrame.
+        # First, separate the objects into Snowpark pandas objects and
+        # non-Snowpark pandas objects, renaming them so that they have unique names.
+ rownames_idx = 0 + row_idx_names = [] + dfs = [] + arrays = [] + array_lengths = [] + for obj in index: + if isinstance(obj, Series): + row_idx_names.append(obj.name) + df = pd.DataFrame(obj) + df.columns = [unique_rownames[rownames_idx]] + rownames_idx += 1 + dfs.append(df) + elif isinstance(obj, DataFrame): + row_idx_names.extend(obj.columns) + obj.columns = unique_rownames[ + rownames_idx : rownames_idx + len(obj.columns) + ] + rownames_idx += len(obj.columns) + dfs.append(obj) + else: + row_idx_names.append(None) + array_lengths.append(len(obj)) + df = pd.DataFrame(obj) + df.columns = unique_rownames[ + rownames_idx : rownames_idx + len(df.columns) + ] + rownames_idx += len(df.columns) + arrays.append(df) + + colnames_idx = 0 + col_idx_names = [] + for obj in columns: + if isinstance(obj, Series): + col_idx_names.append(obj.name) + df = pd.DataFrame(obj) + df.columns = [unique_colnames[colnames_idx]] + colnames_idx += 1 + dfs.append(df) + elif isinstance(obj, DataFrame): + col_idx_names.extend(obj.columns) + obj.columns = unique_colnames[ + colnames_idx : colnames_idx + len(obj.columns) + ] + colnames_idx += len(obj.columns) + dfs.append(obj) + else: + col_idx_names.append(None) + array_lengths.append(len(obj)) + df = pd.DataFrame(obj) + df.columns = unique_colnames[ + colnames_idx : colnames_idx + len(df.columns) + ] + colnames_idx += len(df.columns) + arrays.append(df) + + if len(set(array_lengths)) > 1: + raise ValueError("All arrays must be of the same length") + + # Now, we have two lists - a list of Snowpark pandas objects, and a list of objects + # that were not passed in as Snowpark pandas objects, but that we have converted + # to Snowpark pandas objects to give them column names. We can perform inner joins + # on the dfs list to get a DataFrame with the final index (that is only an intersection + # of indices.) + df = dfs[0] + for right in dfs[1:]: + df = df.merge(right, left_index=True, right_index=True) + if len(arrays) > 0: + index = df.index + right_df = pd.concat(arrays, axis=1) + # Increases query count by 1, but necessary for error checking. + index_length = len(df) + if index_length != array_lengths[0]: + raise ValueError( + f"Length mismatch: Expected {array_lengths[0]} rows, received array of length {index_length}" + ) + right_df.index = index + df = df.merge(right_df, left_index=True, right_index=True) + else: + data = { + **dict(zip(unique_rownames, index)), + **dict(zip(unique_colnames, columns)), + } + df = DataFrame(data) + + if values is None: + df["__dummy__"] = 0 + kwargs = {"aggfunc": "count"} + else: + df["__dummy__"] = values + kwargs = {"aggfunc": aggfunc} + + table = df.pivot_table( + "__dummy__", + index=unique_rownames, + columns=unique_colnames, + margins=margins, + margins_name=margins_name, + dropna=dropna, + **kwargs, # type: ignore[arg-type] ) - return DataFrame(pandas_crosstab) + + if row_idx_names is not None and not user_passed_rownames: + table.index = table.index.set_names(row_idx_names) + + if col_idx_names is not None and not user_passed_colnames: + table.columns = table.columns.set_names(col_idx_names) + + if aggfunc is None: + # If no aggfunc is provided, we are computing frequencies. Since we use + # pivot_table above, pairs that are not observed will get a NaN value, + # so we need to fill all NaN values with 0. + table = table.fillna(0) + + # We must explicitly check that the value of normalize is not False here, + # as a valid value of normalize is `0` (for normalizing index). 
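+    # For example, crosstab(a, b, normalize=0) requests row-wise normalization;
+    # a bare `if normalize:` would incorrectly skip it, since 0 is falsy.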
+ if normalize is not False: + if normalize not in [0, 1, "index", "columns", "all", True]: + raise ValueError("Not a valid normalize argument") + if normalize is True: + normalize = "all" + normalize = {0: "index", 1: "columns"}.get(normalize, normalize) + + # Actual Normalizations + normalizers: dict[bool | str, Callable] = { + "all": lambda x: x / x.sum(axis=0).sum(), + "columns": lambda x: x / x.sum(), + "index": lambda x: x.div(x.sum(axis=1), axis="index"), + } + + if margins is False: + + f = normalizers[normalize] + names = table.columns.names + table = f(table) + table.columns.names = names + table = table.fillna(0) + else: + # keep index and column of pivoted table + table_index = table.index + table_columns = table.columns + + column_margin = table.iloc[:-1, -1] + + if normalize == "columns": + # keep the core table + table = table.iloc[:-1, :-1] + + # Normalize core + f = normalizers[normalize] + table = f(table) + table = table.fillna(0) + # Fix Margins + column_margin = column_margin / column_margin.sum() + table = pd.concat([table, column_margin], axis=1) + table = table.fillna(0) + table.columns = table_columns + + elif normalize == "index": + table = table.iloc[:, :-1] + + # Normalize core + f = normalizers[normalize] + table = f(table) + table = table.fillna(0).reindex(index=table_index) + + elif normalize == "all": + # Normalize core + f = normalizers[normalize] + + # When we perform the normalization function, we take the sum over + # the rows, and divide every value by the sum. Since margins is included + # though, the result of the sum is actually 2 * the sum of the original + # values (since the margin itself is the sum of the original values), + # so we need to multiply by 2 here to account for that. + # The alternative would be to apply normalization to the main table + # and the index margins separately, but that would require additional joins + # to get the final table, which we want to avoid. + table = f(table.iloc[:, :-1]) * 2.0 + + column_margin = column_margin / column_margin.sum() + table = pd.concat([table, column_margin], axis=1) + table.iloc[-1, -1] = 1 + + table = table.fillna(0) + table.index = table_index + table.columns = table_columns + + table = table.rename_axis(index=rownames_mapper, axis=0) + table = table.rename_axis(columns=colnames_mapper, axis=1) + + return table # Adding docstring since pandas docs don't have web section for this function. 
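The `* 2.0` adjustment above can be sanity-checked with a small standalone numpy sketch (illustrative only, not part of the patch; the names here are made up). Appending the margins row doubles the grand total, so dividing by the sum over the whole table and multiplying by 2 recovers `core / grand_total`:

    import numpy as np

    core = np.array([[1.0, 2.0], [3.0, 4.0]])  # pivoted core values
    # Append the margins row (column sums), as pivot_table(margins=True) does.
    with_margins_row = np.vstack([core, core.sum(axis=0)])

    # Summing every entry counts the core twice: sum(core) + sum(margins row).
    total = with_margins_row.sum()
    assert total == 2 * core.sum()

    # Dividing by the doubled total and multiplying by 2 recovers core / grand_total.
    normalized = with_margins_row / total * 2.0
    assert np.allclose(normalized[:-1], core / core.sum())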
diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index be994015eac..848c5e438b3 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -197,7 +197,10 @@ compute_bin_indices, preprocess_bins_for_cut, ) -from snowflake.snowpark.modin.plugin._internal.frame import InternalFrame +from snowflake.snowpark.modin.plugin._internal.frame import ( + InternalFrame, + LabelIdentifierPair, +) from snowflake.snowpark.modin.plugin._internal.groupby_utils import ( GROUPBY_AGG_PRESERVES_SNOWPARK_PANDAS_TYPE, GROUPBY_AGG_WITH_NONE_SNOWPARK_PANDAS_TYPES, @@ -5698,11 +5701,14 @@ def agg( ) for agg_arg in agg_args } + pandas_labels = list(agg_col_map.keys()) + if self.is_multiindex(axis=1): + pandas_labels = [ + (label,) * len(self.columns.names) for label in pandas_labels + ] single_agg_func_query_compilers.append( SnowflakeQueryCompiler( - frame.project_columns( - list(agg_col_map.keys()), list(agg_col_map.values()) - ) + frame.project_columns(pandas_labels, list(agg_col_map.values())) ) ) else: # axis == 0 @@ -14138,7 +14144,6 @@ def create_lazy_type_functions( assert len(right_result_data_identifiers) == 1, "other must be a Series" right = right_result_data_identifiers[0] right_datatype = right_datatypes[0] - # now replace in result frame identifiers with binary op result replace_mapping = {} snowpark_pandas_types = [] @@ -14160,10 +14165,19 @@ def create_lazy_type_functions( identifiers_to_keep = set( new_frame.index_column_snowflake_quoted_identifiers ) | set(update_result.old_id_to_new_id_mappings.values()) + self_is_column_mi = len(self._modin_frame.data_column_pandas_index_names) label_to_snowflake_quoted_identifier = [] snowflake_quoted_identifier_to_snowpark_pandas_type = {} for pair in new_frame.label_to_snowflake_quoted_identifier: if pair.snowflake_quoted_identifier in identifiers_to_keep: + if ( + self_is_column_mi + and isinstance(pair.label, tuple) + and isinstance(pair.label[0], tuple) + ): + pair = LabelIdentifierPair( + pair.label[0], pair.snowflake_quoted_identifier + ) label_to_snowflake_quoted_identifier.append(pair) snowflake_quoted_identifier_to_snowpark_pandas_type[ pair.snowflake_quoted_identifier @@ -14177,7 +14191,7 @@ def create_lazy_type_functions( label_to_snowflake_quoted_identifier ), num_index_columns=new_frame.num_index_columns, - data_column_index_names=new_frame.data_column_index_names, + data_column_index_names=self._modin_frame.data_column_index_names, snowflake_quoted_identifier_to_snowpark_pandas_type=snowflake_quoted_identifier_to_snowpark_pandas_type, ) @@ -14588,9 +14602,7 @@ def infer_sorted_column_labels( new_frame = InternalFrame.create( ordered_dataframe=expanded_ordered_frame, data_column_pandas_labels=sorted_column_labels, - data_column_pandas_index_names=[ - None - ], # operation removes column index name always. 
+ data_column_pandas_index_names=self._modin_frame.data_column_pandas_index_names, data_column_snowflake_quoted_identifiers=frame.data_column_snowflake_quoted_identifiers + new_identifiers, index_column_pandas_labels=index_column_pandas_labels, @@ -14637,7 +14649,7 @@ def infer_sorted_column_labels( new_frame = InternalFrame.create( ordered_dataframe=expanded_ordered_frame, data_column_pandas_labels=expanded_data_column_pandas_labels, - data_column_pandas_index_names=[None], # operation removes names + data_column_pandas_index_names=self._modin_frame.data_column_pandas_index_names, data_column_snowflake_quoted_identifiers=expanded_data_column_snowflake_quoted_identifiers, index_column_pandas_labels=index_column_pandas_labels, index_column_snowflake_quoted_identifiers=frame.index_column_snowflake_quoted_identifiers, diff --git a/tests/integ/modin/crosstab/conftest.py b/tests/integ/modin/crosstab/conftest.py new file mode 100644 index 00000000000..6203419321d --- /dev/null +++ b/tests/integ/modin/crosstab/conftest.py @@ -0,0 +1,91 @@ +# +# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. +# + +import modin.pandas as pd +import numpy as np +import pandas as native_pd +import pytest + +import snowflake.snowpark.modin.plugin # noqa: F401 + + +@pytest.fixture(scope="function") +def a(): + return np.array( + [ + "foo", + "foo", + "foo", + "foo", + "bar", + "bar", + "bar", + "bar", + "foo", + "foo", + "foo", + ], + dtype=object, + ) + + +@pytest.fixture(scope="function") +def b(): + return np.array( + [ + "one", + "one", + "one", + "two", + "one", + "one", + "one", + "two", + "two", + "two", + "one", + ], + dtype=object, + ) + + +@pytest.fixture(scope="function") +def c(): + return np.array( + [ + "dull", + "dull", + "shiny", + "dull", + "dull", + "shiny", + "shiny", + "dull", + "shiny", + "shiny", + "shiny", + ], + dtype=object, + ) + + +@pytest.fixture(scope="function") +def basic_crosstab_dfs(): + df = native_pd.DataFrame( + { + "species": ["dog", "cat", "dog", "dog", "cat", "cat", "dog", "cat"], + "favorite_food": [ + "chicken", + "fish", + "fish", + "beef", + "chicken", + "beef", + "fish", + "beef", + ], + "age": [7, 2, 8, 5, 9, 3, 6, 1], + } + ) + return df, pd.DataFrame(df) diff --git a/tests/integ/modin/crosstab/test_crosstab.py b/tests/integ/modin/crosstab/test_crosstab.py new file mode 100644 index 00000000000..276650519d9 --- /dev/null +++ b/tests/integ/modin/crosstab/test_crosstab.py @@ -0,0 +1,639 @@ +# +# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. 
+# + +import re + +import modin.pandas as pd +import numpy as np +import pandas as native_pd +import pytest + +import snowflake.snowpark.modin.plugin # noqa: F401 +from tests.integ.modin.sql_counter import SqlCounter, sql_count_checker +from tests.integ.modin.utils import eval_snowpark_pandas_result + + +@pytest.mark.parametrize("dropna", [True, False]) +class TestCrosstab: + def test_basic_crosstab_with_numpy_arrays(self, dropna, a, b, c): + query_count = 1 + join_count = 0 if dropna else 1 + with SqlCounter(query_count=query_count, join_count=join_count): + eval_snowpark_pandas_result( + pd, + native_pd, + lambda lib: lib.crosstab( + a, [b, c], rownames=["a"], colnames=["b", "c"], dropna=dropna + ), + ) + + def test_basic_crosstab_with_numpy_arrays_different_lengths(self, dropna, a, b, c): + a = a[:-1] + b = b[:-2] + c = c[:-3] + with SqlCounter(query_count=0): + eval_snowpark_pandas_result( + pd, + native_pd, + lambda lib: lib.crosstab( + a, [b, c], rownames=["a"], colnames=["b", "c"], dropna=dropna + ), + assert_exception_equal=True, + expect_exception=True, + expect_exception_match="All arrays must be of the same length", + expect_exception_type=ValueError, + ) + + # In these tests, `overlap` refers to the intersection of the indices + # of the Series objects being passed in to crosstab. crosstab takes + # only the intersection of the index objects of all Series when determining + # the final DataFrame to pass into pivot_table, so here, we are testing + # that we follow that behavior. + def test_basic_crosstab_with_series_objs_full_overlap(self, dropna, a, b, c): + # In this case, all indexes are identical - hence "full" overlap. + query_count = 2 + join_count = 5 if dropna else 10 + + def eval_func(lib): + if lib is pd: + return lib.crosstab( + a, + [lib.Series(b), lib.Series(c)], + rownames=["a"], + colnames=["b", "c"], + dropna=dropna, + ) + else: + return lib.crosstab( + a, [b, c], rownames=["a"], colnames=["b", "c"], dropna=dropna + ) + + with SqlCounter(query_count=query_count, join_count=join_count): + eval_snowpark_pandas_result(pd, native_pd, eval_func) + + def test_basic_crosstab_with_series_objs_some_overlap(self, dropna, a, b, c): + # In this case, some values are shared across indexes (non-zero intersection), + # hence "some" overlap. + # When a mix of Series and non-Series objects are passed in, the non-Series + # objects are expected to have the same length as the intersection of the indexes + # of the Series objects. This test case passes because we pass in arrays that + # are the length of the intersection rather than the length of each of the Series. + query_count = 2 + join_count = 5 if dropna else 10 + b = native_pd.Series( + b, + index=list(range(len(a))), + ) + c = native_pd.Series( + c, + index=-1 * np.array(list(range(len(a)))), + ) + + # All columns have to be the same length (if NumPy arrays are present, then + # pandas errors if they do not match the length of the other Series after + # they are joined (i.e. filtered so that their indices are the same)). In + # this test, we truncate the numpy column so that the lengths are correct. 
+ def eval_func(args_list): + a, b, c = args_list + if isinstance(b, native_pd.Series): + return native_pd.crosstab( + a[:1], [b, c], rownames=["a"], colnames=["b", "c"], dropna=dropna + ) + else: + return pd.crosstab( + a[:1], [b, c], rownames=["a"], colnames=["b", "c"], dropna=dropna + ) + + with SqlCounter(query_count=query_count, join_count=join_count): + native_args = [a, b, c] + snow_args = [a, pd.Series(b), pd.Series(c)] + eval_snowpark_pandas_result( + snow_args, + native_args, + eval_func, + ) + + @sql_count_checker(query_count=1, join_count=1) + def test_basic_crosstab_with_series_objs_some_overlap_error(self, dropna, a, b, c): + # Same as above - the intersection of the indexes of the Series objects + # is non-zero, but the indexes are not identical - hence "some" overlap. + # When a mix of Series and non-Series objects are passed in, the non-Series + # objects are expected to have the same length as the intersection of the indexes + # of the Series objects. This test case errors because we pass in arrays that + # are the length of the Series, rather than the length of the intersection of + # the indexes of the Series. + b = native_pd.Series( + b, + index=list(range(len(a))), + ) + c = native_pd.Series( + c, + index=-1 * np.array(list(range(len(a)))), + ) + + # All columns have to be the same length (if NumPy arrays are present, then + # pandas errors if they do not match the length of the other Series after + # they are joined (i.e. filtered so that their indices are the same)) + def eval_func(args_list): + a, b, c = args_list + if isinstance(b, native_pd.Series): + return native_pd.crosstab( + a, [b, c], rownames=["a"], colnames=["b", "c"], dropna=dropna + ) + else: + return pd.crosstab( + a, [b, c], rownames=["a"], colnames=["b", "c"], dropna=dropna + ) + + native_args = [a, b, c] + snow_args = [a, pd.Series(b), pd.Series(c)] + eval_snowpark_pandas_result( + snow_args, + native_args, + eval_func, + expect_exception=True, + expect_exception_match=re.escape( + "Length mismatch: Expected 11 rows, received array of length 1" + ), + expect_exception_type=ValueError, + assert_exception_equal=False, # Our error message is a little different. + ) + + @sql_count_checker(query_count=1, join_count=1) + def test_basic_crosstab_with_series_objs_no_overlap_error(self, dropna, a, b, c): + # In this case, no values are shared across the indexes - the intersection is an + # empty set - hence "no" overlap. We error here for the same reason as above - the + # arrays passed in should also be empty, but are non-empty. + b = native_pd.Series( + b, + index=list(range(len(a))), + ) + c = native_pd.Series( + c, + index=-1 - np.array(list(range(len(a)))), + ) + + # All columns have to be the same length (if NumPy arrays are present, then + # pandas errors if they do not match the length of the other Series after + # they are joined (i.e. 
filtered so that their indices are the same)) + def eval_func(args_list): + a, b, c = args_list + if isinstance(b, native_pd.Series): + return native_pd.crosstab( + a, [b, c], rownames=["a"], colnames=["b", "c"], dropna=dropna + ) + else: + return pd.crosstab( + a, [b, c], rownames=["a"], colnames=["b", "c"], dropna=dropna + ) + + native_args = [a, b, c] + snow_args = [a, pd.Series(b), pd.Series(c)] + eval_snowpark_pandas_result( + snow_args, + native_args, + eval_func, + expect_exception=True, + expect_exception_match=re.escape( + "Length mismatch: Expected 11 rows, received array of length 0" + ), + expect_exception_type=ValueError, + assert_exception_equal=False, # Our error message is a little different. + ) + + def test_basic_crosstab_with_df_and_series_objs_pandas_errors_columns( + self, dropna, a, b, c + ): + query_count = 4 + join_count = 1 if dropna else 3 + a = native_pd.Series( + a, + dtype=object, + ) + b = native_pd.DataFrame( + { + "0": b, + "1": c, + } + ) + # pandas expects only Series objects, or DataFrames that have only a single column, while + # we support accepting DataFrames with multiple columns. + with pytest.raises( + AssertionError, match="arrays and names must have the same length" + ): + native_pd.crosstab(a, b, rownames=["a"], colnames=["b", "c"], dropna=dropna) + + def eval_func(args_list): + a, b = args_list + if isinstance(a, native_pd.Series): + return native_pd.crosstab( + a, + [b[c] for c in b.columns], + rownames=["a"], + colnames=["b", "c"], + dropna=dropna, + ) + else: + return pd.crosstab( + a, b, rownames=["a"], colnames=["b", "c"], dropna=dropna + ) + + with SqlCounter(query_count=query_count, join_count=join_count): + native_args = [a, b] + snow_args = [pd.Series(a), pd.DataFrame(b)] + eval_snowpark_pandas_result( + snow_args, + native_args, + eval_func, + ) + + def test_basic_crosstab_with_df_and_series_objs_pandas_errors_index( + self, dropna, a, b, c + ): + query_count = 6 + join_count = 5 if dropna else 17 + a = native_pd.Series( + a, + dtype=object, + ) + b = native_pd.DataFrame( + { + "0": b, + "1": c, + } + ) + # pandas expects only Series objects, or DataFrames that have only a single column, while + # we support accepting DataFrames with multiple columns. 
+ with pytest.raises( + AssertionError, match="arrays and names must have the same length" + ): + native_pd.crosstab(b, a, rownames=["a", "b"], colnames=["c"], dropna=dropna) + + def eval_func(args_list): + a, b = args_list + if isinstance(a, native_pd.Series): + return native_pd.crosstab( + [b[c] for c in b.columns], + a, + rownames=["a", "b"], + colnames=["c"], + dropna=dropna, + ) + else: + return pd.crosstab( + b, a, rownames=["a", "b"], colnames=["c"], dropna=dropna + ) + + with SqlCounter(query_count=query_count, join_count=join_count): + native_args = [a, b] + snow_args = [pd.Series(a), pd.DataFrame(b)] + eval_snowpark_pandas_result( + snow_args, + native_args, + eval_func, + ) + + def test_margins(self, dropna, a, b, c): + query_count = 1 + join_count = 1 if dropna else 2 + union_count = 1 + + with SqlCounter( + query_count=query_count, join_count=join_count, union_count=union_count + ): + eval_snowpark_pandas_result( + pd, + native_pd, + lambda lib: lib.crosstab( + a, + [b, c], + rownames=["a"], + colnames=["b", "c"], + margins=True, + margins_name="MARGINS_NAME", + dropna=dropna, + ), + ) + + @pytest.mark.parametrize("normalize", [0, 1, True, "all", "index", "columns"]) + def test_normalize(self, dropna, normalize, a, b, c): + query_count = 1 if normalize in (0, "index") else 2 + join_count = 3 if normalize in (0, "index") else 2 + if dropna: + join_count -= 2 + + with SqlCounter(query_count=query_count, join_count=join_count): + eval_snowpark_pandas_result( + pd, + native_pd, + lambda lib: lib.crosstab( + a, + [b, c], + rownames=["a"], + colnames=["b", "c"], + normalize=normalize, + dropna=dropna, + ), + ) + + @pytest.mark.parametrize("normalize", [0, 1, True, "all", "index", "columns"]) + def test_normalize_and_margins(self, dropna, normalize, a, b, c): + counts = { + "columns": [3, 5 if dropna else 9, 4], + "index": [1, 5 if dropna else 8, 3], + "all": [3, 12 if dropna else 19, 7], + } + counts[0] = counts["index"] + counts[1] = counts["columns"] + + if normalize is True: + sql_counts = counts["all"] + else: + sql_counts = counts[normalize] + with SqlCounter( + query_count=sql_counts[0], + join_count=sql_counts[1], + union_count=sql_counts[2], + ): + eval_snowpark_pandas_result( + pd, + native_pd, + lambda lib: lib.crosstab( + a, + [b, c], + rownames=["a"], + colnames=["b", "c"], + normalize=normalize, + margins=True, + dropna=dropna, + ), + ) + + @pytest.mark.parametrize("normalize", [0, 1, "index", "columns"]) + @pytest.mark.parametrize("aggfunc", ["count", "mean", "min", "max", "sum"]) + def test_normalize_margins_and_values(self, dropna, normalize, aggfunc, a, b, c): + counts = { + "columns": [3, 29 if dropna else 41, 4], + "index": [1, 23 if dropna else 32, 3], + "all": [3, 54 if dropna else 75, 7], + } + counts[0] = counts["index"] + counts[1] = counts["columns"] + vals = np.array([12, 10, 9, 4, 3, 49, 19, 20, 21, 34, 0]) + if normalize is True: + sql_counts = counts["all"] + else: + sql_counts = counts[normalize] + + def eval_func(lib): + df = lib.crosstab( + a, + [b, c], + rownames=["a"], + colnames=["b", "c"], + values=vals, + normalize=normalize, + margins=True, + dropna=dropna, + aggfunc=aggfunc, + ) + if aggfunc == "sum": + # When normalizing the data, we apply the normalization function to the + # entire table (including margins), which requires us to multiply by 2 + # (since the function takes the sum over the rows, and the margins row is + # itself the sum over the rows, causing the sum over all rows to be equal + # to 2 * the sum over the input rows). 
This hack allows us to save on joins
+                # but results in slight precision issues.
+                df = df.round(decimals=6)
+            return df
+
+        with SqlCounter(
+            query_count=sql_counts[0],
+            join_count=sql_counts[1],
+            union_count=sql_counts[2],
+        ):
+            eval_snowpark_pandas_result(
+                pd,
+                native_pd,
+                eval_func,
+            )
+
+    @pytest.mark.parametrize("aggfunc", ["count", "mean", "min", "max", "sum"])
+    def test_margins_and_values(self, dropna, aggfunc, a, b, c):
+        vals = np.array([12, 10, 9, 4, 3, 49, 19, 20, 21, 34, 0])
+
+        def eval_func(lib):
+            df = lib.crosstab(
+                a,
+                [b, c],
+                rownames=["a"],
+                colnames=["b", "c"],
+                values=vals,
+                margins=True,
+                dropna=dropna,
+                aggfunc=aggfunc,
+            )
+            return df
+
+        with SqlCounter(
+            query_count=1,
+            join_count=7 if dropna else 10,
+            union_count=1,
+        ):
+            eval_snowpark_pandas_result(
+                pd,
+                native_pd,
+                eval_func,
+            )
+
+    @pytest.mark.parametrize("normalize", [0, 1, True, "all", "index", "columns"])
+    @pytest.mark.parametrize("aggfunc", ["count", "mean", "min", "max", "sum"])
+    def test_normalize_and_values(self, dropna, normalize, aggfunc, a, b, c):
+        counts = {
+            "columns": [2, 4 if dropna else 10],
+            "index": [1, 5 if dropna else 11],
+            "all": [2, 4 if dropna else 10],
+        }
+        counts[0] = counts["index"]
+        counts[1] = counts["columns"]
+        vals = np.array([12, 10, 9, 4, 3, 49, 19, 20, 21, 34, 0])
+        if normalize is True:
+            sql_counts = counts["all"]
+        else:
+            sql_counts = counts[normalize]
+
+        def eval_func(lib):
+            df = lib.crosstab(
+                a,
+                [b, c],
+                rownames=["a"],
+                colnames=["b", "c"],
+                values=vals,
+                normalize=normalize,
+                dropna=dropna,
+                aggfunc=aggfunc,
+            )
+            if aggfunc in ["sum", "max"]:
+                # Normalization involves floating-point division, whose results can
+                # differ slightly between Snowflake and native pandas for these
+                # aggregation functions, so round before comparing.
+ df = df.round(decimals=6) + return df + + with SqlCounter( + query_count=sql_counts[0], + join_count=sql_counts[1], + ): + eval_snowpark_pandas_result( + pd, + native_pd, + eval_func, + ) + + @pytest.mark.parametrize("normalize", ["all", True]) + @pytest.mark.parametrize("aggfunc", ["count", "mean", "min", "max", "sum"]) + @sql_count_checker(query_count=0) + def test_normalize_margins_and_values_not_supported( + self, dropna, normalize, aggfunc, a, b, c + ): + vals = np.array([12, 10, 9, 4, 3, 49, 19, 20, 21, 34, 0]) + with pytest.raises( + NotImplementedError, + match='Snowpark pandas does not yet support passing in margins=True, normalize="all", and values.', + ): + pd.crosstab( + a, + [b, c], + rownames=["a"], + colnames=["b", "c"], + values=vals, + normalize=normalize, + margins=True, + dropna=dropna, + aggfunc=aggfunc, + ) + + @pytest.mark.parametrize("aggfunc", ["count", "mean", "min", "max", "sum"]) + def test_values(self, dropna, aggfunc, basic_crosstab_dfs): + query_count = 1 + join_count = 2 if dropna else 5 + native_df = basic_crosstab_dfs[0] + + with SqlCounter(query_count=query_count, join_count=join_count): + eval_snowpark_pandas_result( + pd, + native_pd, + lambda lib: lib.crosstab( + native_df["species"].values, + native_df["favorite_food"].values, + values=native_df["age"].values, + aggfunc=aggfunc, + dropna=dropna, + ), + ) + + @pytest.mark.parametrize("aggfunc", ["count", "mean", "min", "max", "sum"]) + def test_values_series_like(self, dropna, aggfunc, basic_crosstab_dfs): + query_count = 5 + join_count = 2 if dropna else 5 + native_df, snow_df = basic_crosstab_dfs + + def eval_func(df): + if isinstance(df, pd.DataFrame): + return pd.crosstab( + df["species"], + df["favorite_food"], + values=df["age"], + aggfunc=aggfunc, + dropna=dropna, + ) + else: + return native_pd.crosstab( + df["species"], + df["favorite_food"], + values=df["age"], + aggfunc=aggfunc, + dropna=dropna, + ) + + with SqlCounter(query_count=query_count, join_count=join_count): + eval_snowpark_pandas_result( + snow_df, + native_df, + eval_func, + ) + + +@sql_count_checker(query_count=0) +def test_values_unsupported_aggfunc(basic_crosstab_dfs): + native_df = basic_crosstab_dfs[0] + + with pytest.raises( + NotImplementedError, + match="Snowpark pandas DataFrame.pivot_table does not yet support the aggregation 'median' with the given arguments.", + ): + pd.crosstab( + native_df["species"].values, + native_df["favorite_food"].values, + values=native_df["age"].values, + aggfunc="median", + dropna=False, + ) + + +@sql_count_checker(query_count=4) +def test_values_series_like_unsupported_aggfunc(basic_crosstab_dfs): + # The query count above comes from building the DataFrame + # that we pass in to pivot table. 
+    _, snow_df = basic_crosstab_dfs
+
+    with pytest.raises(
+        NotImplementedError,
+        match="Snowpark pandas DataFrame.pivot_table does not yet support the aggregation 'median' with the given arguments.",
+    ):
+        pd.crosstab(
+            snow_df["species"],
+            snow_df["favorite_food"],
+            values=snow_df["age"],
+            aggfunc="median",
+            dropna=False,
+        )
+
+
+@sql_count_checker(query_count=0)
+def test_values_aggfunc_one_supplied_should_error(a, b, c):
+    eval_snowpark_pandas_result(
+        pd,
+        native_pd,
+        lambda lib: lib.crosstab(index=a, columns=b, aggfunc="sum"),
+        expect_exception=True,
+        expect_exception_match="aggfunc cannot be used without values.",
+        expect_exception_type=ValueError,
+        assert_exception_equal=True,
+    )
+    eval_snowpark_pandas_result(
+        pd,
+        native_pd,
+        lambda lib: lib.crosstab(index=a, columns=b, values=c),
+        expect_exception=True,
+        expect_exception_match="values cannot be used without an aggfunc.",
+        expect_exception_type=ValueError,
+        assert_exception_equal=True,
+    )
+
+
+@sql_count_checker(query_count=0)
+def test_invalid_normalize(a, b):
+    eval_snowpark_pandas_result(
+        pd,
+        native_pd,
+        lambda lib: lib.crosstab(index=a, columns=b, normalize="invalid_value"),
+        expect_exception=True,
+        expect_exception_match="Not a valid normalize argument",
+        expect_exception_type=ValueError,
+        assert_exception_equal=True,
+    )
diff --git a/tests/unit/modin/test_unsupported.py b/tests/unit/modin/test_unsupported.py
index 1e72dbd43ca..63a1cbc3bd3 100644
--- a/tests/unit/modin/test_unsupported.py
+++ b/tests/unit/modin/test_unsupported.py
@@ -45,7 +45,6 @@ def test_unsupported_io(io_method, kwargs):
     [
         ["merge_ordered", {"left": "", "right": ""}],
         ["value_counts", {"values": ""}],
-        ["crosstab", {"index": "", "columns": ""}],
         ["lreshape", {"data": "", "groups": ""}],
         ["wide_to_long", {"df": "", "stubnames": "", "i": "", "j": ""}],
     ],
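For reference, a minimal end-to-end sketch of the newly supported surface (illustrative only; assumes a configured Snowpark pandas session and mirrors the `basic_crosstab_dfs` fixture above):

    import modin.pandas as pd
    import snowflake.snowpark.modin.plugin  # noqa: F401

    df = pd.DataFrame(
        {
            "species": ["dog", "cat", "dog", "cat"],
            "favorite_food": ["chicken", "fish", "fish", "beef"],
            "age": [7, 2, 8, 3],
        }
    )

    # Frequency table with row/column subtotals.
    pd.crosstab(df["species"], df["favorite_food"], margins=True)

    # Mean age per (species, favorite_food) pair, normalized across each row.
    pd.crosstab(
        df["species"],
        df["favorite_food"],
        values=df["age"],
        aggfunc="mean",
        normalize="index",
    )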