Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1023211: Support resample with rule offset W, ME, YE with closed = left #2254

Merged
merged 9 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
- Added support for passing parameter `include_describe` to `Session.query_history`.
- Added support for `DatetimeIndex.mean` and `DatetimeIndex.std` methods.
- Added support for `Resampler.asfreq`.
- Added support for `resample` frequency `W`, `ME`, `YE` with `closed = "left"`.

#### Bug Fixes

Expand Down
14 changes: 8 additions & 6 deletions docs/source/modin/supported/dataframe_supported.rst
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ Methods
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``applymap`` | P | | ``N`` if ``na_action == "ignore"`` |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``asfreq`` | P | ``how``, ``normalize``, | See ``resample`` |
| | | ``fill_value`` | |
| ``asfreq`` | P | ``how``, ``normalize``, | Only DatetimeIndex is supported and its ``freq`` |
| | | ``fill_value`` | will be lost. Only ``rule`` frequencies 's', 'min',|
| | | | 'h', and 'D' are supported. |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``asof`` | N | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
Expand Down Expand Up @@ -347,10 +348,11 @@ Methods
| ``replace`` | P | ``copy`` is ignored, ``method``, | |
| | | ``limit`` | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``resample`` | P | ``axis``, ``closed``, ``label``, | Only DatetimeIndex is supported and its ``freq`` |
| | | ``convention``, ``kind``, ``on`` | will be lost. Only ``rule`` frequencies 's', 'min',|
| | | , ``level``, ``origin``, | 'h', and 'D' are supported. |
| | | , ``offset``, ``group_keys`` | |
| ``resample`` | P | ``axis``, ``label``, | Only DatetimeIndex is supported and its ``freq`` |
| | | ``convention``, ``kind``, ``on`` | will be lost. ``rule`` frequencies 's', 'min', |
| | | , ``level``, ``origin``, | 'h', and 'D' are supported. ``rule`` frequency 'W' |
sfc-gh-nkrishna marked this conversation as resolved.
Show resolved Hide resolved
| | | , ``offset``, ``group_keys`` | , 'ME', and 'YE' are supported with |
sfc-gh-nkrishna marked this conversation as resolved.
Show resolved Hide resolved
| | | | `closed = "left"` |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``reset_index`` | Y | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
Expand Down
14 changes: 8 additions & 6 deletions docs/source/modin/supported/series_supported.rst
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,9 @@ Methods
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``argsort`` | N | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``asfreq`` | P | ``how``, ``normalize``, | See ``resample`` |
| | | ``fill_value`` | |
| ``asfreq`` | P | ``how``, ``normalize``, | Only DatetimeIndex is supported and its ``freq`` |
| | | ``fill_value`` | will be lost. Only ``rule`` frequencies 's', 'min',|
| | | | 'h', and 'D' are supported. |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``asof`` | N | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
Expand Down Expand Up @@ -340,10 +341,11 @@ Methods
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``replace`` | P | ``method``, ``limit`` | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``resample`` | P | ``axis``, ``closed``, ``label``, | Only DatetimeIndex is supported and its ``freq`` |
| | | ``convention``, ``kind``, ``on`` | will be lost. Only ``rule`` frequencies 's', 'min',|
| | | , ``level``, ``origin``, | 'h', and 'D' are supported. |
| | | , ``offset``, ``group_keys`` | |
| ``resample`` | P | ``axis``, ``label``, | Only DatetimeIndex is supported and its ``freq`` |
| | | ``convention``, ``kind``, ``on`` | will be lost. ``rule`` frequencies 's', 'min', |
| | | , ``level``, ``origin``, | 'h', and 'D' are supported. ``rule`` frequency 'W' |
sfc-gh-nkrishna marked this conversation as resolved.
Show resolved Hide resolved
| | | , ``offset``, ``group_keys`` | , 'ME', and 'YE' are supported with |
sfc-gh-nkrishna marked this conversation as resolved.
Show resolved Hide resolved
| | | | `closed = "left"` |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``reset_index`` | Y | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
Expand Down
65 changes: 45 additions & 20 deletions src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from typing import Any, Literal, NoReturn, Optional, Union

