From d2d3e97b51c3c51779ba374aa62b4cd70be05264 Mon Sep 17 00:00:00 2001 From: Naren Krishna Date: Fri, 20 Sep 2024 13:48:19 -0700 Subject: [PATCH] SNOW-1023211: Support resample with rule offset `W`, `ME`, `YE` with `closed = left` (#2254) SNOW-1023211 This PR adds support for `resample` by rule offset `W`, `ME`, `YE` with parameter `closed = left`. Note: Snowpark pandas `resample` implementation uses the Snowflake SQL function `TIME_SLICE` which assumes the left hand side of the given interval is closed, meaning support using `closed = right` parameter is non-trivial. --------- Signed-off-by: Naren Krishna --- CHANGELOG.md | 1 + .../modin/supported/dataframe_supported.rst | 14 ++-- .../modin/supported/series_supported.rst | 14 ++-- .../modin/plugin/_internal/resample_utils.py | 65 +++++++++++++------ .../compiler/snowflake_query_compiler.py | 58 ++++++++++++++--- tests/integ/modin/resample/test_resample.py | 6 +- .../modin/resample/test_resample_asfreq.py | 20 ++++-- .../modin/resample/test_resample_negative.py | 14 ++++ 8 files changed, 143 insertions(+), 49 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2080b6af1e5..924fc3c2a5a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ - Added support for passing parameter `include_describe` to `Session.query_history`. - Added support for `DatetimeIndex.mean` and `DatetimeIndex.std` methods. - Added support for `Resampler.asfreq`. +- Added support for `resample` frequency `W`, `ME`, `YE` with `closed = "left"`. #### Bug Fixes diff --git a/docs/source/modin/supported/dataframe_supported.rst b/docs/source/modin/supported/dataframe_supported.rst index 54858063e54..a63fc4ee5ba 100644 --- a/docs/source/modin/supported/dataframe_supported.rst +++ b/docs/source/modin/supported/dataframe_supported.rst @@ -91,8 +91,9 @@ Methods +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ | ``applymap`` | P | | ``N`` if ``na_action == "ignore"`` | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ -| ``asfreq`` | P | ``how``, ``normalize``, | See ``resample`` | -| | | ``fill_value`` | | +| ``asfreq`` | P | ``how``, ``normalize``, | Only DatetimeIndex is supported and its ``freq`` | +| | | ``fill_value`` | will be lost. Only ``rule`` frequencies 's', 'min',| +| | | | 'h', and 'D' are supported. | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ | ``asof`` | N | | | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ @@ -347,10 +348,11 @@ Methods | ``replace`` | P | ``copy`` is ignored, ``method``, | | | | | ``limit`` | | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ -| ``resample`` | P | ``axis``, ``closed``, ``label``, | Only DatetimeIndex is supported and its ``freq`` | -| | | ``convention``, ``kind``, ``on`` | will be lost. Only ``rule`` frequencies 's', 'min',| -| | | , ``level``, ``origin``, | 'h', and 'D' are supported. | -| | | , ``offset``, ``group_keys`` | | +| ``resample`` | P | ``axis``, ``label``, | Only DatetimeIndex is supported and its ``freq`` | +| | | ``convention``, ``kind``, ``on`` | will be lost. ``rule`` frequencies 's', 'min', | +| | | , ``level``, ``origin``, | 'h', and 'D' are supported. ``rule`` frequencies | +| | | , ``offset``, ``group_keys`` | 'W', 'ME', and 'YE' are supported with | +| | | | `closed = "left"` | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ | ``reset_index`` | Y | | | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ diff --git a/docs/source/modin/supported/series_supported.rst b/docs/source/modin/supported/series_supported.rst index 618b88d5034..c6e93e690ba 100644 --- a/docs/source/modin/supported/series_supported.rst +++ b/docs/source/modin/supported/series_supported.rst @@ -100,8 +100,9 @@ Methods +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ | ``argsort`` | N | | | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ -| ``asfreq`` | P | ``how``, ``normalize``, | See ``resample`` | -| | | ``fill_value`` | | +| ``asfreq`` | P | ``how``, ``normalize``, | Only DatetimeIndex is supported and its ``freq`` | +| | | ``fill_value`` | will be lost. Only ``rule`` frequencies 's', 'min',| +| | | | 'h', and 'D' are supported. | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ | ``asof`` | N | | | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ @@ -340,10 +341,11 @@ Methods +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ | ``replace`` | P | ``method``, ``limit`` | | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ -| ``resample`` | P | ``axis``, ``closed``, ``label``, | Only DatetimeIndex is supported and its ``freq`` | -| | | ``convention``, ``kind``, ``on`` | will be lost. Only ``rule`` frequencies 's', 'min',| -| | | , ``level``, ``origin``, | 'h', and 'D' are supported. | -| | | , ``offset``, ``group_keys`` | | +| ``resample`` | P | ``axis``, ``label``, | Only DatetimeIndex is supported and its ``freq`` | +| | | ``convention``, ``kind``, ``on`` | will be lost. ``rule`` frequencies 's', 'min', | +| | | , ``level``, ``origin``, | 'h', and 'D' are supported. ``rule`` frequencies | +| | | , ``offset``, ``group_keys`` | 'W', 'ME', and 'YE' are supported with | +| | | | `closed = "left"` | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ | ``reset_index`` | Y | | | +-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+ diff --git a/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py index ba8ceedec5e..4ebfba1d910 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py @@ -4,7 +4,6 @@ from typing import Any, Literal, NoReturn, Optional, Union -import pandas as native_pd from pandas._libs.lib import no_default from pandas._libs.tslibs import to_offset from pandas._typing import Frequency @@ -16,6 +15,7 @@ builtin, dateadd, datediff, + last_day, lit, to_timestamp_ntz, ) @@ -47,7 +47,9 @@ "last", ] IMPLEMENTED_MISC_METHODS = ["ffill"] -SUPPORTED_RESAMPLE_RULES = ["day", "hour", "second", "minute"] +SUPPORTED_RESAMPLE_RULES = ("second", "minute", "hour", "day", "week", "month", "year") +RULE_SECOND_TO_DAY = ("second", "minute", "hour", "day") +RULE_WEEK_TO_YEAR = ("week", "quarter", "month", "year") # https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#dateoffset-objects @@ -82,9 +84,21 @@ "ns", ] -SNOWFLAKE_SUPPORTED_DATEOFFSETS = ["W", "ME", "QE", "QS", "YS", "D", "h", "min", "s"] +SNOWFLAKE_SUPPORTED_DATEOFFSETS = [ + "s", + "min", + "h", + "D", + "W", + "MS", + "ME", + "QS", + "QE", + "YS", + "YE", +] -IMPLEMENTED_DATEOFFSET_STRINGS = ["min", "s", "h", "D"] +IMPLEMENTED_DATEOFFSET_STRINGS = ["s", "min", "h", "D", "W", "ME", "YE"] UNSUPPORTED_DATEOFFSET_STRINGS = list( # sort so that tests that generate test cases from this last always use the @@ -134,7 +148,6 @@ def rule_to_snowflake_width_and_slice_unit(rule: Frequency) -> tuple[int, str]: rule_code = offset.rule_code slice_width = offset.n - if rule_code == "s": slice_unit = "second" elif rule_code == "min": @@ -143,16 +156,16 @@ def rule_to_snowflake_width_and_slice_unit(rule: Frequency) -> tuple[int, str]: slice_unit = "hour" elif rule_code == "D": slice_unit = "day" - elif rule_code[0] == "W": # pragma: no cover + elif rule_code[0] == "W": # treat codes like W-MON and W-SUN as "week": slice_unit = "week" - elif rule_code == "ME": # pragma: no cover + elif rule_code == "ME": slice_unit = "month" - elif rule_code[0] == "QE": # pragma: no cover - # treat codes like Q-DEC and Q-JAN as "quarter": + elif rule_code[0:2] == "QE": # pragma: no cover + # treat codes like QE-DEC and QE-JAN as "quarter": slice_unit = "quarter" - elif rule_code[0] == "YE": # pragma: no cover - # treat codes like A-DEC and A-JAN as "year": + elif rule_code[0:2] == "YE": + # treat codes like YE-DEC and YE-JAN as "year": slice_unit = "year" else: raise NotImplementedError( @@ -204,9 +217,7 @@ def validate_resample_supported_by_snowflake( """ rule = resample_kwargs.get("rule") - _, slice_unit = rule_to_snowflake_width_and_slice_unit( - rule # type: ignore[arg-type] - ) + _, slice_unit = rule_to_snowflake_width_and_slice_unit(rule) if slice_unit not in SUPPORTED_RESAMPLE_RULES: _argument_not_implemented("rule", rule) @@ -216,8 +227,13 @@ def validate_resample_supported_by_snowflake( _argument_not_implemented("axis", axis) closed = resample_kwargs.get("closed") - if closed is not None: # pragma: no cover + if closed not in ("left", None) and slice_unit in RULE_SECOND_TO_DAY: _argument_not_implemented("closed", closed) + if slice_unit in RULE_WEEK_TO_YEAR: + if closed != "left": + ErrorMessage.not_implemented( + f"resample with rule offset {rule} is only implemented with closed='left'" + ) label = resample_kwargs.get("label") if label is not None: # pragma: no cover @@ -376,8 +392,7 @@ def perform_resample_binning_on_frame( # Time slices in Snowflake are aligned to snowflake_timeslice_alignment_date, # so we must normalize input datetimes. normalization_amt = ( - native_pd.to_datetime(start_date) - - native_pd.to_datetime(SNOWFLAKE_TIMESLICE_ALIGNMENT_DATE) + pd.to_datetime(start_date) - pd.to_datetime(SNOWFLAKE_TIMESLICE_ALIGNMENT_DATE) ).total_seconds() # Subtract the normalization amount in seconds from the input datetime. @@ -399,7 +414,12 @@ def perform_resample_binning_on_frame( # Call time_slice on the normalized datetime column with the slice_width and slice_unit. # time_slice is not supported for timestamps with timezones, only TIMESTAMP_NTZ - normalized_dates_set_to_bins = time_slice(normalized_dates, slice_width, slice_unit) + normalized_dates_set_to_bins = time_slice( + column=normalized_dates, + slice_length=slice_width, + date_or_time_part=slice_unit, + start_or_end="start" if slice_unit in RULE_SECOND_TO_DAY else "end", + ) # frame: # data_col # date @@ -414,8 +434,13 @@ def perform_resample_binning_on_frame( # 1970-01-10 9 # Add the normalization amount in seconds back to the input datetime for the correct result. - unnormalized_dates_set_to_bins = dateadd( - "second", lit(normalization_amt), normalized_dates_set_to_bins + unnormalized_dates_set_to_bins = ( + dateadd("second", lit(normalization_amt), normalized_dates_set_to_bins) + if slice_unit in RULE_SECOND_TO_DAY + else last_day( + dateadd("second", lit(normalization_amt), normalized_dates_set_to_bins), + slice_unit, + ) ) # frame: # data_col diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index f54c3e5a88c..97d0c9c3ac6 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -105,6 +105,7 @@ is_char, is_null, lag, + last_day, last_value, lead, least, @@ -255,6 +256,8 @@ ) from snowflake.snowpark.modin.plugin._internal.resample_utils import ( IMPLEMENTED_AGG_METHODS, + RULE_SECOND_TO_DAY, + RULE_WEEK_TO_YEAR, fill_missing_resample_bins_for_frame, get_expected_resample_bins_frame, get_snowflake_quoted_identifier_for_resample_index_col, @@ -11900,6 +11903,12 @@ def asfreq( "Snowpark pandas `asfreq` does not support parameters `how`, `normalize`, or `fill_value`." ) + _, slice_unit = rule_to_snowflake_width_and_slice_unit(freq) + if slice_unit not in RULE_SECOND_TO_DAY: + ErrorMessage.not_implemented( + "Snowpark pandas `asfreq` does not yet support frequencies week, month, quarter, or year" + ) + resample_kwargs = { "rule": freq, "axis": 0, @@ -11974,7 +11983,7 @@ def resample( rule = resample_kwargs.get("rule") - _, slice_unit = rule_to_snowflake_width_and_slice_unit(rule) + slice_width, slice_unit = rule_to_snowflake_width_and_slice_unit(rule) min_max_index_column_quoted_identifier = ( frame.ordered_dataframe.generate_snowflake_quoted_identifiers( @@ -11990,14 +11999,45 @@ def resample( # For instance, if rule='3D' and the earliest date is # 2020-03-01 1:00:00, the first date should be 2020-03-01, # which is what date_trunc gives us. - start_date, end_date = frame.ordered_dataframe.agg( - date_trunc(slice_unit, min_(snowflake_index_column_identifier)).as_( - min_max_index_column_quoted_identifier[0] - ), - date_trunc(slice_unit, max_(snowflake_index_column_identifier)).as_( - min_max_index_column_quoted_identifier[1] - ), - ).collect()[0] + if slice_unit in RULE_SECOND_TO_DAY: + # `slice_unit` in 'second', 'minute', 'hour', 'day' + start_date, end_date = frame.ordered_dataframe.agg( + date_trunc(slice_unit, min_(snowflake_index_column_identifier)).as_( + min_max_index_column_quoted_identifier[0] + ), + date_trunc(slice_unit, max_(snowflake_index_column_identifier)).as_( + min_max_index_column_quoted_identifier[1] + ), + ).collect()[0] + else: + assert slice_unit in RULE_WEEK_TO_YEAR + # `slice_unit` in 'week', 'month', 'quarter', or 'year'. Set the start and end dates + # to the last day of the given `slice_unit`. Use the right bin edge by adding a `slice_width` + # of the given `slice_unit` to the first and last date of the index. + start_date, end_date = frame.ordered_dataframe.agg( + last_day( + date_trunc( + slice_unit, + dateadd( + slice_unit, + pandas_lit(slice_width), + min_(snowflake_index_column_identifier), + ), + ), + slice_unit, + ).as_(min_max_index_column_quoted_identifier[0]), + last_day( + date_trunc( + slice_unit, + dateadd( + slice_unit, + pandas_lit(slice_width), + max_(snowflake_index_column_identifier), + ), + ), + slice_unit, + ).as_(min_max_index_column_quoted_identifier[1]), + ).collect()[0] if resample_method in ("ffill", "bfill"): expected_frame = get_expected_resample_bins_frame( diff --git a/tests/integ/modin/resample/test_resample.py b/tests/integ/modin/resample/test_resample.py index 63c72452c1c..4132267c726 100644 --- a/tests/integ/modin/resample/test_resample.py +++ b/tests/integ/modin/resample/test_resample.py @@ -41,7 +41,7 @@ def test_resample_with_varying_freq_and_interval(freq, interval, agg_func): {"A": np.random.randn(15)}, index=native_pd.date_range("2020-01-01", periods=15, freq=f"1{freq}"), ), - lambda df: getattr(df.resample(rule=rule), agg_func)(), + lambda df: getattr(df.resample(rule=rule, closed="left"), agg_func)(), check_freq=False, ) @@ -102,7 +102,7 @@ def test_resample_missing_data_upsample(agg_func, freq): rule = f"1{freq}" eval_snowpark_pandas_result( *create_test_dfs({"A": np.random.randn(10)}, index=date_data), - lambda df: getattr(df.resample(rule=rule), agg_func)(), + lambda df: getattr(df.resample(rule=rule, closed="left"), agg_func)(), check_freq=False, ) @@ -159,7 +159,7 @@ def test_resample_series(freq, interval, agg_func): range(15), index=native_pd.date_range("2020-01-01", periods=15, freq=f"1{freq}"), ), - lambda ser: getattr(ser.resample(rule=rule), agg_func)(), + lambda ser: getattr(ser.resample(rule=rule, closed="left"), agg_func)(), check_freq=False, ) diff --git a/tests/integ/modin/resample/test_resample_asfreq.py b/tests/integ/modin/resample/test_resample_asfreq.py index b3e69993e5e..e04eafe6010 100644 --- a/tests/integ/modin/resample/test_resample_asfreq.py +++ b/tests/integ/modin/resample/test_resample_asfreq.py @@ -7,13 +7,10 @@ import pandas as native_pd import pytest -from snowflake.snowpark.modin.plugin._internal.resample_utils import ( - IMPLEMENTED_DATEOFFSET_STRINGS, -) from tests.integ.modin.sql_counter import sql_count_checker from tests.integ.modin.utils import create_test_dfs, eval_snowpark_pandas_result -freq = pytest.mark.parametrize("freq", IMPLEMENTED_DATEOFFSET_STRINGS) +freq = pytest.mark.parametrize("freq", ["min", "s", "h", "D"]) interval = pytest.mark.parametrize("interval", [1, 2, 3, 5, 15]) @@ -58,7 +55,7 @@ def test_resampler_asfreq(freq): @sql_count_checker(query_count=0) -def test_asfreq_negative(): +def test_asfreq_parameter_negative(): snow_df = pd.DataFrame( {"A": np.random.randn(15)}, index=native_pd.date_range("2020-01-01", periods=15, freq="1s"), @@ -71,3 +68,16 @@ def test_asfreq_negative(): snow_df.asfreq(freq="5s", method="ffill", fill_value=2) with pytest.raises(NotImplementedError): snow_df.resample("5s").asfreq(fill_value=2) + + +@sql_count_checker(query_count=0) +def test_asfreq_rule_negative(): + snow_df = pd.DataFrame( + {"A": np.random.randn(15)}, + index=native_pd.date_range("2020-01-01", periods=15, freq="1ME"), + ) + with pytest.raises( + NotImplementedError, + match="Snowpark pandas `asfreq` does not yet support frequencies week, month, quarter, or year", + ): + snow_df.asfreq(freq="3ME") diff --git a/tests/integ/modin/resample/test_resample_negative.py b/tests/integ/modin/resample/test_resample_negative.py index e20fc397ef8..9e91843d9a3 100644 --- a/tests/integ/modin/resample/test_resample_negative.py +++ b/tests/integ/modin/resample/test_resample_negative.py @@ -75,6 +75,20 @@ def test_resample_not_yet_implemented_freq(freq): snow_df.resample(freq).min().to_pandas() +@sql_count_checker(query_count=0) +def test_resample_not_yet_implemented_closed(): + snow_df = pd.DataFrame( + {"A": np.random.randn(15)}, + index=native_pd.date_range("2020-01-01", periods=15, freq="1D"), + ) + + with pytest.raises( + NotImplementedError, + match="resample with rule offset 3ME is only implemented with closed='left'", + ): + snow_df.resample("3ME").min().to_pandas() + + @pytest.mark.parametrize( "func", [