From 96852b9645e0825a21c7dbb751c14d74b2378dc6 Mon Sep 17 00:00:00 2001 From: Naren Krishna Date: Thu, 29 Aug 2024 15:56:47 -0700 Subject: [PATCH 1/9] SNOW-1630279: Refactor resample fillna to use join_utils Signed-off-by: Naren Krishna --- .../modin/plugin/_internal/join_utils.py | 8 +- .../plugin/_internal/ordered_dataframe.py | 25 ++++- .../modin/plugin/_internal/resample_utils.py | 91 ++++++++----------- .../compiler/snowflake_query_compiler.py | 2 +- 4 files changed, 63 insertions(+), 63 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py index 846f3c64079..9946b433a05 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py @@ -43,6 +43,7 @@ class MatchComparator(Enum): GREATER_THAN = "__gt__" LESS_THAN_OR_EQUAL_TO = "__le__" LESS_THAN = "__lt__" + EQUAL_NULL = "equal_null" class InheritJoinIndex(IntFlag): @@ -109,6 +110,7 @@ def join( how: JoinTypeLit, left_on: list[str], right_on: list[str], + on_comparators: Optional[list[MatchComparator]] = None, left_match_col: Optional[str] = None, right_match_col: Optional[str] = None, match_comparator: Optional[MatchComparator] = None, @@ -126,11 +128,13 @@ def join( left_on: List of snowflake identifiers to join on from 'left' frame. right_on: List of snowflake identifiers to join on from 'right' frame. left_on and right_on must be lists of equal length. + on_comparators: list of MatchComparator {"__ge__", "__gt__", "__le__", "__lt__", "equal_null"} + Comparing the 'left_on' and 'right_on' columns. left_match_col: Snowflake identifier to match condition on from 'left' frame. Only applicable for 'asof' join. right_match_col: Snowflake identifier to match condition on from 'right' frame. Only applicable for 'asof' join. - match_comparator: MatchComparator {"__ge__", "__gt__", "__le__", "__lt__"} + match_comparator: MatchComparator {"__ge__", "__gt__", "__le__", "__lt__", "equal_null"} Only applicable for 'asof' join, the operation to compare 'left_match_condition' and 'right_match_condition'. sort: If True order merged frame on join keys. If False, ordering behavior @@ -200,11 +204,11 @@ def assert_snowpark_pandas_types_match() -> None: # in the join operation, and unnecessary columns are dropped from the projected columns. left = left.select_active_columns() right = right.select_active_columns() - joined_ordered_dataframe = left.ordered_dataframe.join( right=right.ordered_dataframe, left_on_cols=left_on, right_on_cols=right_on, + on_comparators=on_comparators, left_match_col=left_match_col, right_match_col=right_match_col, match_comparator=match_comparator, diff --git a/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py b/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py index f7ae87c2a5d..e884cf593c5 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py @@ -1052,6 +1052,7 @@ def join( right: "OrderedDataFrame", left_on_cols: Optional[list[str]] = None, right_on_cols: Optional[list[str]] = None, + on_comparators: Optional[list["MatchComparator"]] = None, # type: ignore[name-defined] # noqa: F821 left_match_col: Optional[str] = None, right_match_col: Optional[str] = None, match_comparator: Optional[ # type: ignore[name-defined] @@ -1073,11 +1074,13 @@ def join( right: The other OrderedDataFrame to join. left_on_cols: A list of column names from self OrderedDataFrame to be used for the join. right_on_cols: A list of column names from right OrderedDataFrame to be used for the join. + on_comparators: list of MatchComparator {"__ge__", "__gt__", "__le__", "__lt__", "equal_null"} + Comparing the 'left_on' and 'right_on' columns. left_match_col: Snowflake identifier to match condition on from 'left' frame. Only applicable for 'asof' join. right_match_col: Snowflake identifier to match condition on from 'right' frame. Only applicable for 'asof' join. - match_comparator: MatchComparator {"__ge__", "__gt__", "__le__", "__lt__"} + match_comparator: MatchComparator {"__ge__", "__gt__", "__le__", "__lt__", "equal_null"} Only applicable for 'asof' join, the operation to compare 'left_match_condition' and 'right_match_condition'. how: We support the following join types: @@ -1197,11 +1200,23 @@ def join( # get the new mapped right on identifier right_on_cols = [right_identifiers_rename_map[key] for key in right_on_cols] - # Generate sql ON clause 'EQUAL_NULL(col1, col2) and EQUAL_NULL(col3, col4) ...' + # Generate sql ON clause on = None - for left_col, right_col in zip(left_on_cols, right_on_cols): - eq = Column(left_col).equal_null(Column(right_col)) - on = eq if on is None else on & eq + # Use EQUAL_NULL as default to compare left and right "on" columns + from snowflake.snowpark.modin.plugin._internal.join_utils import MatchComparator + + on_comparators = ( + [MatchComparator.EQUAL_NULL] * len(left_on_cols) + if not on_comparators + else on_comparators + ) + for left_col, right_col, on_comparator in zip( + left_on_cols, right_on_cols, on_comparators + ): + column_comparison = getattr(Column(left_col), on_comparator.value)( + Column(right_col) + ) + on = column_comparison if on is None else on & column_comparison if how == "asof": assert left_match_col, "left_match_col was not provided to ASOF Join" diff --git a/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py index 87ac427d6dc..7dd6d430f1c 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py @@ -24,8 +24,14 @@ row_number, to_timestamp_ntz, ) +from snowflake.snowpark.modin.plugin._internal import join_utils from snowflake.snowpark.modin.plugin._internal.frame import InternalFrame -from snowflake.snowpark.modin.plugin._internal.join_utils import InheritJoinIndex, join +from snowflake.snowpark.modin.plugin._internal.join_utils import ( + InheritJoinIndex, + JoinKeyCoalesceConfig, + MatchComparator, + join, +) from snowflake.snowpark.modin.plugin._internal.ordered_dataframe import ( DataFrameReference, OrderedDataFrame, @@ -727,63 +733,38 @@ def perform_asof_join_on_frame( )[0][0] ) - # 3. Convert both preserved_frame and right_frame to Snowpark DataFrames to perform - # a non-equi-join. - left_snowpark_df = ( - preserving_frame.ordered_dataframe.to_projected_snowpark_dataframe() - ) - right_snowpark_df = right_frame.ordered_dataframe.to_projected_snowpark_dataframe() - - # 4. Join left_snowpark_df and right_snowpark_df using the following logic: + # 3. Join left_snowpark_df and right_snowpark_df using the following logic: # For each element left_frame's __resample_index__, join it with a single row # in right_frame whose __index__ value is less/greater than or equal to it and is closest in time. # If a row cannot be found, pad the joined columns from right_frame with null. - if fill_method == "bfill": - on_expr = ( - left_snowpark_df[left_timecol_snowflake_quoted_identifier] - <= right_snowpark_df[interval_start_snowflake_quoted_identifier] - ) & ( - left_snowpark_df[left_timecol_snowflake_quoted_identifier] - > right_snowpark_df[interval_end_snowflake_quoted_identifier] - ) - else: - assert fill_method == "ffill", f"invalid fill_method {fill_method}" - on_expr = ( - left_snowpark_df[left_timecol_snowflake_quoted_identifier] - >= right_snowpark_df[interval_start_snowflake_quoted_identifier] - ) & ( - left_snowpark_df[left_timecol_snowflake_quoted_identifier] - < right_snowpark_df[interval_end_snowflake_quoted_identifier] - ) - joined_snowpark_df = left_snowpark_df.join( - right=right_snowpark_df, - on=on_expr, + output_frame, _ = join_utils.join( + left=preserving_frame, + right=right_frame, how="left", - ) - # joined_snowpark_df: - # - # __resample_index__ __index__ a interval_end_col - # 2023-01-03 00:00:00 NULL NULL NULL - # 2023-01-05 00:00:00 2023-01-04 00:00:00 2 2023-01-05 23:00:00 - # 2023-01-07 00:00:00 2023-01-06 00:00:00 4 2023-01-07 02:00:00 - # 2023-01-09 00:00:00 2023-01-07 02:00:00 NULL 2023-01-10 00:00:00 - - # 5. Construct a final result with correct frame metadata. - # a - # __resample_index__ - # 2023-01-03 00:00:00 NaN - # 2023-01-05 00:00:00 2 - # 2023-01-07 00:00:00 4 - # 2023-01-09 00:00:00 NaN - return InternalFrame.create( - ordered_dataframe=OrderedDataFrame(DataFrameReference(joined_snowpark_df)), - data_column_pandas_labels=referenced_frame.data_column_pandas_labels, - data_column_snowflake_quoted_identifiers=referenced_frame.data_column_snowflake_quoted_identifiers, - index_column_pandas_labels=referenced_frame.index_column_pandas_labels, - index_column_snowflake_quoted_identifiers=[ - left_timecol_snowflake_quoted_identifier + left_on=[ + left_timecol_snowflake_quoted_identifier, + left_timecol_snowflake_quoted_identifier, + ], + right_on=[ + interval_start_snowflake_quoted_identifier, + interval_end_snowflake_quoted_identifier, + ], + on_comparators=( + [MatchComparator.GREATER_THAN_OR_EQUAL_TO, MatchComparator.LESS_THAN] + if fill_method == "ffill" + else [MatchComparator.LESS_THAN_OR_EQUAL_TO, MatchComparator.GREATER_THAN] + ), + sort=True, + join_key_coalesce_config=[ + JoinKeyCoalesceConfig.LEFT, + JoinKeyCoalesceConfig.LEFT, ], - data_column_pandas_index_names=referenced_frame.data_column_pandas_index_names, - data_column_types=referenced_frame.cached_data_column_snowpark_pandas_types, - index_column_types=referenced_frame.cached_index_column_snowpark_pandas_types, ) + # output_frame: + # a + # __resample_index__ + # 2023-01-03 00:00:00 NULL + # 2023-01-05 00:00:00 2 + # 2023-01-07 00:00:00 4 + # 2023-01-09 00:00:00 NULL + return output_frame diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index e77cb99fa8f..75ddad933f3 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -11374,7 +11374,7 @@ def resample( # a single row is selected from the input frame, where its date is the closest match in time based on # the filling method. We perform an ASOF join to accomplish this. frame = perform_asof_join_on_frame(expected_frame, frame, resample_method) - + return SnowflakeQueryCompiler(frame).set_index_names([None]) elif resample_method in IMPLEMENTED_AGG_METHODS: frame = perform_resample_binning_on_frame(frame, start_date, rule) if resample_method == "size": From f1f589600bbcad3d3e0bf65561ca1837c3df7a2f Mon Sep 17 00:00:00 2001 From: Naren Krishna Date: Thu, 29 Aug 2024 15:59:20 -0700 Subject: [PATCH 2/9] add CHANGELOG Signed-off-by: Naren Krishna --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 42efb25dcb5..b6dea9906de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -78,6 +78,7 @@ #### Improvements - Refactored `quoted_identifier_to_snowflake_type` to avoid making metadata queries if the types have been cached locally. +- Refactored `resample.fillna` implementation to use `OrderedDataFrame` join utility. #### Bug Fixes From c77f370bf60c1387cf4275da5b08ef1e26937970 Mon Sep 17 00:00:00 2001 From: Naren Krishna Date: Thu, 29 Aug 2024 16:05:09 -0700 Subject: [PATCH 3/9] fix comments Signed-off-by: Naren Krishna --- src/snowflake/snowpark/modin/plugin/_internal/join_utils.py | 1 + .../snowpark/modin/plugin/_internal/ordered_dataframe.py | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py index 9946b433a05..c5f17a8a62c 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py @@ -204,6 +204,7 @@ def assert_snowpark_pandas_types_match() -> None: # in the join operation, and unnecessary columns are dropped from the projected columns. left = left.select_active_columns() right = right.select_active_columns() + joined_ordered_dataframe = left.ordered_dataframe.join( right=right.ordered_dataframe, left_on_cols=left_on, diff --git a/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py b/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py index e884cf593c5..f1f9853dee8 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py @@ -1200,16 +1200,17 @@ def join( # get the new mapped right on identifier right_on_cols = [right_identifiers_rename_map[key] for key in right_on_cols] - # Generate sql ON clause on = None - # Use EQUAL_NULL as default to compare left and right "on" columns + from snowflake.snowpark.modin.plugin._internal.join_utils import MatchComparator + # Use EQUAL_NULL as default to compare left and right "on" columns on_comparators = ( [MatchComparator.EQUAL_NULL] * len(left_on_cols) if not on_comparators else on_comparators ) + # Generate sql ON clause comparing left and right columns for left_col, right_col, on_comparator in zip( left_on_cols, right_on_cols, on_comparators ): From 983dff14370a305c99026ff7e7cd0c89eb874fad Mon Sep 17 00:00:00 2001 From: Naren Krishna Date: Fri, 30 Aug 2024 08:54:47 -0700 Subject: [PATCH 4/9] fix index name Signed-off-by: Naren Krishna --- .../modin/plugin/compiler/snowflake_query_compiler.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index e6baba0ad66..c5fc0632d96 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -11359,8 +11359,11 @@ def resample( # The output frame's DatetimeIndex is identical to expected_frame's. For each date in the DatetimeIndex, # a single row is selected from the input frame, where its date is the closest match in time based on # the filling method. We perform an ASOF join to accomplish this. - frame = perform_asof_join_on_frame(expected_frame, frame, resample_method) - return SnowflakeQueryCompiler(frame).set_index_names([None]) + index_name = frame.index_column_pandas_labels + output_frame = perform_asof_join_on_frame( + expected_frame, frame, resample_method + ) + return SnowflakeQueryCompiler(output_frame).set_index_names(index_name) elif resample_method in IMPLEMENTED_AGG_METHODS: frame = perform_resample_binning_on_frame(frame, start_date, rule) if resample_method == "size": From 522800744ee1625bb474f2c674717b41981472b3 Mon Sep 17 00:00:00 2001 From: Naren Krishna Date: Fri, 30 Aug 2024 13:03:43 -0700 Subject: [PATCH 5/9] address comments Signed-off-by: Naren Krishna --- src/snowflake/snowpark/modin/plugin/_internal/join_utils.py | 3 ++- .../snowpark/modin/plugin/_internal/ordered_dataframe.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py index c5f17a8a62c..5eb39f026fd 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py @@ -129,7 +129,8 @@ def join( right_on: List of snowflake identifiers to join on from 'right' frame. left_on and right_on must be lists of equal length. on_comparators: list of MatchComparator {"__ge__", "__gt__", "__le__", "__lt__", "equal_null"} - Comparing the 'left_on' and 'right_on' columns. + Comparing the 'left_on' and 'right_on' columns. Defaults to list of "equal_null" + of the same length as 'left_on' and 'right_on'. left_match_col: Snowflake identifier to match condition on from 'left' frame. Only applicable for 'asof' join. right_match_col: Snowflake identifier to match condition on from 'right' frame. diff --git a/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py b/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py index f1f9853dee8..f6f58e3299f 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py @@ -1075,7 +1075,8 @@ def join( left_on_cols: A list of column names from self OrderedDataFrame to be used for the join. right_on_cols: A list of column names from right OrderedDataFrame to be used for the join. on_comparators: list of MatchComparator {"__ge__", "__gt__", "__le__", "__lt__", "equal_null"} - Comparing the 'left_on' and 'right_on' columns. + Comparing the 'left_on' and 'right_on' columns. Defaults to list of "equal_null" + of the same length as 'left_on' and 'right_on'. left_match_col: Snowflake identifier to match condition on from 'left' frame. Only applicable for 'asof' join. right_match_col: Snowflake identifier to match condition on from 'right' frame. From d07feab883c7cda891654e467f26f1554b2185f0 Mon Sep 17 00:00:00 2001 From: Naren Krishna Date: Tue, 3 Sep 2024 14:04:52 -0700 Subject: [PATCH 6/9] refactor resample to use ASOF Join Signed-off-by: Naren Krishna --- .../modin/plugin/_internal/join_utils.py | 8 +- .../plugin/_internal/ordered_dataframe.py | 27 +----- .../modin/plugin/_internal/resample_utils.py | 91 +++---------------- 3 files changed, 18 insertions(+), 108 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py index 5eb39f026fd..846f3c64079 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py @@ -43,7 +43,6 @@ class MatchComparator(Enum): GREATER_THAN = "__gt__" LESS_THAN_OR_EQUAL_TO = "__le__" LESS_THAN = "__lt__" - EQUAL_NULL = "equal_null" class InheritJoinIndex(IntFlag): @@ -110,7 +109,6 @@ def join( how: JoinTypeLit, left_on: list[str], right_on: list[str], - on_comparators: Optional[list[MatchComparator]] = None, left_match_col: Optional[str] = None, right_match_col: Optional[str] = None, match_comparator: Optional[MatchComparator] = None, @@ -128,14 +126,11 @@ def join( left_on: List of snowflake identifiers to join on from 'left' frame. right_on: List of snowflake identifiers to join on from 'right' frame. left_on and right_on must be lists of equal length. - on_comparators: list of MatchComparator {"__ge__", "__gt__", "__le__", "__lt__", "equal_null"} - Comparing the 'left_on' and 'right_on' columns. Defaults to list of "equal_null" - of the same length as 'left_on' and 'right_on'. left_match_col: Snowflake identifier to match condition on from 'left' frame. Only applicable for 'asof' join. right_match_col: Snowflake identifier to match condition on from 'right' frame. Only applicable for 'asof' join. - match_comparator: MatchComparator {"__ge__", "__gt__", "__le__", "__lt__", "equal_null"} + match_comparator: MatchComparator {"__ge__", "__gt__", "__le__", "__lt__"} Only applicable for 'asof' join, the operation to compare 'left_match_condition' and 'right_match_condition'. sort: If True order merged frame on join keys. If False, ordering behavior @@ -210,7 +205,6 @@ def assert_snowpark_pandas_types_match() -> None: right=right.ordered_dataframe, left_on_cols=left_on, right_on_cols=right_on, - on_comparators=on_comparators, left_match_col=left_match_col, right_match_col=right_match_col, match_comparator=match_comparator, diff --git a/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py b/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py index f6f58e3299f..f7ae87c2a5d 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/ordered_dataframe.py @@ -1052,7 +1052,6 @@ def join( right: "OrderedDataFrame", left_on_cols: Optional[list[str]] = None, right_on_cols: Optional[list[str]] = None, - on_comparators: Optional[list["MatchComparator"]] = None, # type: ignore[name-defined] # noqa: F821 left_match_col: Optional[str] = None, right_match_col: Optional[str] = None, match_comparator: Optional[ # type: ignore[name-defined] @@ -1074,14 +1073,11 @@ def join( right: The other OrderedDataFrame to join. left_on_cols: A list of column names from self OrderedDataFrame to be used for the join. right_on_cols: A list of column names from right OrderedDataFrame to be used for the join. - on_comparators: list of MatchComparator {"__ge__", "__gt__", "__le__", "__lt__", "equal_null"} - Comparing the 'left_on' and 'right_on' columns. Defaults to list of "equal_null" - of the same length as 'left_on' and 'right_on'. left_match_col: Snowflake identifier to match condition on from 'left' frame. Only applicable for 'asof' join. right_match_col: Snowflake identifier to match condition on from 'right' frame. Only applicable for 'asof' join. - match_comparator: MatchComparator {"__ge__", "__gt__", "__le__", "__lt__", "equal_null"} + match_comparator: MatchComparator {"__ge__", "__gt__", "__le__", "__lt__"} Only applicable for 'asof' join, the operation to compare 'left_match_condition' and 'right_match_condition'. how: We support the following join types: @@ -1201,24 +1197,11 @@ def join( # get the new mapped right on identifier right_on_cols = [right_identifiers_rename_map[key] for key in right_on_cols] + # Generate sql ON clause 'EQUAL_NULL(col1, col2) and EQUAL_NULL(col3, col4) ...' on = None - - from snowflake.snowpark.modin.plugin._internal.join_utils import MatchComparator - - # Use EQUAL_NULL as default to compare left and right "on" columns - on_comparators = ( - [MatchComparator.EQUAL_NULL] * len(left_on_cols) - if not on_comparators - else on_comparators - ) - # Generate sql ON clause comparing left and right columns - for left_col, right_col, on_comparator in zip( - left_on_cols, right_on_cols, on_comparators - ): - column_comparison = getattr(Column(left_col), on_comparator.value)( - Column(right_col) - ) - on = column_comparison if on is None else on & column_comparison + for left_col, right_col in zip(left_on_cols, right_on_cols): + eq = Column(left_col).equal_null(Column(right_col)) + on = eq if on is None else on & eq if how == "asof": assert left_match_col, "left_match_col was not provided to ASOF Join" diff --git a/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py index 7dd6d430f1c..414e557a578 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py @@ -14,12 +14,8 @@ from snowflake.snowpark.column import Column from snowflake.snowpark.functions import ( builtin, - coalesce, - col, dateadd, datediff, - lag, - lead, lit, row_number, to_timestamp_ntz, @@ -28,7 +24,6 @@ from snowflake.snowpark.modin.plugin._internal.frame import InternalFrame from snowflake.snowpark.modin.plugin._internal.join_utils import ( InheritJoinIndex, - JoinKeyCoalesceConfig, MatchComparator, join, ) @@ -38,7 +33,6 @@ ) from snowflake.snowpark.modin.plugin._internal.utils import ( generate_snowflake_quoted_identifiers_helper, - pandas_lit, ) from snowflake.snowpark.modin.plugin.utils.error_message import ErrorMessage from snowflake.snowpark.types import DateType, TimestampType @@ -628,7 +622,6 @@ def fill_missing_resample_bins_for_frame( ) -# TODO: SNOW-989398 Migrate function to ASOF join def perform_asof_join_on_frame( preserving_frame: InternalFrame, referenced_frame: InternalFrame, fill_method: str ) -> InternalFrame: @@ -658,7 +651,9 @@ def perform_asof_join_on_frame( frame : InternalFrame A new frame that holds the result of an ASOF join. """ - # Consider the following example: + # Consider the following example where we want to perform an ASOF JOIN of preserving_frame + # and referenced_frame where __resample_index__ >= __index__ if forward fill + # or __resample_index__ <= __index__ if backward fill: # # preserved_frame: # __resample_index__ @@ -677,88 +672,26 @@ def perform_asof_join_on_frame( # 2023-01-07 02:00:00 NaN # 2023-01-10 00:00:00 6 - # We want to perform an ASOF JOIN of preserving_frame and referenced_frame. Here - # are the steps to take: - - # 1. Construct right_frame using referenced_frame, which has a - # temporary column, interval_end_col, that holds the closest - # following timestamp to every value in __index__. The last value in - # interval_end_col is dummy value that represents the smallest or largest - # (e.g. bfill or ffill) possible date in Snowflake. - interval_end_pandas_label = "interval_end_col" interval_start_snowflake_quoted_identifier = ( get_snowflake_quoted_identifier_for_resample_index_col(referenced_frame) ) - if fill_method == "bfill": - # Snowflake recommends using 1582 as the smallest year for date or timestamp type - # due to limits on the Gregorian calendar. See https://docs.snowflake.com/en/sql-reference/data-types-datetime - interval_end_col = coalesce( - lag(col(interval_start_snowflake_quoted_identifier)).over( - Window.order_by(col(interval_start_snowflake_quoted_identifier).asc()) - ), - pandas_lit("1582-01-01 00:00:00"), - ) - else: - # Snowflake recommends using 9999 as the largest year for date or timestamp type - # due to limits on the Gregorian calendar. See https://docs.snowflake.com/en/sql-reference/data-types-datetime - assert fill_method == "ffill", "`fill_method` can only be 'bfill' or 'ffill'" - interval_end_col = coalesce( - lead(col(interval_start_snowflake_quoted_identifier)).over( - Window.order_by(col(interval_start_snowflake_quoted_identifier).asc()) - ), - pandas_lit("9999-12-31 23:59:59"), - ) - right_frame = referenced_frame.append_column( - interval_end_pandas_label, interval_end_col - ) - # right_frame: - # a interval_end_col - # __index__ - # 2023-01-03 01:00:00 1 2023-01-04 00:00:00 - # 2023-01-04 00:00:00 2 2023-01-05 23:00:00 - # 2023-01-05 23:00:00 3 2023-01-06 00:00:00 - # 2023-01-06 00:00:00 4 2023-01-07 02:00:00 - # 2023-01-07 02:00:00 NaN 2023-01-10 00:00:00 - # 2023-01-10 00:00:00 6 9999-01-01 00:00:00 - - # 2. Get the Snowflake identifiers needed for the join condition. - # interval_start_snowflake_quoted_identifier is needed as well, - # but has already been fetched above. left_timecol_snowflake_quoted_identifier = ( get_snowflake_quoted_identifier_for_resample_index_col(preserving_frame) ) - interval_end_snowflake_quoted_identifier = ( - right_frame.get_snowflake_quoted_identifiers_group_by_pandas_labels( - pandas_labels=[interval_end_pandas_label] - )[0][0] - ) - - # 3. Join left_snowpark_df and right_snowpark_df using the following logic: - # For each element left_frame's __resample_index__, join it with a single row - # in right_frame whose __index__ value is less/greater than or equal to it and is closest in time. - # If a row cannot be found, pad the joined columns from right_frame with null. output_frame, _ = join_utils.join( left=preserving_frame, - right=right_frame, - how="left", - left_on=[ - left_timecol_snowflake_quoted_identifier, - left_timecol_snowflake_quoted_identifier, - ], - right_on=[ - interval_start_snowflake_quoted_identifier, - interval_end_snowflake_quoted_identifier, - ], - on_comparators=( - [MatchComparator.GREATER_THAN_OR_EQUAL_TO, MatchComparator.LESS_THAN] + right=referenced_frame, + how="asof", + left_on=[left_timecol_snowflake_quoted_identifier], + right_on=[interval_start_snowflake_quoted_identifier], + left_match_col=left_timecol_snowflake_quoted_identifier, + right_match_col=interval_start_snowflake_quoted_identifier, + match_comparator=( + MatchComparator.GREATER_THAN_OR_EQUAL_TO if fill_method == "ffill" - else [MatchComparator.LESS_THAN_OR_EQUAL_TO, MatchComparator.GREATER_THAN] + else MatchComparator.LESS_THAN_OR_EQUAL_TO ), sort=True, - join_key_coalesce_config=[ - JoinKeyCoalesceConfig.LEFT, - JoinKeyCoalesceConfig.LEFT, - ], ) # output_frame: # a From bd0c592db04a85238afbb47f9d6b877142b4c3ff Mon Sep 17 00:00:00 2001 From: Naren Krishna Date: Tue, 3 Sep 2024 14:08:34 -0700 Subject: [PATCH 7/9] refactor to use ASOF Join Signed-off-by: Naren Krishna --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 80f0d2bb550..cb81de82169 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -81,7 +81,7 @@ - Refactored `quoted_identifier_to_snowflake_type` to avoid making metadata queries if the types have been cached locally. - Improved `pd.to_datetime` to handle all local input cases. -- Refactored `resample.fillna` implementation to use `OrderedDataFrame` join utility. +- Refactored `resample.fillna` implementation to use ASOF Join. #### Bug Fixes From 798c9329e8d882f946cdba7bee588ac88d1c30c6 Mon Sep 17 00:00:00 2001 From: Naren Krishna Date: Tue, 3 Sep 2024 16:40:28 -0700 Subject: [PATCH 8/9] address comments Signed-off-by: Naren Krishna --- CHANGELOG.md | 3 +-- .../snowpark/modin/plugin/_internal/resample_utils.py | 10 +++++----- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cb81de82169..7ddace3b5ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -80,8 +80,7 @@ #### Improvements - Refactored `quoted_identifier_to_snowflake_type` to avoid making metadata queries if the types have been cached locally. -- Improved `pd.to_datetime` to handle all local input cases. -- Refactored `resample.fillna` implementation to use ASOF Join. +- Improved `pd.to_datetime` to handle all local input cases. #### Bug Fixes diff --git a/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py index 414e557a578..72365456709 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py @@ -672,20 +672,20 @@ def perform_asof_join_on_frame( # 2023-01-07 02:00:00 NaN # 2023-01-10 00:00:00 6 - interval_start_snowflake_quoted_identifier = ( - get_snowflake_quoted_identifier_for_resample_index_col(referenced_frame) - ) left_timecol_snowflake_quoted_identifier = ( get_snowflake_quoted_identifier_for_resample_index_col(preserving_frame) ) + right_timecol_snowflake_quoted_identifier = ( + get_snowflake_quoted_identifier_for_resample_index_col(referenced_frame) + ) output_frame, _ = join_utils.join( left=preserving_frame, right=referenced_frame, how="asof", left_on=[left_timecol_snowflake_quoted_identifier], - right_on=[interval_start_snowflake_quoted_identifier], + right_on=[right_timecol_snowflake_quoted_identifier], left_match_col=left_timecol_snowflake_quoted_identifier, - right_match_col=interval_start_snowflake_quoted_identifier, + right_match_col=right_timecol_snowflake_quoted_identifier, match_comparator=( MatchComparator.GREATER_THAN_OR_EQUAL_TO if fill_method == "ffill" From e7706a379ea114c9f30b73395876bb5119dab6be Mon Sep 17 00:00:00 2001 From: Naren Krishna Date: Tue, 3 Sep 2024 19:46:56 -0700 Subject: [PATCH 9/9] address comments 2 Signed-off-by: Naren Krishna --- .../snowpark/modin/plugin/_internal/resample_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py index 72365456709..f2ca17d2038 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/resample_utils.py @@ -682,8 +682,8 @@ def perform_asof_join_on_frame( left=preserving_frame, right=referenced_frame, how="asof", - left_on=[left_timecol_snowflake_quoted_identifier], - right_on=[right_timecol_snowflake_quoted_identifier], + left_on=[], + right_on=[], left_match_col=left_timecol_snowflake_quoted_identifier, right_match_col=right_timecol_snowflake_quoted_identifier, match_comparator=(