import pandas as native_pd
from pandas._libs.lib import no_default
from pandas._libs.tslibs import to_offset
from pandas._typing import Frequency
Expand All @@ -16,6 +15,7 @@
builtin,
dateadd,
datediff,
last_day,
lit,
to_timestamp_ntz,
)
Expand Down Expand Up @@ -47,7 +47,9 @@
"last",
]
IMPLEMENTED_MISC_METHODS = ["ffill"]
SUPPORTED_RESAMPLE_RULES = ["day", "hour", "second", "minute"]
SUPPORTED_RESAMPLE_RULES = ("second", "minute", "hour", "day", "week", "month", "year")
RULE_SECOND_TO_DAY = ("second", "minute", "hour", "day")
RULE_WEEK_TO_YEAR = ("week", "quarter", "month", "year")


# https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#dateoffset-objects
Expand Down Expand Up @@ -82,9 +84,21 @@
"ns",
]

SNOWFLAKE_SUPPORTED_DATEOFFSETS = ["W", "ME", "QE", "QS", "YS", "D", "h", "min", "s"]
SNOWFLAKE_SUPPORTED_DATEOFFSETS = [
"s",
"min",
"h",
"D",
"W",
"MS",
"ME",
"QS",
"QE",
"YS",
"YE",
]

IMPLEMENTED_DATEOFFSET_STRINGS = ["min", "s", "h", "D"]
IMPLEMENTED_DATEOFFSET_STRINGS = ["s", "min", "h", "D", "W", "ME", "YE"]
sfc-gh-nkrishna marked this conversation as resolved.
Show resolved Hide resolved

