From cbaf3b53207a06e1986377e98f5b20848c3f9788 Mon Sep 17 00:00:00 2001 From: sfc-gh-mvashishtha Date: Thu, 22 Aug 2024 16:20:12 -0700 Subject: [PATCH 1/4] SNOW-1636767, SNOW-1635405: Support timestamp +/- timedelta. Signed-off-by: sfc-gh-mvashishtha --- CHANGELOG.md | 1 + .../modin/plugin/_internal/binary_op_utils.py | 46 ++ .../modin/plugin/_internal/type_utils.py | 12 +- .../compiler/snowflake_query_compiler.py | 13 +- tests/integ/modin/binary/test_timedelta.py | 638 ++++++++++++++++-- tests/unit/modin/test_type_utils.py | 19 + 6 files changed, 644 insertions(+), 85 deletions(-) create mode 100644 tests/unit/modin/test_type_utils.py diff --git a/CHANGELOG.md b/CHANGELOG.md index afe0d3e862b..ca6ade63468 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ - `NotImplementedError` will be raised for the rest of methods that do not support `Timedelta`. - support for subtracting two timestamps to get a Timedelta. - support indexing with Timedelta data columns. + - support for adding or subtracting timestamps and `Timedelta`. - Added support for index's arithmetic and comparison operators. - Added support for `Series.dt.round`. - Added documentation pages for `DatetimeIndex`. diff --git a/src/snowflake/snowpark/modin/plugin/_internal/binary_op_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/binary_op_utils.py index 9f0c4717807..ed875af2ddf 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/binary_op_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/binary_op_utils.py @@ -13,6 +13,7 @@ from snowflake.snowpark.functions import ( col, concat, + dateadd, datediff, floor, iff, @@ -300,6 +301,29 @@ def compute_binary_op_between_snowpark_columns( ): # string/string case (only for add) binary_op_result_column = concat(first_operand, second_operand) + elif ( + op == "add" + and isinstance(first_datatype(), TimestampType) + and isinstance(second_datatype(), TimedeltaType) + ): + binary_op_result_column = dateadd("ns", second_operand, first_operand) + elif ( + op == "add" + and isinstance(first_datatype(), TimedeltaType) + and isinstance(second_datatype(), TimestampType) + ): + binary_op_result_column = dateadd("ns", first_operand, second_operand) + elif op == "add" and ( + ( + isinstance(first_datatype(), TimedeltaType) + and isinstance(second_datatype(), NullType) + ) + or ( + isinstance(first_datatype(), NullType) + and isinstance(second_datatype(), TimedeltaType) + ) + ): + return SnowparkPandasColumn(pandas_lit(None), TimedeltaType()) elif op == "mul" and ( ( isinstance(second_datatype(), _IntegralType) @@ -355,6 +379,28 @@ def compute_binary_op_between_snowpark_columns( second_operand=second_operand, second_datatype=second_datatype(), ) + elif ( + op == "sub" + and isinstance(first_datatype(), TimestampType) + and isinstance(second_datatype(), TimedeltaType) + ): + binary_op_result_column = dateadd("ns", -1 * second_operand, first_operand) + elif ( + op == "sub" + and isinstance(first_datatype(), TimedeltaType) + and isinstance(second_datatype(), TimestampType) + ): + # Timedelta - Timestamp doesn't make sense. Raise the same error + # message as pandas. + raise TypeError("bad operand type for unary -: 'DatetimeArray'") + elif isinstance(first_datatype(), TimedeltaType) or isinstance( + second_datatype(), TimedeltaType + ): + # We don't support these cases yet. + # TODO(SNOW-1637101, SNOW-1637102): Support these cases. + ErrorMessage.not_implemented( + f"Snowpark pandas does not yet support the binary operation {op} with timedelta types." 
+ ) # If there is no special binary_op_result_column result, it means the operator and # the data type of the column don't need special handling. Then we get the overloaded # operator from Snowpark Column class, e.g., __add__ to perform binary operations. diff --git a/src/snowflake/snowpark/modin/plugin/_internal/type_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/type_utils.py index 1203d99b2f2..1fd252bf7e0 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/type_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/type_utils.py @@ -230,6 +230,12 @@ def to_snowflake( """ map a pandas or numpy type to snowpark data type. """ + snowpark_pandas_type = ( + SnowparkPandasType.get_snowpark_pandas_type_for_pandas_type(p) + ) + if snowpark_pandas_type is not None: + return snowpark_pandas_type + if isinstance(p, DatetimeTZDtype): return TimestampType(TimestampTimeZone.TZ) if p is native_pd.Timestamp or is_datetime64_any_dtype(p): @@ -246,12 +252,6 @@ def to_snowflake( if is_float_dtype(p): return DoubleType() - snowpark_pandas_type = ( - SnowparkPandasType.get_snowpark_pandas_type_for_pandas_type(p) - ) - if snowpark_pandas_type is not None: - return snowpark_pandas_type - try: return PANDAS_TO_SNOWFLAKE_MAP[p] except KeyError: diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index fc5ac819f9d..62aa74b4397 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -1895,8 +1895,6 @@ def _binary_op_list_like_rhs_axis_0( """ from snowflake.snowpark.modin.pandas.series import Series - self._raise_not_implemented_error_for_timedelta() - # Step 1: Convert other to a Series and join on the row position with self. other_qc = Series(other)._query_compiler self_frame = self._modin_frame.ensure_row_position_column() @@ -2047,8 +2045,6 @@ def binary_op( from snowflake.snowpark.modin.pandas.series import Series from snowflake.snowpark.modin.pandas.utils import is_scalar - self._raise_not_implemented_error_for_timedelta() - # fail explicitly for unsupported scenarios if level is not None: # TODO SNOW-862668: binary operations with level @@ -13746,8 +13742,6 @@ def _binary_op_between_dataframe_and_series_along_axis_0( Returns: SnowflakeQueryCompiler representing result of binary op operation. 
""" - self._raise_not_implemented_error_for_timedelta() - assert ( other.is_series_like() ), "other must be a Snowflake Query Compiler representing a Series" @@ -14319,7 +14313,12 @@ def infer_sorted_column_labels( data_column_snowflake_quoted_identifiers=expanded_data_column_snowflake_quoted_identifiers, index_column_pandas_labels=index_column_pandas_labels, index_column_snowflake_quoted_identifiers=frame.index_column_snowflake_quoted_identifiers, - data_column_types=None, + data_column_types=[ + frame.snowflake_quoted_identifier_to_snowpark_pandas_type.get( + identifier + ) + for identifier in expanded_data_column_snowflake_quoted_identifiers + ], index_column_types=None, ) diff --git a/tests/integ/modin/binary/test_timedelta.py b/tests/integ/modin/binary/test_timedelta.py index 72e3a4d75a9..705cd47f6f7 100644 --- a/tests/integ/modin/binary/test_timedelta.py +++ b/tests/integ/modin/binary/test_timedelta.py @@ -47,19 +47,115 @@ ) -class TestDataFrameAndScalar: - @pytest.mark.parametrize( - "scalar", +@pytest.fixture +def timestamp_no_timezone_dataframes_1() -> tuple[pd.DataFrame, native_pd.DataFrame]: + return create_test_dfs( [ - pd.NaT, - datetime.datetime(year=2024, month=8, day=14, hour=2, minute=32, second=42), - datetime.datetime(year=2023, month=3, day=14), - pd.Timestamp(year=2020, month=3, day=25), - ], + [ + pd.Timestamp(5, unit="ns"), + pd.Timestamp(700, unit="ns"), + pd.Timestamp(1399, unit="ns"), + ], + [ + pd.Timestamp(6, unit="ms"), + pd.Timestamp(800, unit="ms"), + pd.Timestamp(1499, unit="ms"), + ], + ] + ) + + +@pytest.fixture +def timestamp_no_timezone_series_2() -> tuple[pd.Series, native_pd.Series]: + return create_test_series( + [ + pd.Timestamp(5, unit="ns"), + pd.Timestamp(700, unit="ns"), + pd.Timestamp(1399, unit="ns"), + ] + ) + + +@pytest.fixture +def timestamp_no_timezone_dataframes_2() -> tuple[pd.DataFrame, native_pd.DataFrame]: + return create_test_dfs( + [ + [pd.Timestamp(5, unit="ns"), pd.Timestamp(700, unit="ns")], + [pd.Timestamp(6, unit="ns"), pd.Timestamp(800, unit="ns")], + [pd.Timestamp(7, unit="ns"), pd.Timestamp(900, unit="ns")], + ] + ) + + +@pytest.fixture +def timestamp_no_timezone_dataframes_3() -> tuple[pd.DataFrame, native_pd.DataFrame]: + return create_test_dfs( + [ + [pd.Timestamp(1, unit="ms"), pd.Timestamp(2, unit="ms")], + [pd.Timestamp(3, unit="ms"), pd.Timestamp(4, unit="ms")], + ] + ) + + +@pytest.fixture( + params=[ + pd.NaT, + datetime.datetime(year=2024, month=8, day=14, hour=2, minute=32, second=42), + datetime.datetime(year=2023, month=3, day=14), + pd.Timestamp(year=2020, month=3, day=25), + ], +) +def timestamp_scalar(request): + return request.param + + +@pytest.fixture( + params=[ + pd.Timedelta("10 days 23:59:59.123456789"), + pd.Timedelta("-10 days 23:59:59.123456789"), + datetime.timedelta(days=-10, hours=23), + datetime.timedelta(microseconds=1), + ] +) +def timedelta_scalar(request): + return request.param + + +@pytest.fixture +def timedelta_dataframes_1() -> tuple[pd.DataFrame, native_pd.DataFrame]: + return create_test_dfs( + [ + [pd.Timedelta(days=1), pd.NaT], + [ + pd.Timedelta(days=2), + pd.Timedelta(days=3), + ], + ] ) + + +@pytest.fixture +def timedelta_series_1() -> tuple[pd.Series, native_pd.Series]: + return create_test_series( + [ + None, + pd.Timedelta(days=1), + pd.Timedelta(days=2), + pd.Timedelta(days=3), + pd.Timedelta(days=4), + pd.Timedelta(days=5), + ] + ) + + +class TestDataFrameAndScalar: @pytest.mark.parametrize("operation", ["sub", "rsub"]) @sql_count_checker(query_count=1) - def 
test_timestamp_minus_timestamp(self, scalar, operation): + def test_timestamp_minus_timestamp( + self, + timestamp_scalar, + operation, + ): eval_snowpark_pandas_result( *create_test_dfs( [ @@ -70,23 +166,66 @@ def test_timestamp_minus_timestamp(self, scalar, operation): ], ] ), - lambda df: getattr(df, operation)(scalar), + lambda df: getattr(df, operation)(timestamp_scalar), ) - -class TestSeriesAndScalar: @pytest.mark.parametrize( - "scalar", + "operation", [ - pd.NaT, - datetime.datetime(year=2024, month=8, day=14, hour=2, minute=32, second=42), - datetime.datetime(year=2023, month=3, day=14), - pd.Timestamp(year=2020, month=3, day=25), + "add", + "radd", + "sub", ], ) + @sql_count_checker(query_count=1) + def test_timestamp_dataframe_plus_or_minus_timedelta_scalar( + self, timedelta_scalar, operation + ): + eval_snowpark_pandas_result( + *create_test_dfs( + [ + [datetime.datetime(year=2024, month=1, day=1), pd.NaT], + [ + datetime.datetime(year=2023, month=1, day=1), + datetime.datetime(year=2030, month=1, day=1), + ], + ] + ), + lambda df: getattr(df, operation)(timedelta_scalar), + ) + + @sql_count_checker(query_count=0) + def test_timedelta_scalar_minus_timestamp_dataframe_negative(self): + eval_snowpark_pandas_result( + *create_test_dfs([datetime.datetime(year=2024, month=8, day=21)]), + lambda df: pd.Timedelta(1) - df, + expect_exception=True, + expect_exception_type=TypeError, + ) + + @sql_count_checker(query_count=1) + @pytest.mark.parametrize("operation", ["add", "radd"]) + def test_timedelta_dataframe_plus_timestamp_scalar( + self, timedelta_dataframes_1, timestamp_scalar, operation + ): + eval_snowpark_pandas_result( + *timedelta_dataframes_1, lambda df: getattr(df, operation)(timestamp_scalar) + ) + + @pytest.mark.parametrize("operation", ["sub", "rsub", "add", "radd"]) + @pytest.mark.xfail(strict=True, raises=NotImplementedError, reason="SNOW-1637101") + def test_timedelta_dataframe_plus_or_minus_timedelta_scalar( + self, timedelta_dataframes_1, timedelta_scalar, operation + ): + eval_snowpark_pandas_result( + *timedelta_dataframes_1, lambda df: getattr(df, operation)(timedelta_scalar) + ) + + +class TestSeriesAndScalar: @sql_count_checker(query_count=1) @pytest.mark.parametrize("operation", ["sub", "rsub"]) - def test_timestamp_minus_timestamp(self, operation, scalar): + def test_timestamp_series_minus_timestamp_scalar(self, operation, timestamp_scalar): eval_snowpark_pandas_result( *create_test_series( [ @@ -96,29 +235,45 @@ def test_timestamp_minus_timestamp(self, operation, scalar): datetime.datetime(year=2030, month=1, day=1), ] ), - lambda series: getattr(series, operation)(scalar), + lambda series: getattr(series, operation)(timestamp_scalar), + ) + + @pytest.mark.parametrize( + "operation", + [ + "add", + "radd", + "sub", + ], + ) + @sql_count_checker(query_count=1) + def test_timestamp_series_plus_or_minus_timedelta_scalar( + self, timedelta_scalar, operation + ): + eval_snowpark_pandas_result( + *create_test_series(PANDAS_TIMESTAMP_SERIES_WITH_NULLS_NO_TIMEZONE_1[0]), + lambda series: getattr(series, operation)(timedelta_scalar), + ) + + @sql_count_checker(query_count=1) + @pytest.mark.parametrize("operation", ["add", "radd"]) + def test_timedelta_series_plus_timestamp_scalar( + self, timestamp_scalar, timedelta_series_1, operation + ): + eval_snowpark_pandas_result( + *timedelta_series_1, + lambda series: getattr(series, operation)(timestamp_scalar), ) class TestDataFrameAndListLikeAxis1: @pytest.mark.parametrize("op", ["sub", "rsub"]) 
@sql_count_checker(query_count=1) - def test_timestamp_minus_timestamp(self, op): + def test_timestamp_dataframe_minus_timestamp_list_like( + self, op, timestamp_no_timezone_dataframes_1 + ): eval_snowpark_pandas_result( - *create_test_dfs( - [ - [ - pd.Timestamp(5, unit="ns"), - pd.Timestamp(700, unit="ns"), - pd.Timestamp(1399, unit="ns"), - ], - [ - pd.Timestamp(6, unit="ms"), - pd.Timestamp(800, unit="ms"), - pd.Timestamp(1499, unit="ms"), - ], - ] - ), + *timestamp_no_timezone_dataframes_1, lambda df: getattr(df, op)( [ pd.Timestamp(1, unit="ns"), @@ -128,19 +283,46 @@ def test_timestamp_minus_timestamp(self, op): ), ) + @sql_count_checker(query_count=1) + @pytest.mark.parametrize("op", ["add", "radd", "sub"]) + def test_timestamp_dataframe_plus_or_minus_timedelta_list_like( + self, op, timestamp_no_timezone_dataframes_1 + ): + eval_snowpark_pandas_result( + *timestamp_no_timezone_dataframes_1, + lambda df: getattr(df, op)( + [ + pd.Timedelta(1, unit="ns"), + pd.Timedelta(300, unit="ns"), + pd.Timedelta(57, unit="ms"), + ] + ), + ) + + @sql_count_checker(query_count=1) + @pytest.mark.parametrize("op", ["add", "radd"]) + def test_timedelta_dataframe_plus_timestamp_list_like( + self, op, timedelta_dataframes_1 + ): + eval_snowpark_pandas_result( + *timedelta_dataframes_1, + lambda df: getattr(df, op)( + [ + pd.Timestamp(1, unit="D"), + pd.Timestamp(-13, unit="D"), + ] + ), + ) + class TestSeriesAndListLike: @sql_count_checker(query_count=1, join_count=1) @pytest.mark.parametrize("op", ["sub", "rsub"]) - def test_timestamp_minus_timestamp(self, op): + def test_timestamp_series_minus_timestamp_list_like( + self, op, timestamp_no_timezone_series_2 + ): eval_snowpark_pandas_result( - *create_test_series( - [ - pd.Timestamp(5, unit="ns"), - pd.Timestamp(700, unit="ns"), - pd.Timestamp(1399, unit="ns"), - ] - ), + *timestamp_no_timezone_series_2, lambda series: getattr(series, op)( [ pd.Timestamp(1, unit="ns"), @@ -150,19 +332,48 @@ def test_timestamp_minus_timestamp(self, op): ), ) + @sql_count_checker(query_count=1, join_count=1) + @pytest.mark.parametrize("op", ["add", "radd", "sub"]) + def test_timestamp_series_plus_or_minus_timedelta_list_like( + self, op, timestamp_no_timezone_series_2 + ): + eval_snowpark_pandas_result( + *timestamp_no_timezone_series_2, + lambda series: getattr(series, op)( + [ + pd.Timedelta(1, unit="ns"), + pd.Timedelta(300, unit="ns"), + pd.Timedelta(57, unit="ms"), + ] + ), + ) -class TestDataFrameAndListLikeAxis0: @sql_count_checker(query_count=1, join_count=1) - @pytest.mark.parametrize("op", ["sub", "rsub"]) - def test_timestamp_minus_timestamp(self, op): + @pytest.mark.parametrize("op", ["add", "radd"]) + def test_timedelta_series_plus_timestamp_list_like(self, op, timedelta_series_1): eval_snowpark_pandas_result( - *create_test_dfs( + *timedelta_series_1, + lambda series: getattr(series, op)( [ - [pd.Timestamp(5, unit="ns"), pd.Timestamp(700, unit="ns")], - [pd.Timestamp(6, unit="ns"), pd.Timestamp(800, unit="ns")], - [pd.Timestamp(7, unit="ns"), pd.Timestamp(900, unit="ns")], + pd.Timestamp(1, unit="D"), + pd.Timestamp(2, unit="D"), + pd.NaT, + pd.Timestamp(3, unit="D"), + pd.Timestamp(4, unit="D"), + pd.Timestamp(5, unit="D"), ] ), + ) + + +class TestDataFrameAndListLikeAxis0: + @sql_count_checker(query_count=1, join_count=1) + @pytest.mark.parametrize("op", ["sub", "rsub"]) + def test_timestamp_dataframe_minus_timestamp_list_like( + self, op, timestamp_no_timezone_dataframes_2 + ): + eval_snowpark_pandas_result( + *timestamp_no_timezone_dataframes_2, 
lambda df: getattr(df, op)( [ pd.Timestamp(1, unit="ns"), @@ -173,6 +384,39 @@ def test_timestamp_minus_timestamp(self, op): ), ) + @pytest.mark.parametrize("op", ["add", "radd", "sub"]) + @sql_count_checker(query_count=1, join_count=1) + def test_timestamp_dataframe_plus_or_minus_timedelta_list_like( + self, op, timestamp_no_timezone_dataframes_2 + ): + eval_snowpark_pandas_result( + *timestamp_no_timezone_dataframes_2, + lambda df: getattr(df, op)( + [ + pd.Timedelta(1, unit="ns"), + pd.Timedelta(300, unit="ns"), + pd.Timedelta(999, unit="ns"), + ], + axis=0, + ), + ) + + @sql_count_checker(query_count=1, join_count=1) + @pytest.mark.parametrize("op", ["add", "radd"]) + def test_timedelta_dataframe_plus_timestamp_list_like( + self, op, timedelta_dataframes_1 + ): + eval_snowpark_pandas_result( + *timedelta_dataframes_1, + lambda df: getattr(df, op)( + [ + pd.Timestamp(1, unit="D"), + pd.Timestamp(999, unit="D"), + ], + axis=0, + ), + ) + class TestSeriesAndSeries: @pytest.mark.parametrize( @@ -243,17 +487,38 @@ def test_subtract_two_timestamps_timezones_disallowed( except_exception_type=TypeError, ) + @pytest.mark.parametrize("op", ["add", "radd", "sub"]) + @sql_count_checker(query_count=1, join_count=1) + def test_timestamp_plus_or_minus_timedelta(self, op, timedelta_series_1): + pandas_lhs = PANDAS_TIMESTAMP_SERIES_WITH_NULLS_NO_TIMEZONE_1[0] + snow_lhs = pd.Series(pandas_lhs) + snow_rhs, pandas_rhs = timedelta_series_1 + eval_snowpark_pandas_result( + (snow_lhs, snow_rhs), + (pandas_lhs, pandas_rhs), + lambda inputs: getattr(inputs[0], op)(inputs[1]), + ) + + @pytest.mark.parametrize("op", ["add", "radd"]) + @sql_count_checker(query_count=1, join_count=1) + def test_timedelta_plus_timestamp(self, op, timedelta_series_1): + snow_lhs, pandas_lhs = timedelta_series_1 + pandas_rhs = PANDAS_TIMESTAMP_SERIES_WITH_NULLS_NO_TIMEZONE_1[0] + snow_rhs = pd.Series(pandas_rhs) + eval_snowpark_pandas_result( + (snow_lhs, snow_rhs), + (pandas_lhs, pandas_rhs), + lambda inputs: getattr(inputs[0], op)(inputs[1]), + ) + class TestDataFrameAndSeriesAxis0: @pytest.mark.parametrize("op", ["sub", "rsub"]) @sql_count_checker(query_count=1, join_count=1) - def test_timestamp_dataframe_minus_timestamp_series(self, op): - snow_df, pandas_df = create_test_dfs( - [ - [pd.Timestamp(1, unit="ms"), pd.Timestamp(2, unit="ms")], - [pd.Timestamp(3, unit="ms"), pd.Timestamp(4, unit="ms")], - ] - ) + def test_timestamp_dataframe_minus_timestamp_series( + self, op, timestamp_no_timezone_dataframes_3 + ): + snow_df, pandas_df = timestamp_no_timezone_dataframes_3 snow_series, pandas_series = create_test_series( [ pd.Timestamp(5, unit="ms"), @@ -267,6 +532,50 @@ def test_timestamp_dataframe_minus_timestamp_series(self, op): lambda t: getattr(t[0], op)(t[1], axis=0), ) + @pytest.mark.parametrize("op", ["add", "radd", "sub"]) + @sql_count_checker(query_count=1, join_count=1) + def test_timestamp_dataframe_plus_or_minus_timedelta_series( + self, op, timestamp_no_timezone_dataframes_3 + ): + snow_df, pandas_df = timestamp_no_timezone_dataframes_3 + snow_series, pandas_series = create_test_series( + [ + pd.Timedelta(5, unit="ms"), + pd.Timedelta(6, unit="ms"), + pd.Timedelta(7, unit="ms"), + ] + ) + eval_snowpark_pandas_result( + (snow_df, snow_series), + (pandas_df, pandas_series), + lambda t: getattr(t[0], op)(t[1], axis=0), + ) + + @pytest.mark.parametrize( + "op", + [ + "add", + "radd", + ], + ) + @sql_count_checker(query_count=1, join_count=1) + def test_timedelta_dataframe_plus_timestamp_series( + self, op, 
timedelta_dataframes_1 + ): + snow_df, pandas_df = timedelta_dataframes_1 + snow_series, pandas_series = create_test_series( + [ + pd.Timestamp(5, unit="D"), + pd.Timestamp(6, unit="D"), + pd.Timestamp(7, unit="D"), + ] + ) + eval_snowpark_pandas_result( + (snow_df, snow_series), + (pandas_df, pandas_series), + lambda t: getattr(t[0], op)(t[1], axis=0), + ) + class TestDataFrameAndSeriesAxis1: @sql_count_checker( @@ -274,17 +583,14 @@ class TestDataFrameAndSeriesAxis1: # query to materialize the result. query_count=2 ) - def test_timestamp_dataframe_minus_timestamp_series(self): + def test_timestamp_dataframe_minus_timestamp_series( + self, timestamp_no_timezone_dataframes_3 + ): """ Test subtracting a series of timestamps from a dataframe of timestamps on axis 1. pandas behavior is incorrect: https://github.com/pandas-dev/pandas/issues/59529 """ - pandas_df = native_pd.DataFrame( - [ - [pd.Timestamp(1, unit="ms"), pd.Timestamp(2, unit="ms")], - [pd.Timestamp(3, unit="ms"), pd.Timestamp(4, unit="ms")], - ] - ) + snow_df, pandas_df = timestamp_no_timezone_dataframes_3 pandas_series = native_pd.Series( [ pd.Timestamp(5, unit="ms"), @@ -297,7 +603,7 @@ def test_timestamp_dataframe_minus_timestamp_series(self): ): pandas_df - pandas_series assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( - pd.DataFrame(pandas_df) - pd.Series(pandas_series), + snow_df - pd.Series(pandas_series), native_pd.DataFrame( [ [ @@ -319,17 +625,14 @@ def test_timestamp_dataframe_minus_timestamp_series(self): # query to materialize the result. query_count=2 ) - def test_timestamp_series_minus_timestamp_dataframe(self): + def test_timestamp_series_minus_timestamp_dataframe( + self, timestamp_no_timezone_dataframes_3 + ): """ Test subtracting a dataframe of timestamps from a series of timestamps. pandas behavior is incorrect: https://github.com/pandas-dev/pandas/issues/59529 """ - pandas_df = native_pd.DataFrame( - [ - [pd.Timestamp(1, unit="ms"), pd.Timestamp(2, unit="ms")], - [pd.Timestamp(3, unit="ms"), pd.Timestamp(4, unit="ms")], - ] - ) + snow_df, pandas_df = timestamp_no_timezone_dataframes_3 pandas_series = native_pd.Series( [ pd.Timestamp(5, unit="ms"), @@ -345,7 +648,7 @@ def test_timestamp_series_minus_timestamp_dataframe(self): ): pandas_series - pandas_df assert_snowpark_pandas_equals_to_pandas_without_dtypecheck( - pd.Series(pandas_series) - pd.DataFrame(pandas_df), + pd.Series(pandas_series) - snow_df, native_pd.DataFrame( [ [ @@ -362,6 +665,149 @@ def test_timestamp_series_minus_timestamp_dataframe(self): ), ) + @sql_count_checker( + # One query to materialize the series for the subtraction, and another + # query to materialize the result. 
+        query_count=2
+    )
+    @pytest.mark.parametrize(
+        "op,error_message",
+        [
+            pytest.param(
+                "add",
+                "ufunc 'add' cannot use operands with types dtype('float64') and dtype('<m8[ns]')",

[...]

From: sfc-gh-mvashishtha
Date: Fri, 23 Aug 2024 12:19:15 -0700
Subject: [PATCH 2/4] Respond to comments

Signed-off-by: sfc-gh-mvashishtha

---
 .../snowpark/modin/plugin/_internal/binary_op_utils.py | 8 ++++----
 tests/integ/modin/binary/test_timedelta.py             | 3 +++
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/src/snowflake/snowpark/modin/plugin/_internal/binary_op_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/binary_op_utils.py
index ed875af2ddf..583bbcdde14 100644
--- a/src/snowflake/snowpark/modin/plugin/_internal/binary_op_utils.py
+++ b/src/snowflake/snowpark/modin/plugin/_internal/binary_op_utils.py
@@ -303,8 +303,8 @@ def compute_binary_op_between_snowpark_columns(
         binary_op_result_column = concat(first_operand, second_operand)
     elif (
         op == "add"
-        and isinstance(first_datatype(), TimestampType)
         and isinstance(second_datatype(), TimedeltaType)
+        and isinstance(first_datatype(), TimestampType)
     ):
         binary_op_result_column = dateadd("ns", second_operand, first_operand)
     elif (
@@ -319,8 +319,8 @@
             and isinstance(second_datatype(), NullType)
         )
         or (
-            isinstance(first_datatype(), NullType)
-            and isinstance(second_datatype(), TimedeltaType)
+            isinstance(second_datatype(), TimedeltaType)
+            and isinstance(first_datatype(), NullType)
         )
     ):
         return SnowparkPandasColumn(pandas_lit(None), TimedeltaType())
@@ -381,8 +381,8 @@
     )
     elif (
         op == "sub"
-        and isinstance(first_datatype(), TimestampType)
         and isinstance(second_datatype(), TimedeltaType)
+        and isinstance(first_datatype(), TimestampType)
     ):
         binary_op_result_column = dateadd("ns", -1 * second_operand, first_operand)
     elif (
diff --git a/tests/integ/modin/binary/test_timedelta.py b/tests/integ/modin/binary/test_timedelta.py
index 705cd47f6f7..494d3e0c39f 100644
--- a/tests/integ/modin/binary/test_timedelta.py
+++ b/tests/integ/modin/binary/test_timedelta.py
@@ -201,6 +201,9 @@ def test_timedelta_scalar_minus_timestamp_dataframe_negative(self):
             lambda df: pd.Timedelta(1) - df,
             expect_exception=True,
             expect_exception_type=TypeError,
+            expect_exception_match=re.escape(
+                "bad operand type for unary -: 'DatetimeArray'"
+            ),
         )
 
     @sql_count_checker(query_count=1)

From cf1f4eb9f94e531c27dc6c164c12872c3d935078 Mon Sep 17 00:00:00 2001
From: Mahesh Vashishtha
Date: Fri, 23 Aug 2024 12:19:32 -0700
Subject: [PATCH 3/4] Update tests/integ/modin/binary/test_timedelta.py

Co-authored-by: Andong Zhan

---
 tests/integ/modin/binary/test_timedelta.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/integ/modin/binary/test_timedelta.py b/tests/integ/modin/binary/test_timedelta.py
index 494d3e0c39f..632243664d7 100644
--- a/tests/integ/modin/binary/test_timedelta.py
+++ b/tests/integ/modin/binary/test_timedelta.py
@@ -151,7 +151,7 @@ def timedelta_series_1() -> tuple[pd.Series, native_pd.Series]:
 class TestDataFrameAndScalar:
     @pytest.mark.parametrize("operation", ["sub", "rsub"])
     @sql_count_checker(query_count=1)
-    def test_timestamp_minus_timestamp(
+    def test_timestamp_col_minus_timestamp_scalar(
         self,
         timestamp_scalar,
         operation,

From 7418153bd2600cd5d651b00b0e6052ca098c1459 Mon Sep 17 00:00:00 2001
From: sfc-gh-mvashishtha
Date: Fri, 23 Aug 2024 14:42:02 -0700
Subject: [PATCH 4/4] Raise NotImplementedError for Timedelta / Timedelta

Signed-off-by: sfc-gh-mvashishtha

---
.../modin/plugin/_internal/binary_op_utils.py | 92 +++++++++---------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/binary_op_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/binary_op_utils.py index 583bbcdde14..fe475a49dd5 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/binary_op_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/binary_op_utils.py @@ -278,30 +278,7 @@ def compute_binary_op_between_snowpark_columns( # some operators and the data types have to be handled specially to align with pandas # However, it is difficult to fail early if the arithmetic operator is not compatible # with the data type, so we just let the server raise exception (e.g. a string minus a string). - if op == "truediv": - binary_op_result_column = first_operand / second_operand - elif op == "floordiv": - binary_op_result_column = floor(first_operand / second_operand) - elif op == "mod": - binary_op_result_column = compute_modulo_between_snowpark_columns( - first_operand, first_datatype(), second_operand, second_datatype() - ) - elif op == "pow": - binary_op_result_column = compute_power_between_snowpark_columns( - first_operand, second_operand - ) - elif op in ["__or__", "__ror__"]: - binary_op_result_column = first_operand | second_operand - elif op in ["__and__", "__rand__"]: - binary_op_result_column = first_operand & second_operand - elif ( - op == "add" - and isinstance(second_datatype(), StringType) - and isinstance(first_datatype(), StringType) - ): - # string/string case (only for add) - binary_op_result_column = concat(first_operand, second_operand) - elif ( + if ( op == "add" and isinstance(second_datatype(), TimedeltaType) and isinstance(first_datatype(), TimestampType) @@ -324,6 +301,51 @@ def compute_binary_op_between_snowpark_columns( ) ): return SnowparkPandasColumn(pandas_lit(None), TimedeltaType()) + elif ( + op == "sub" + and isinstance(second_datatype(), TimedeltaType) + and isinstance(first_datatype(), TimestampType) + ): + binary_op_result_column = dateadd("ns", -1 * second_operand, first_operand) + elif ( + op == "sub" + and isinstance(first_datatype(), TimedeltaType) + and isinstance(second_datatype(), TimestampType) + ): + # Timedelta - Timestamp doesn't make sense. Raise the same error + # message as pandas. + raise TypeError("bad operand type for unary -: 'DatetimeArray'") + elif isinstance(first_datatype(), TimedeltaType) or isinstance( + second_datatype(), TimedeltaType + ): + # We don't support these cases yet. + # TODO(SNOW-1637101, SNOW-1637102): Support these cases. + ErrorMessage.not_implemented( + f"Snowpark pandas does not yet support the binary operation {op} with timedelta types." 
+ ) + elif op == "truediv": + binary_op_result_column = first_operand / second_operand + elif op == "floordiv": + binary_op_result_column = floor(first_operand / second_operand) + elif op == "mod": + binary_op_result_column = compute_modulo_between_snowpark_columns( + first_operand, first_datatype(), second_operand, second_datatype() + ) + elif op == "pow": + binary_op_result_column = compute_power_between_snowpark_columns( + first_operand, second_operand + ) + elif op in ["__or__", "__ror__"]: + binary_op_result_column = first_operand | second_operand + elif op in ["__and__", "__rand__"]: + binary_op_result_column = first_operand & second_operand + elif ( + op == "add" + and isinstance(second_datatype(), StringType) + and isinstance(first_datatype(), StringType) + ): + # string/string case (only for add) + binary_op_result_column = concat(first_operand, second_operand) elif op == "mul" and ( ( isinstance(second_datatype(), _IntegralType) @@ -379,28 +401,6 @@ def compute_binary_op_between_snowpark_columns( second_operand=second_operand, second_datatype=second_datatype(), ) - elif ( - op == "sub" - and isinstance(second_datatype(), TimedeltaType) - and isinstance(first_datatype(), TimestampType) - ): - binary_op_result_column = dateadd("ns", -1 * second_operand, first_operand) - elif ( - op == "sub" - and isinstance(first_datatype(), TimedeltaType) - and isinstance(second_datatype(), TimestampType) - ): - # Timedelta - Timestamp doesn't make sense. Raise the same error - # message as pandas. - raise TypeError("bad operand type for unary -: 'DatetimeArray'") - elif isinstance(first_datatype(), TimedeltaType) or isinstance( - second_datatype(), TimedeltaType - ): - # We don't support these cases yet. - # TODO(SNOW-1637101, SNOW-1637102): Support these cases. - ErrorMessage.not_implemented( - f"Snowpark pandas does not yet support the binary operation {op} with timedelta types." - ) # If there is no special binary_op_result_column result, it means the operator and # the data type of the column don't need special handling. Then we get the overloaded # operator from Snowpark Column class, e.g., __add__ to perform binary operations.
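
For reference, a minimal sketch (not part of the patches) of the user-facing behavior this series enables. It assumes the documented Snowpark pandas imports and an active Snowflake connection; the example values are illustrative only.

import modin.pandas as pd
import snowflake.snowpark.modin.plugin  # noqa: F401 -- registers the Snowpark pandas backend

ts = pd.Series([pd.Timestamp(year=2024, month=1, day=1), pd.NaT])
td = pd.Timedelta(days=10, hours=23)

ts + td  # PATCH 1/4: each timestamp shifts forward by the timedelta; NaT stays NaT
td + ts  # the reversed operand order is also supported
ts - td  # subtracting a timedelta shifts timestamps backward

# Timedelta - Timestamp is rejected with the same message as native pandas:
#     TypeError: bad operand type for unary -: 'DatetimeArray'
# and other binary operations on Timedelta columns (e.g. Timedelta / Timedelta,
# reordered ahead of the generic handlers in PATCH 4/4) raise
# NotImplementedError instead of silently returning a wrong result
# (SNOW-1637101, SNOW-1637102).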