UNSUPPORTED_DATEOFFSET_STRINGS = list(
# sort so that tests that generate test cases from this last always use the
Expand Down Expand Up @@ -134,7 +148,6 @@ def rule_to_snowflake_width_and_slice_unit(rule: Frequency) -> tuple[int, str]:

rule_code = offset.rule_code
slice_width = offset.n

if rule_code == "s":
slice_unit = "second"
elif rule_code == "min":
Expand All @@ -143,16 +156,16 @@ def rule_to_snowflake_width_and_slice_unit(rule: Frequency) -> tuple[int, str]:
slice_unit = "hour"
elif rule_code == "D":
slice_unit = "day"
elif rule_code[0] == "W": # pragma: no cover
elif rule_code[0] == "W":
# treat codes like W-MON and W-SUN as "week":
slice_unit = "week"
elif rule_code == "ME": # pragma: no cover
elif rule_code == "ME":
slice_unit = "month"
elif rule_code[0] == "QE": # pragma: no cover
# treat codes like Q-DEC and Q-JAN as "quarter":
elif "QE" in rule_code: # pragma: no cover
# treat codes like QE-DEC and QE-JAN as "quarter":
slice_unit = "quarter"
elif rule_code[0] == "YE": # pragma: no cover
# treat codes like A-DEC and A-JAN as "year":
elif "YE" in rule_code:
# treat codes like YE-DEC and YE-JAN as "year":
slice_unit = "year"
else:
raise NotImplementedError(
Expand Down Expand Up @@ -204,9 +217,7 @@ def validate_resample_supported_by_snowflake(
"""
rule = resample_kwargs.get("rule")

_, slice_unit = rule_to_snowflake_width_and_slice_unit(
rule # type: ignore[arg-type]
)
_, slice_unit = rule_to_snowflake_width_and_slice_unit(rule)

if slice_unit not in SUPPORTED_RESAMPLE_RULES:
_argument_not_implemented("rule", rule)
Expand All @@ -216,8 +227,13 @@ def validate_resample_supported_by_snowflake(
_argument_not_implemented("axis", axis)

closed = resample_kwargs.get("closed")
if closed is not None: # pragma: no cover
if closed not in ("left", None) and slice_unit in RULE_SECOND_TO_DAY:
_argument_not_implemented("closed", closed)
if slice_unit in RULE_WEEK_TO_YEAR:
if closed != "left":
ErrorMessage.not_implemented(
f"resample with rule offset {rule} is only implemented with closed='left'"
)

label = resample_kwargs.get("label")
if label is not None: # pragma: no cover
Expand Down Expand Up @@ -376,8 +392,7 @@ def perform_resample_binning_on_frame(
# Time slices in Snowflake are aligned to snowflake_timeslice_alignment_date,
# so we must normalize input datetimes.
normalization_amt = (
native_pd.to_datetime(start_date)
- native_pd.to_datetime(SNOWFLAKE_TIMESLICE_ALIGNMENT_DATE)
pd.to_datetime(start_date) - pd.to_datetime(SNOWFLAKE_TIMESLICE_ALIGNMENT_DATE)
).total_seconds()

# Subtract the normalization amount in seconds from the input datetime.
Expand All @@ -399,7 +414,12 @@ def perform_resample_binning_on_frame(

# Call time_slice on the normalized datetime column with the slice_width and slice_unit.
# time_slice is not supported for timestamps with timezones, only TIMESTAMP_NTZ
normalized_dates_set_to_bins = time_slice(normalized_dates, slice_width, slice_unit)
normalized_dates_set_to_bins = time_slice(
column=normalized_dates,
slice_length=slice_width,
date_or_time_part=slice_unit,
start_or_end="start" if slice_unit in RULE_SECOND_TO_DAY else "end",
)
# frame:
# data_col
# date
Expand All @@ -414,8 +434,13 @@ def perform_resample_binning_on_frame(
# 1970-01-10 9

# Add the normalization amount in seconds back to the input datetime for the correct result.
unnormalized_dates_set_to_bins = dateadd(
"second", lit(normalization_amt), normalized_dates_set_to_bins
unnormalized_dates_set_to_bins = (
dateadd("second", lit(normalization_amt), normalized_dates_set_to_bins)
if slice_unit in RULE_SECOND_TO_DAY
else last_day(
dateadd("second", lit(normalization_amt), normalized_dates_set_to_bins),
slice_unit,
)
)
# frame:
# data_col
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
is_char,
is_null,
lag,
last_day,
last_value,
lead,
least,
Expand Down Expand Up @@ -255,6 +256,8 @@
)
from snowflake.snowpark.modin.plugin._internal.resample_utils import (
IMPLEMENTED_AGG_METHODS,
RULE_SECOND_TO_DAY,
RULE_WEEK_TO_YEAR,
fill_missing_resample_bins_for_frame,
get_expected_resample_bins_frame,
get_snowflake_quoted_identifier_for_resample_index_col,
Expand Down Expand Up @@ -11897,6 +11900,12 @@ def asfreq(
"Snowpark pandas `asfreq` does not support parameters `how`, `normalize`, or `fill_value`."
)

_, slice_unit = rule_to_snowflake_width_and_slice_unit(freq)
if slice_unit not in RULE_SECOND_TO_DAY:
ErrorMessage.not_implemented(
"Snowpark pandas `asfreq` does not yet support frequencies week, month, quarter, or year"
)

resample_kwargs = {
"rule": freq,
"axis": 0,
Expand Down Expand Up @@ -11971,7 +11980,7 @@ def resample(

rule = resample_kwargs.get("rule")

_, slice_unit = rule_to_snowflake_width_and_slice_unit(rule)
slice_width, slice_unit = rule_to_snowflake_width_and_slice_unit(rule)

min_max_index_column_quoted_identifier = (
frame.ordered_dataframe.generate_snowflake_quoted_identifiers(
Expand All @@ -11987,14 +11996,41 @@ def resample(
# For instance, if rule='3D' and the earliest date is
# 2020-03-01 1:00:00, the first date should be 2020-03-01,
# which is what date_trunc gives us.
start_date, end_date = frame.ordered_dataframe.agg(
date_trunc(slice_unit, min_(snowflake_index_column_identifier)).as_(
min_max_index_column_quoted_identifier[0]
),
date_trunc(slice_unit, max_(snowflake_index_column_identifier)).as_(
min_max_index_column_quoted_identifier[1]
),
).collect()[0]
if slice_unit in RULE_SECOND_TO_DAY:
start_date, end_date = frame.ordered_dataframe.agg(
date_trunc(slice_unit, min_(snowflake_index_column_identifier)).as_(
min_max_index_column_quoted_identifier[0]
),
date_trunc(slice_unit, max_(snowflake_index_column_identifier)).as_(
min_max_index_column_quoted_identifier[1]
),
).collect()[0]
else:
assert slice_unit in RULE_WEEK_TO_YEAR
start_date, end_date = frame.ordered_dataframe.agg(
last_day(
sfc-gh-nkrishna marked this conversation as resolved.
Show resolved Hide resolved
date_trunc(
slice_unit,
dateadd(
slice_unit,
pandas_lit(slice_width),
min_(snowflake_index_column_identifier),
),
),
slice_unit,
).as_(min_max_index_column_quoted_identifier[0]),
last_day(
date_trunc(
slice_unit,
dateadd(
slice_unit,
pandas_lit(slice_width),
max_(snowflake_index_column_identifier),
),
),
slice_unit,
).as_(min_max_index_column_quoted_identifier[1]),
).collect()[0]

if resample_method in ("ffill", "bfill"):
expected_frame = get_expected_resample_bins_frame(
Expand Down
6 changes: 3 additions & 3 deletions tests/integ/modin/resample/test_resample.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def test_resample_with_varying_freq_and_interval(freq, interval, agg_func):
{"A": np.random.randn(15)},
index=native_pd.date_range("2020-01-01", periods=15, freq=f"1{freq}"),
),
lambda df: getattr(df.resample(rule=rule), agg_func)(),
lambda df: getattr(df.resample(rule=rule, closed="left"), agg_func)(),
sfc-gh-nkrishna marked this conversation as resolved.
Show resolved Hide resolved
check_freq=False,
)

Expand Down Expand Up @@ -102,7 +102,7 @@ def test_resample_missing_data_upsample(agg_func, freq):
rule = f"1{freq}"
eval_snowpark_pandas_result(
*create_test_dfs({"A": np.random.randn(10)}, index=date_data),
lambda df: getattr(df.resample(rule=rule), agg_func)(),
lambda df: getattr(df.resample(rule=rule, closed="left"), agg_func)(),
check_freq=False,
)

Expand Down Expand Up @@ -159,7 +159,7 @@ def test_resample_series(freq, interval, agg_func):
range(15),
index=native_pd.date_range("2020-01-01", periods=15, freq=f"1{freq}"),
),
lambda ser: getattr(ser.resample(rule=rule), agg_func)(),
lambda ser: getattr(ser.resample(rule=rule, closed="left"), agg_func)(),
check_freq=False,
)

Expand Down
20 changes: 15 additions & 5 deletions tests/integ/modin/resample/test_resample_asfreq.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@
import pandas as native_pd
import pytest

from snowflake.snowpark.modin.plugin._internal.resample_utils import (
IMPLEMENTED_DATEOFFSET_STRINGS,
)
from tests.integ.modin.sql_counter import sql_count_checker
from tests.integ.modin.utils import create_test_dfs, eval_snowpark_pandas_result

freq = pytest.mark.parametrize("freq", IMPLEMENTED_DATEOFFSET_STRINGS)
freq = pytest.mark.parametrize("freq", ["min", "s", "h", "D"])
interval = pytest.mark.parametrize("interval", [1, 2, 3, 5, 15])


Expand Down Expand Up @@ -58,7 +55,7 @@ def test_resampler_asfreq(freq):


@sql_count_checker(query_count=0)
def test_asfreq_negative():
def test_asfreq_parameter_negative():
snow_df = pd.DataFrame(
{"A": np.random.randn(15)},
index=native_pd.date_range("2020-01-01", periods=15, freq="1s"),
Expand All @@ -71,3 +68,16 @@ def test_asfreq_negative():
snow_df.asfreq(freq="5s", method="ffill", fill_value=2)
with pytest.raises(NotImplementedError):
snow_df.resample("5s").asfreq(fill_value=2)


@sql_count_checker(query_count=0)
def test_asfreq_rule_negative():
snow_df = pd.DataFrame(
{"A": np.random.randn(15)},
index=native_pd.date_range("2020-01-01", periods=15, freq="1ME"),
)
with pytest.raises(
NotImplementedError,
match="Snowpark pandas `asfreq` does not yet support frequencies week, month, quarter, or year",
):
snow_df.asfreq(freq="3ME")
Loading
Loading