From f5536b0c76714ecb4d470c34e0060cc7bc7041df Mon Sep 17 00:00:00 2001 From: azhan Date: Wed, 21 Aug 2024 14:50:09 -0700 Subject: [PATCH] SNOW-1625468 Support indexing with Timedelta data columns --- CHANGELOG.md | 3 +- .../snowpark/modin/plugin/_internal/frame.py | 16 +- .../modin/plugin/_internal/indexing_utils.py | 115 ++++-- .../modin/plugin/_internal/join_utils.py | 36 +- .../plugin/_internal/snowpark_pandas_types.py | 54 +-- .../modin/plugin/_internal/transpose_utils.py | 27 +- .../modin/plugin/_internal/type_utils.py | 4 +- .../snowpark/modin/plugin/_internal/utils.py | 6 + .../compiler/snowflake_query_compiler.py | 6 - .../modin/plugin/utils/warning_message.py | 6 + tests/integ/modin/types/test_timedelta.py | 19 + .../modin/types/test_timedelta_indexing.py | 391 ++++++++++++++++++ .../unit/modin/test_snowpark_pandas_types.py | 40 +- 13 files changed, 640 insertions(+), 83 deletions(-) create mode 100644 tests/integ/modin/types/test_timedelta_indexing.py diff --git a/CHANGELOG.md b/CHANGELOG.md index ab546703fd6..524a8efcb21 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,8 @@ - Added limited support for the `Timedelta` type, including - support `copy`, `cache_result`, `shift`, `sort_index`. - `NotImplementedError` will be raised for the rest of methods that do not support `Timedelta`. + - support for subtracting two timestamps to get a Timedelta. + - support indexing with Timedelta data columns. - Added support for index's arithmetic and comparison operators. - Added support for `Series.dt.round`. - Added documentation pages for `DatetimeIndex`. @@ -34,7 +36,6 @@ - Added support for `Index.__repr__`. - Added support for `DatetimeIndex.month_name` and `DatetimeIndex.day_name`. - Added support for `Series.dt.weekday`, `Series.dt.time`, and `DatetimeIndex.time`. -- Added support for subtracting two timestamps to get a Timedelta. #### Bug Fixes diff --git a/src/snowflake/snowpark/modin/plugin/_internal/frame.py b/src/snowflake/snowpark/modin/plugin/_internal/frame.py index 9834d54eafb..47db9baa835 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/frame.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/frame.py @@ -89,6 +89,10 @@ def _create_snowflake_quoted_identifier_to_snowpark_pandas_type( f"The length of data_column_types {data_column_types} is different from the length of " f"data_column_snowflake_quoted_identifiers {data_column_snowflake_quoted_identifiers}" ) + for t in data_column_types: + assert t is None or isinstance( + t, SnowparkPandasType + ), f"wrong data_column_types value {t}" if index_column_types is not None: assert len(index_column_types) == len( index_column_snowflake_quoted_identifiers @@ -96,6 +100,10 @@ def _create_snowflake_quoted_identifier_to_snowpark_pandas_type( f"The length of index_column_types {index_column_types} is different from the length of " f"index_column_snowflake_quoted_identifiers {index_column_snowflake_quoted_identifiers}" ) + for t in index_column_types: + assert t is None or isinstance( + t, SnowparkPandasType + ), f"wrong index_column_types value {t}" return MappingProxyType( { @@ -988,6 +996,7 @@ def project_columns( self, pandas_labels: list[Hashable], column_objects: list[SnowparkColumn], + column_types: Optional[list[Optional[SnowparkPandasType]]] = None, ) -> "InternalFrame": """ Project new columns with column_objects as the new data columns for the new Internal Frame. 
@@ -1001,10 +1010,15 @@ def project_columns( Args: pandas_labels: The pandas labels for the newly projected data columns column_objects: the Snowpark columns used to project the new data columns + column_types: The optional SnowparkPandasType for the new column. Returns: A new InternalFrame with the newly projected columns as data column """ + if column_types is None: + column_types = [None] * len(pandas_labels) + else: + assert len(column_types) == len(pandas_labels) new_column_identifiers = ( self.ordered_dataframe.generate_snowflake_quoted_identifiers( pandas_labels=pandas_labels, @@ -1020,7 +1034,7 @@ def project_columns( data_column_pandas_index_names=self.data_column_pandas_index_names, index_column_pandas_labels=self.index_column_pandas_labels, index_column_snowflake_quoted_identifiers=self.index_column_snowflake_quoted_identifiers, - data_column_types=None, + data_column_types=column_types, index_column_types=self.cached_index_column_snowpark_pandas_types, ) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/indexing_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/indexing_utils.py index f0e33d0b8b8..c8dc52e3741 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/indexing_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/indexing_utils.py @@ -46,6 +46,10 @@ join, ) from snowflake.snowpark.modin.plugin._internal.ordered_dataframe import OrderingColumn +from snowflake.snowpark.modin.plugin._internal.snowpark_pandas_types import ( + SnowparkPandasColumn, + SnowparkPandasType, +) from snowflake.snowpark.modin.plugin._internal.type_utils import ( NUMERIC_SNOWFLAKE_TYPES_TUPLE, is_numeric_snowpark_type, @@ -2436,16 +2440,17 @@ def set_frame_2d_labels( ) def generate_updated_expr_for_existing_col( - col_pos: int, - ) -> Column: + col_pos: int, origin_col_type: Optional[SnowparkPandasType] + ) -> SnowparkPandasColumn: """ Helper function to generate the updated existing column based on the item value. Args: col_pos: the existing column position from internal_frame + origin_col_type: the original column type Returns: - The updated column + The updated SnowparkPandasColumn """ original_col = col( result_frame.data_column_snowflake_quoted_identifiers[col_pos] @@ -2453,15 +2458,21 @@ def generate_updated_expr_for_existing_col( # col_pos can be any column in the original frame, i.e., internal_frame. So if it is not in # existing_column_positions, we can just return the original column if col_pos not in col_info.existing_column_positions: - return original_col + return SnowparkPandasColumn(original_col, origin_col_type) col_label = result_frame.data_column_pandas_labels[col_pos] # col will be updated + col_obj_type = None if item_is_scalar: col_obj = pandas_lit(item) + col_obj_type = SnowparkPandasType.get_snowpark_pandas_type_for_pandas_type( + type(item) + ) elif item_column_values: - col_obj = pandas_lit( - item_column_values[item_data_col_label_to_pos_map[col_label]] + item_val = item_column_values[item_data_col_label_to_pos_map[col_label]] + col_obj = pandas_lit(item_val) + col_obj_type = SnowparkPandasType.get_snowpark_pandas_type_for_pandas_type( + type(item_val) ) elif not matching_item_columns_by_label: # columns in item is matched by position not label here. E.g., assuming df as A, B, C three columns, @@ -2469,25 +2480,23 @@ def generate_updated_expr_for_existing_col( # df[["B", "A"]] = item will treat the first item's column as B and the second as A. 
Also, if column key # contains duplicates, e.g, df[["A", "A"]] = item, then only the right index matters, i.e., the second # column will be treated as A. - col_obj = col( - result_frame.data_column_snowflake_quoted_identifiers[ - item_data_col_offset - + rindex(col_info.existing_column_positions, col_pos) - ] + offset = item_data_col_offset + rindex( + col_info.existing_column_positions, col_pos ) + col_obj = col(result_frame.data_column_snowflake_quoted_identifiers[offset]) + col_obj_type = result_frame.cached_data_column_snowpark_pandas_types[offset] elif ( matching_item_columns_by_label and col_label in item_data_col_label_to_pos_map ): # col may have value in item, e.g., column "X" - col_obj = col( - result_frame.data_column_snowflake_quoted_identifiers[ - item_data_col_offset + item_data_col_label_to_pos_map[col_label] - ] - ) + offset = item_data_col_offset + item_data_col_label_to_pos_map[col_label] + col_obj = col(result_frame.data_column_snowflake_quoted_identifiers[offset]) + col_obj_type = result_frame.cached_data_column_snowpark_pandas_types[offset] else: # e.g., columns "E", i.e., column exists in the column key but not in item col_obj = pandas_lit(None) + if index_is_scalar: col_obj = iff( result_frame_index_col.equal_null(pandas_lit(index)), @@ -2499,9 +2508,14 @@ def generate_updated_expr_for_existing_col( col_obj = iff(index_data_col, col_obj, original_col) elif index_is_frame: col_obj = iff(index_data_col.is_null(), original_col, col_obj) - return col_obj - def generate_updated_expr_for_new_col(col_label: Hashable) -> Column: + col_obj_type = col_obj_type if col_obj_type == origin_col_type else None + + return SnowparkPandasColumn(col_obj, col_obj_type) + + def generate_updated_expr_for_new_col( + col_label: Hashable, + ) -> SnowparkPandasColumn: """ Helper function to generate the newly added column. 
@@ -2509,29 +2523,36 @@ def generate_updated_expr_for_new_col(col_label: Hashable) -> Column: col_label: the label of the new column Returns: - The new column + The new SnowparkPandasColumn """ + col_obj_type = None if item_is_scalar: new_column = pandas_lit(item) + col_obj_type = SnowparkPandasType.get_snowpark_pandas_type_for_pandas_type( + type(item) + ) elif item_column_values: new_column = item_column_values[item_data_col_label_to_pos_map[col_label]] + col_obj_type = SnowparkPandasType.get_snowpark_pandas_type_for_pandas_type( + type(new_column) + ) elif not matching_item_columns_by_label: + offset = item_data_col_offset + item_data_col_label_to_pos_map[col_label] new_column = col( - result_frame.data_column_snowflake_quoted_identifiers[ - item_data_col_offset + item_data_col_label_to_pos_map[col_label] - ] + result_frame.data_column_snowflake_quoted_identifiers[offset] ) + col_obj_type = result_frame.cached_data_column_snowpark_pandas_types[offset] elif ( matching_item_columns_by_label and col_label in item_data_col_label_to_pos_map ): + offset = item_data_col_offset + item_data_col_label_to_pos_map[col_label] new_column = col( - result_frame.data_column_snowflake_quoted_identifiers[ - item_data_col_offset + item_data_col_label_to_pos_map[col_label] - ] + result_frame.data_column_snowflake_quoted_identifiers[offset] ) + col_obj_type = result_frame.cached_data_column_snowpark_pandas_types[offset] else: - return pandas_lit(None) + return SnowparkPandasColumn(pandas_lit(None)) if index_is_scalar: new_column = iff( result_frame_index_col.equal_null(pandas_lit(index)), @@ -2543,14 +2564,14 @@ def generate_updated_expr_for_new_col(col_label: Hashable) -> Column: new_column = iff(index_data_col, new_column, pandas_lit(None)) elif index_is_frame: new_column = iff(index_data_col.is_null(), pandas_lit(None), new_column) - return new_column + return SnowparkPandasColumn(new_column, col_obj_type) # The rest of code is to generate the list of project columns and labels for a loc set operation that can involve # replacing existing column values as well as adding new columns. The caller must provide the callables to select # the appropriate column replacement or new column logic. # # We use project_columns methods to handle all columns in one go - project_labels, project_columns = [], [] + project_labels, project_columns, project_column_types = [], [], [] num_data_columns = len(internal_frame.data_column_pandas_labels) duplicate_data_column_pos_to_count_map = ( @@ -2573,23 +2594,31 @@ def generate_updated_expr_for_new_col(col_label: Hashable) -> Column: for col_pos in range(num_data_columns): col_label = result_frame.data_column_pandas_labels[col_pos] project_labels.append(col_label) - col_obj = generate_updated_expr_for_existing_col(col_pos) + origin_col_type = result_frame.cached_data_column_snowpark_pandas_types[col_pos] + col_obj, col_type = generate_updated_expr_for_existing_col( + col_pos, origin_col_type + ) project_columns.append(col_obj) + project_column_types.append(col_type) # When duplicate is needed, pandas will duplicate the column right after the original columns. 
if col_pos in duplicate_data_column_pos_to_count_map: cnt = duplicate_data_column_pos_to_count_map[col_pos] project_labels += [col_label] * cnt project_columns += [col_obj] * cnt + project_column_types += [col_type] * cnt # Last, append new columns for col_label in new_data_column_pandas_labels_to_append: - new_column = generate_updated_expr_for_new_col(col_label) + new_column, new_column_type = generate_updated_expr_for_new_col(col_label) project_labels.append(col_label) project_columns.append(new_column) + project_column_types.append(new_column_type) return result_frame.project_columns( - pandas_labels=project_labels, column_objects=project_columns + pandas_labels=project_labels, + column_objects=project_columns, + column_types=project_column_types, ) @@ -2674,7 +2703,10 @@ def set_frame_2d_positional( kv_frame = get_kv_frame_from_index_and_item_frames(index, item) item_data_columns_len = len(item.data_column_snowflake_quoted_identifiers) else: - kv_frame = index.append_column(ITEM_VALUE_LABEL, pandas_lit(item)) + item_type = SnowparkPandasType.get_snowpark_pandas_type_for_pandas_type( + type(item) + ) + kv_frame = index.append_column(ITEM_VALUE_LABEL, pandas_lit(item), item_type) item_data_columns_len = 1 # Next we join the key-value frame with the original frame based on row_position and row key @@ -2693,6 +2725,7 @@ def set_frame_2d_positional( frame = internal_frame.ensure_row_position_column() kv_frame = kv_frame.ensure_row_position_column() + item_type = kv_frame.cached_data_column_snowpark_pandas_types[-1] # To match the columns of the original dataframe (see [df] above) and the item values (see [item] above) we # use the "columns" containing the column position indices. For example, if columns=[0, 2, 3] this would @@ -2739,6 +2772,7 @@ def set_frame_2d_positional( df_kv_frame = df_kv_frame.ensure_row_position_column() + new_data_column_types = [] for col_pos, snowflake_quoted_identifier_pair in enumerate( zip( frame.data_column_snowflake_quoted_identifiers, @@ -2781,12 +2815,26 @@ def set_frame_2d_positional( df_snowflake_quoted_identifier, ).as_(new_snowflake_quoted_identifier) ) + if ( + frame.snowflake_quoted_identifier_to_snowpark_pandas_type[ + original_snowflake_quoted_identifier + ] + == item_type + ): + new_data_column_types.append(item_type) + else: + new_data_column_types.append(None) else: select_list.append( col(original_snowflake_quoted_identifier).as_( new_snowflake_quoted_identifier ) ) + new_data_column_types.append( + frame.snowflake_quoted_identifier_to_snowpark_pandas_type[ + original_snowflake_quoted_identifier + ] + ) select_list.append(df_kv_frame.row_position_snowflake_quoted_identifier) ordered_dataframe = df_kv_frame.ordered_dataframe.select(select_list) @@ -2797,7 +2845,7 @@ def set_frame_2d_positional( data_column_snowflake_quoted_identifiers=new_data_column_snowflake_quoted_identifiers, index_column_pandas_labels=frame.index_column_pandas_labels, index_column_snowflake_quoted_identifiers=frame.index_column_snowflake_quoted_identifiers, - data_column_types=frame.cached_data_column_snowpark_pandas_types, + data_column_types=new_data_column_types, index_column_types=frame.cached_index_column_snowpark_pandas_types, ) @@ -3014,6 +3062,7 @@ def get_item_series_as_single_row_frame( last_value(col(item.data_column_snowflake_quoted_identifiers[0])).over( Window.order_by(col(item.row_position_snowflake_quoted_identifier)) ), + item.cached_data_column_snowpark_pandas_types[0], ) last_value_snowflake_quoted_identifier = ( 
item_frame.data_column_snowflake_quoted_identifiers[-1] diff --git a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py index 2e7c6c91a87..0642c80fa1d 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/join_utils.py @@ -236,9 +236,14 @@ def _create_internal_frame_with_join_or_align_result( right.data_column_snowflake_quoted_identifiers ) ) + data_column_types = ( + left.cached_data_column_snowpark_pandas_types + + right.cached_data_column_snowpark_pandas_types + ) index_column_pandas_labels = [] index_column_snowflake_quoted_identifiers = [] + index_column_types = [] left_quoted_identifiers_map = ( result_helper.result_column_mapper.left_quoted_identifiers_map.copy() @@ -257,6 +262,7 @@ def _create_internal_frame_with_join_or_align_result( left.index_column_snowflake_quoted_identifiers, ) ) + index_column_types.extend(left.cached_index_column_snowpark_pandas_types) if InheritJoinIndex.FROM_RIGHT in inherit_index: index_column_pandas_labels.extend(right.index_column_pandas_labels) index_column_snowflake_quoted_identifiers.extend( @@ -264,6 +270,7 @@ def _create_internal_frame_with_join_or_align_result( right.index_column_snowflake_quoted_identifiers, ) ) + index_column_types.extend(right.cached_index_column_snowpark_pandas_types) if key_coalesce_config: coalesce_column_identifiers = [] @@ -271,6 +278,17 @@ def _create_internal_frame_with_join_or_align_result( for origin_left_col, origin_right_col, coalesce_config in zip( left_on, right_on, key_coalesce_config ): + coalesce_col_type = None + origin_left_col_type = ( + left.snowflake_quoted_identifier_to_snowpark_pandas_type[ + origin_left_col + ] + ) + origin_right_col_type = ( + right.snowflake_quoted_identifier_to_snowpark_pandas_type[ + origin_right_col + ] + ) if coalesce_config == JoinKeyCoalesceConfig.NONE: continue left_col = result_helper.map_left_quoted_identifiers([origin_left_col])[0] @@ -295,14 +313,18 @@ def _create_internal_frame_with_join_or_align_result( ) coalesce_column_identifiers.append(coalesce_column_identifier) coalesce_column_values.append(coalesce(left_col, right_col)) + if origin_left_col_type == origin_right_col_type: + coalesce_col_type = origin_left_col_type elif how == "right": # No coalescing required for 'right' join. Simply use right join key # as output column. coalesce_column_identifier = right_col + coalesce_col_type = origin_right_col_type elif how in ("inner", "left", "coalesce"): # No coalescing required for 'left' or 'inner' join and for 'left' or # 'coalesce' align. Simply use left join key as output column. coalesce_column_identifier = left_col + coalesce_col_type = origin_left_col_type else: raise AssertionError(f"Unsupported join/align type {how}") @@ -316,17 +338,25 @@ def _create_internal_frame_with_join_or_align_result( index = data_column_snowflake_quoted_identifiers.index(right_col) data_column_snowflake_quoted_identifiers.pop(index) data_column_pandas_labels.pop(index) + data_column_types.pop(index) elif right_col in index_column_snowflake_quoted_identifiers: # Remove duplicate index column if present. 
index = index_column_snowflake_quoted_identifiers.index(right_col) index_column_snowflake_quoted_identifiers.pop(index) index_column_pandas_labels.pop(index) + index_column_types.pop(index) - # Update data/index column identifier + # Update data/index column identifiers and types + for i, x in enumerate(data_column_snowflake_quoted_identifiers): + if x == left_col: + data_column_types[i] = coalesce_col_type data_column_snowflake_quoted_identifiers = [ coalesce_column_identifier if x == left_col else x for x in data_column_snowflake_quoted_identifiers ] + for i, x in enumerate(index_column_snowflake_quoted_identifiers): + if x == left_col: + index_column_types[i] = coalesce_col_type index_column_snowflake_quoted_identifiers = [ coalesce_column_identifier if x == left_col else x for x in index_column_snowflake_quoted_identifiers @@ -370,8 +400,8 @@ def _create_internal_frame_with_join_or_align_result( index_column_pandas_labels=index_column_pandas_labels, index_column_snowflake_quoted_identifiers=index_column_snowflake_quoted_identifiers, data_column_pandas_index_names=data_column_pandas_index_names, - data_column_types=None, - index_column_types=None, + data_column_types=data_column_types, + index_column_types=index_column_types, ) result_column_mapper = JoinOrAlignResultColumnMapper( left_quoted_identifiers_map, diff --git a/src/snowflake/snowpark/modin/plugin/_internal/snowpark_pandas_types.py b/src/snowflake/snowpark/modin/plugin/_internal/snowpark_pandas_types.py index 0f58a94fd03..9c45dace93d 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/snowpark_pandas_types.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/snowpark_pandas_types.py @@ -6,14 +6,14 @@ import inspect from abc import ABCMeta, abstractmethod from dataclasses import dataclass -from typing import Any, Callable, NamedTuple, Optional, Union +from typing import Any, Callable, NamedTuple, Optional, Tuple, Type, Union import numpy as np import pandas as native_pd from snowflake.snowpark.column import Column from snowflake.snowpark.modin.plugin.utils.warning_message import WarningMessage -from snowflake.snowpark.types import LongType +from snowflake.snowpark.types import DataType, LongType TIMEDELTA_WARNING_MESSAGE = ( "Snowpark pandas support for Timedelta is not currently available." @@ -21,7 +21,9 @@ _python_type_to_from_pandas: dict[type, Callable[[Any], Any]] = {} -_pandas_type_to_snowpark_pandas_type: dict[type, "SnowparkPandasType"] = {} +"""Map Python type to its from_pandas method""" +_type_to_snowpark_pandas_type: dict[Union[type, np.dtype], type] = {} +"""Map Python type and pandas dtype to Snowpark pandas type""" class SnowparkPandasTypeMetaclass( @@ -48,25 +50,23 @@ def __new__(cls: type, clsname: str, bases: Any, attrs: Any) -> type: if inspect.isabstract(new_snowpark_python_type): return new_snowpark_python_type - for cls in new_snowpark_python_type.types_to_convert_with_from_pandas: - for existing_cls in _python_type_to_from_pandas: + for type in new_snowpark_python_type.types_to_convert_with_from_pandas: + assert inspect.isclass(type) + for existing_type in _python_type_to_from_pandas: # we don't want any class in _python_type_to_from_pandas to be # a subclass of another type in _python_type_to_from_pandas. # Otherwise, the rewriting rules for the two types may # conflict. 
assert not issubclass( - cls, existing_cls - ), f"Already registered from_pandas for class {cls} with {existing_cls}" - _python_type_to_from_pandas[cls] = new_snowpark_python_type.from_pandas - - for existing_cls in _pandas_type_to_snowpark_pandas_type: - # we don't want any class in _pandas_type_to_snowpark_pandas_type to be - # a subclass of another type in _pandas_type_to_snowpark_pandas_type. - # Otherwise, the conversions rules for the two types may conflict. - assert not issubclass( - new_snowpark_python_type.pandas_type, existing_cls - ), f"Already registered Snowpark pandas type for class {cls} with {existing_cls}" - _pandas_type_to_snowpark_pandas_type[ + type, existing_type + ), f"Already registered from_pandas for class {type} with {existing_type}" + _python_type_to_from_pandas[type] = new_snowpark_python_type.from_pandas + _type_to_snowpark_pandas_type[type] = new_snowpark_python_type + + assert ( + new_snowpark_python_type.pandas_type not in _type_to_snowpark_pandas_type + ), f"Already registered Snowpark pandas type for pandas type {new_snowpark_python_type.pandas_type}" + _type_to_snowpark_pandas_type[ new_snowpark_python_type.pandas_type ] = new_snowpark_python_type @@ -92,12 +92,14 @@ def to_pandas(value: Any) -> Any: @staticmethod def get_snowpark_pandas_type_for_pandas_type( - pandas_type: type, + pandas_type: Union[type, np.dtype], ) -> Optional["SnowparkPandasType"]: """ Get the corresponding Snowpark pandas type, if it exists, for a given pandas type. """ - return _pandas_type_to_snowpark_pandas_type.get(pandas_type, None) + if pandas_type in _type_to_snowpark_pandas_type: + return _type_to_snowpark_pandas_type[pandas_type]() + return None class SnowparkPandasColumn(NamedTuple): @@ -117,9 +119,13 @@ class TimedeltaType(SnowparkPandasType, LongType): two times. """ - snowpark_type = LongType() - pandas_type = np.dtype("timedelta64[ns]") - types_to_convert_with_from_pandas = [native_pd.Timedelta, datetime.timedelta] + snowpark_type: DataType = LongType() + pandas_type: np.dtype = np.dtype("timedelta64[ns]") + types_to_convert_with_from_pandas: Tuple[Type] = ( # type: ignore[assignment] + native_pd.Timedelta, + datetime.timedelta, + np.timedelta64, + ) def __init__(self) -> None: # TODO(SNOW-1620452): Remove this warning message before releasing @@ -135,7 +141,9 @@ def to_pandas(value: int) -> native_pd.Timedelta: return native_pd.Timedelta(value, unit="nanosecond") @staticmethod - def from_pandas(value: Union[native_pd.Timedelta, datetime.timedelta]) -> int: + def from_pandas( + value: Union[native_pd.Timedelta, datetime.timedelta, np.timedelta64] + ) -> int: """ Convert a pandas representation of a Timedelta to its nanoseconds. 
""" diff --git a/src/snowflake/snowpark/modin/plugin/_internal/transpose_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/transpose_utils.py index b7204edb619..77c661c6971 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/transpose_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/transpose_utils.py @@ -27,6 +27,7 @@ parse_object_construct_snowflake_quoted_identifier_and_extract_pandas_label, serialize_pandas_labels, ) +from snowflake.snowpark.modin.plugin.utils.warning_message import WarningMessage TRANSPOSE_INDEX = "TRANSPOSE_IDX" # transpose value column used in unpivot @@ -277,6 +278,30 @@ def clean_up_transpose_result_index_and_labels( OrderingColumn(row_position_snowflake_quoted_identifier) ) + original_frame_data_column_types = ( + original_frame.cached_data_column_snowpark_pandas_types + ) + if all(t is None for t in original_frame_data_column_types): + new_data_column_types = None + elif len(set(original_frame_data_column_types)) == 1: + # unique type + new_data_column_types = [original_frame_data_column_types[0]] * len( + new_data_column_snowflake_quoted_identifiers + ) + else: + # transpose will lose the type + new_data_column_types = None + WarningMessage.lost_type_warning( + "transpose", + ", ".join( + [ + type(t).__name__ + for t in set(original_frame_data_column_types) + if t is not None + ] + ), + ) + new_internal_frame = InternalFrame.create( ordered_dataframe=ordered_transposed_df, data_column_pandas_labels=new_data_column_pandas_labels, @@ -284,7 +309,7 @@ def clean_up_transpose_result_index_and_labels( data_column_snowflake_quoted_identifiers=new_data_column_snowflake_quoted_identifiers, index_column_pandas_labels=new_index_column_pandas_labels, index_column_snowflake_quoted_identifiers=new_index_column_snowflake_quoted_identifiers, - data_column_types=None, + data_column_types=new_data_column_types, index_column_types=None, ) diff --git a/src/snowflake/snowpark/modin/plugin/_internal/type_utils.py b/src/snowflake/snowpark/modin/plugin/_internal/type_utils.py index 8be48dace45..4edf35070ca 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/type_utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/type_utils.py @@ -224,7 +224,7 @@ class TypeMapper: @classmethod def to_snowflake( cls, p: Union[np.dtype, ExtensionDtype, native_pd.Timestamp] - ) -> DataType: + ) -> Union[DataType, SnowparkPandasType]: """ map a pandas or numpy type to snowpark data type. 
""" @@ -248,7 +248,7 @@ def to_snowflake( SnowparkPandasType.get_snowpark_pandas_type_for_pandas_type(p) ) if snowpark_pandas_type is not None: - return snowpark_pandas_type() + return snowpark_pandas_type try: return PANDAS_TO_SNOWFLAKE_MAP[p] diff --git a/src/snowflake/snowpark/modin/plugin/_internal/utils.py b/src/snowflake/snowpark/modin/plugin/_internal/utils.py index 2c641a91410..99dc6ec3638 100644 --- a/src/snowflake/snowpark/modin/plugin/_internal/utils.py +++ b/src/snowflake/snowpark/modin/plugin/_internal/utils.py @@ -1524,6 +1524,12 @@ def pandas_lit(value: Any, datatype: Optional[DataType] = None) -> Column: Column(Literal(str(value))) ) + snowpark_pandas_type = SnowparkPandasType.get_snowpark_pandas_type_for_pandas_type( + type(value) + ) + if snowpark_pandas_type: + return Column(Literal(type(snowpark_pandas_type).from_pandas(value))) + value = ( convert_numpy_pandas_scalar_to_snowpark_literal(value) if is_scalar(value) diff --git a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py index eab23c63616..e82b0789398 100644 --- a/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py +++ b/src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py @@ -8014,8 +8014,6 @@ def take_2d_positional( BaseQueryCompiler New masked QueryCompiler. """ - self._raise_not_implemented_error_for_timedelta() - # TODO: SNOW-884220 support multiindex # index can only be a query compiler or slice object assert isinstance(index, (SnowflakeQueryCompiler, slice)) @@ -8334,8 +8332,6 @@ def take_2d_labels( ------- SnowflakeQueryCompiler """ - self._raise_not_implemented_error_for_timedelta() - if self._modin_frame.is_multiindex(axis=0) and ( is_scalar(index) or isinstance(index, tuple) ): @@ -8600,8 +8596,6 @@ def transpose(self) -> "SnowflakeQueryCompiler": # STEP 1) Construct a temporary index column that contains the original index with position. # STEP 2) Perform an unpivot which flattens the original data columns into a single name and value rows # grouped by the temporary transpose index column. - self._raise_not_implemented_error_for_timedelta() - unpivot_result = prepare_and_unpivot_for_transpose( frame, self, is_single_row=False ) diff --git a/src/snowflake/snowpark/modin/plugin/utils/warning_message.py b/src/snowflake/snowpark/modin/plugin/utils/warning_message.py index ecd0a8c5a8d..80805d72ac7 100644 --- a/src/snowflake/snowpark/modin/plugin/utils/warning_message.py +++ b/src/snowflake/snowpark/modin/plugin/utils/warning_message.py @@ -111,3 +111,9 @@ def warning_if_engine_args_is_set( "engine_kwargs", engine_parameter_ignored_message, ) + + @classmethod + def lost_type_warning(cls, operation: str, type: str) -> None: + cls.single_warning( + f"`{type}` may be lost in `{operation}`'s result, please use `astype` to convert the result type back." 
+ ) diff --git a/tests/integ/modin/types/test_timedelta.py b/tests/integ/modin/types/test_timedelta.py index f0d8440009f..833a5127228 100644 --- a/tests/integ/modin/types/test_timedelta.py +++ b/tests/integ/modin/types/test_timedelta.py @@ -5,11 +5,13 @@ import logging import modin.pandas as pd +import numpy as np import pandas as native_pd import pytest from snowflake.snowpark.modin.plugin._internal.snowpark_pandas_types import ( TIMEDELTA_WARNING_MESSAGE, + TimedeltaType, ) from tests.integ.modin.sql_counter import sql_count_checker from tests.integ.modin.utils import ( @@ -114,3 +116,20 @@ def test_timedelta_not_supported(): match="validate_groupby is not yet implemented for Timedelta Type", ): df.groupby("a").count() + + +@sql_count_checker(query_count=0) +@pytest.mark.parametrize( + "timedelta, snowpark_pandas_value", + [ + [pd.Timedelta("1 day"), 24 * 3600 * 10**9], + [np.timedelta64(100, "ns"), 100], + [np.timedelta64(100, "s"), 100 * 10**9], + [ + native_pd.Series(native_pd.Timedelta("10000 day"))[0], + 10000 * 24 * 3600 * 10**9, + ], + ], +) +def test_TimedeltaType_from_pandas(timedelta, snowpark_pandas_value): + assert TimedeltaType.from_pandas(timedelta) == snowpark_pandas_value diff --git a/tests/integ/modin/types/test_timedelta_indexing.py b/tests/integ/modin/types/test_timedelta_indexing.py new file mode 100644 index 00000000000..a224c4e74d6 --- /dev/null +++ b/tests/integ/modin/types/test_timedelta_indexing.py @@ -0,0 +1,391 @@ +# +# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. +# + +import functools +import logging + +import modin.pandas as pd +import pandas as native_pd +import pytest +from modin.pandas.utils import is_scalar + +from snowflake.snowpark.exceptions import SnowparkSQLException +from tests.integ.modin.sql_counter import SqlCounter +from tests.integ.modin.utils import assert_series_equal, eval_snowpark_pandas_result + + +@pytest.mark.parametrize( + "key, iloc_join_count, loc_query_count, loc_join_count", + [ + [2, 2, 2, 2], + [[2, 1], 2, 1, 1], + [slice(1, None), 0, 1, 0], + [[True, False, False, True], 1, 1, 1], + ], +) +def test_series_indexing_get_timedelta( + key, iloc_join_count, loc_query_count, loc_join_count +): + # This test only verify indexing return timedelta results correctly + td_s = native_pd.Series( + [ + native_pd.Timedelta("1 days 1 hour"), + native_pd.Timedelta("2 days 1 minute"), + native_pd.Timedelta("3 days 1 nanoseconds"), + native_pd.Timedelta("100 nanoseconds"), + ] + ) + snow_td_s = pd.Series(td_s) + + def run_test(api, query_count, join_count): + with SqlCounter(query_count=query_count, join_count=join_count): + if is_scalar(key): + assert api(snow_td_s) == api(td_s) + else: + eval_snowpark_pandas_result(snow_td_s, td_s, api) + + run_test(lambda s: s.iloc[key], 1, iloc_join_count) + run_test(lambda s: s[key], loc_query_count, loc_join_count) + run_test(lambda s: s.loc[key], loc_query_count, loc_join_count) + if is_scalar(key): + run_test(lambda s: s.iat[key], 1, iloc_join_count) + run_test(lambda s: s.at[key], loc_query_count, loc_join_count) + + +@pytest.mark.parametrize( + "key, query_count, join_count, type_preserved", + [ + [(1, 1), 1, 2, True], + [(2, 2), 1, 2, True], + [([2, 1], 1), 1, 2, True], + [ + (2, [1, 0]), + 1, + 4, + True, + ], # require transpose and keep result column type as timedelta + [(2, ...), 1, 4, False], # require transpose but lose the type + [(slice(1, None), 0), 1, 0, True], + [([True, False, False, True], 1), 1, 1, True], + [(1, "a"), 2, 2, True], + [(2, "b"), 2, 2, True], + [([2, 
1], "a"), 1, 1, True], + [ + (2, ["b", "a"]), + 2, + 3, + True, + ], # require transpose and keep result column type as timedelta + [(2, ...), 1, 4, False], # require transpose but lose the type + [(slice(1, None), "a"), 1, 0, True], + [([True, False, False, True], "b"), 1, 1, True], + ], +) +def test_df_indexing_get_timedelta( + key, query_count, join_count, type_preserved, caplog +): + # This test only verify indexing return timedelta results correctly + td = native_pd.DataFrame( + { + "a": [ + native_pd.Timedelta("1 days 1 hour"), + native_pd.Timedelta("2 days 1 minute"), + native_pd.Timedelta("3 days 1 nanoseconds"), + native_pd.Timedelta("100 nanoseconds"), + ], + "b": native_pd.timedelta_range("1 hour", "1 day", 4), + "c": [1, 2, 3, 4], + } + ) + snow_td = pd.DataFrame(td) + + expected_warning_msg = ( + "`TimedeltaType` may be lost in `transpose`'s result, please use `astype` to convert the " + "result type back." + ) + + def run_test(api, query_count, join_count): + with SqlCounter(query_count=query_count, join_count=join_count): + caplog.clear() + if is_scalar(key[0]) and is_scalar(key[1]): + assert api(snow_td) == api(td) + else: + native_td = td if type_preserved else td.astype(int) + eval_snowpark_pandas_result(snow_td, native_td, api) + if type_preserved: + assert expected_warning_msg not in caplog.text + else: + assert expected_warning_msg in caplog.text + + with caplog.at_level(logging.DEBUG): + if isinstance(key[1], str) or ( + isinstance(key[1], list) and isinstance(key[1][0], str) + ): + api = lambda s: s.loc[key] # noqa: E731 + else: + api = lambda s: s.iloc[key] # noqa: E731 + run_test(api, query_count, join_count) + if is_scalar(key[0]) and is_scalar(key[1]): + if isinstance(key[1], str): + api = lambda s: s.at[key] # noqa: E731 + else: + api = lambda s: s.iat[key] # noqa: E731 + run_test(api, query_count, join_count) + + +def test_df_getitem_timedelta(): + td = native_pd.DataFrame( + { + "a": [ + native_pd.Timedelta("1 days 1 hour"), + native_pd.Timedelta("2 days 1 minute"), + native_pd.Timedelta("3 days 1 nanoseconds"), + native_pd.Timedelta("100 nanoseconds"), + ], + "b": native_pd.timedelta_range("1 hour", "1 day", 4), + "c": [1, 2, 3, 4], + } + ) + snow_td = pd.DataFrame(td) + with SqlCounter(query_count=1, join_count=0): + eval_snowpark_pandas_result( + snow_td.copy(), + td, + lambda df: df["b"], + ) + + with SqlCounter(query_count=1, join_count=1): + eval_snowpark_pandas_result( + snow_td.copy(), + td, + lambda df: df[[True, False, False, True]], + ) + + +def test_series_indexing_set_timedelta(): + td_s = native_pd.Series( + [ + native_pd.Timedelta("1 days 1 hour"), + native_pd.Timedelta("2 days 1 minute"), + native_pd.Timedelta("3 days 1 nanoseconds"), + native_pd.Timedelta("100 nanoseconds"), + ] + ) + snow_td_s = pd.Series(td_s) + + def iloc_set(key, item, s): + s.iloc[key] = item + return s + + with SqlCounter(query_count=1, join_count=2): + # single value + eval_snowpark_pandas_result( + snow_td_s.copy(), + td_s, + functools.partial(iloc_set, 2, pd.Timedelta("2 days 2 hours")), + ) + + with SqlCounter(query_count=1, join_count=2): + # multi values + eval_snowpark_pandas_result( + snow_td_s.copy(), + td_s, + functools.partial(iloc_set, slice(2, None), pd.Timedelta("2 days 2 hours")), + ) + + +def test_df_indexing_set_timedelta(): + td = native_pd.DataFrame( + { + "a": [ + native_pd.Timedelta("1 days 1 hour"), + native_pd.Timedelta("2 days 1 minute"), + native_pd.Timedelta("3 days 1 nanoseconds"), + native_pd.Timedelta("100 nanoseconds"), + ], + "b": 
native_pd.timedelta_range("1 hour", "1 day", 4), + "c": [1, 2, 3, 4], + } + ) + snow_td = pd.DataFrame(td) + + def iloc_set(key, item, df): + df.iloc[key] = item + return df + + def run_test(key, item, natvie_df=td, api=iloc_set): + eval_snowpark_pandas_result( + snow_td.copy(), natvie_df.copy(), functools.partial(api, key, item) + ) + + # Set timedelta value to timedelta columns + item = pd.Timedelta("2 days 2 hours") + + with SqlCounter(query_count=1, join_count=2): + # single value + key = (1, 1) + run_test(key, item) + + with SqlCounter(query_count=1, join_count=2): + # single column + key = (..., 0) + run_test(key, item) + + with SqlCounter(query_count=1, join_count=2): + # multi columns + key = (..., [0, 1]) + run_test(key, item) + + with SqlCounter(query_count=1, join_count=3): + # multi columns with array + key = (..., [0, 1]) + run_test(key, [item] * 2) + + # Set other types to timedelta columns + item = "string" + with SqlCounter(query_count=0): + # single value + key = (1, 1) + with pytest.raises( + SnowparkSQLException, match="Numeric value 'string' is not recognized" + ): + run_test(key, item) + + item = 1000 + with SqlCounter(query_count=1, join_count=2): + # single value + key = (1, 1) + td_int = td.copy() + td_int["b"] = td_int["b"].astype(int) + # timedelta type is not preserved in this case + run_test(key, item, natvie_df=td_int) + + def df_set(key, item, df): + df[key] = item + return df + + # Set timedelta value to timedelta columns + item = pd.Timedelta("2 days 2 hours") + + with SqlCounter(query_count=1, join_count=0): + # single column + key = "b" + run_test(key, item, api=df_set) + + with SqlCounter(query_count=1, join_count=0): + # multi columns + key = ["a", "b"] + run_test(key, item, api=df_set) + + with SqlCounter(query_count=1, join_count=0): + # multi columns with array + key = ["a", "b"] + run_test(key, [item] * 2, api=df_set) + + def loc_set(key, item, df): + df.loc[key] = item + return df + + with SqlCounter(query_count=1, join_count=1): + # single value + key = (1, "a") + run_test(key, item, api=loc_set) + + with SqlCounter(query_count=1, join_count=0): + # single column + key = (slice(None, None, None), "a") + run_test(key, item, api=loc_set) + + with SqlCounter(query_count=1, join_count=0): + # multi columns + key = (slice(None, None, None), ["a", "b"]) + run_test(key, item, api=loc_set) + + with SqlCounter(query_count=1, join_count=0): + # multi columns with array + key = (slice(None, None, None), ["a", "b"]) + run_test(key, [item] * 2, api=loc_set) + + # Set other types to timedelta columns + item = "string" + with SqlCounter(query_count=0): + # single value + key = (1, "a") + with pytest.raises( + SnowparkSQLException, match="Numeric value 'string' is not recognized" + ): + run_test(key, item, api=loc_set) + + item = 1000 + with SqlCounter(query_count=1, join_count=1): + # single value + key = (1, "b") + td_int = td.copy() + td_int["b"] = td_int["b"].astype(int) + # timedelta type is not preserved in this case + run_test(key, item, natvie_df=td_int, api=loc_set) + + +def test_df_indexing_enlargement_timedelta(): + td = native_pd.DataFrame( + { + "a": [ + native_pd.Timedelta("1 days 1 hour"), + native_pd.Timedelta("2 days 1 minute"), + native_pd.Timedelta("3 days 1 nanoseconds"), + native_pd.Timedelta("100 nanoseconds"), + ], + "b": native_pd.timedelta_range("1 hour", "1 day", 4), + "c": [1, 2, 3, 4], + } + ) + snow_td = pd.DataFrame(td) + + def setitem_enlargement(key, item, df): + df[key] = item + return df + + key = "x" + item = pd.Timedelta("2 
hours") + with SqlCounter(query_count=1, join_count=0): + eval_snowpark_pandas_result( + snow_td.copy(), td.copy(), functools.partial(setitem_enlargement, key, item) + ) + + key = 10 + with SqlCounter(query_count=1, join_count=1): + eval_snowpark_pandas_result( + snow_td["a"].copy(), + td["a"].copy(), + functools.partial(setitem_enlargement, key, item), + ) + + def loc_enlargement(key, item, df): + df.loc[key] = item + return df + + key = (slice(None, None, None), "x") + + with SqlCounter(query_count=1, join_count=0): + eval_snowpark_pandas_result( + snow_td.copy(), td.copy(), functools.partial(loc_enlargement, key, item) + ) + + key = 10 + with SqlCounter(query_count=1, join_count=1): + eval_snowpark_pandas_result( + snow_td["a"].copy(), + td["a"].copy(), + functools.partial(loc_enlargement, key, item), + ) + + # single row + key = (10, slice(None, None, None)) + + with SqlCounter(query_count=1, join_count=1): + # dtypes does not change while in native pandas, col "c"'s type will change to object + assert_series_equal( + loc_enlargement(key, item, snow_td.copy()).to_pandas().dtypes, + snow_td.dtypes, + ) diff --git a/tests/unit/modin/test_snowpark_pandas_types.py b/tests/unit/modin/test_snowpark_pandas_types.py index 6f139ba8a76..0e93de4b3e0 100644 --- a/tests/unit/modin/test_snowpark_pandas_types.py +++ b/tests/unit/modin/test_snowpark_pandas_types.py @@ -2,24 +2,38 @@ # Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. # -import re -from dataclasses import FrozenInstanceError +import datetime +import numpy as np +import pandas as native_pd import pytest from snowflake.snowpark.modin.plugin._internal.snowpark_pandas_types import ( + SnowparkPandasType, TimedeltaType, ) -def test_timedelta_type_is_immutable(): - """ - Test that Timedelta instances are immutable. - - We neeed SnowparkPandasType subclasses to be immutable so that we can store - them in InternalFrame. - """ - with pytest.raises( - FrozenInstanceError, match=re.escape("cannot assign to field 'x'") - ): - TimedeltaType().x = 3 +@pytest.mark.parametrize( + "pandas_obj, snowpark_pandas_type", + [ + [native_pd.Timedelta("1 day"), TimedeltaType], + [np.timedelta64(100, "ns"), TimedeltaType], + [np.timedelta64(100, "s"), TimedeltaType], + [ + native_pd.Series(native_pd.Timedelta("1 day")).dtype, + TimedeltaType, + ], # Note dtype is an object not a type + [datetime.timedelta(days=2), TimedeltaType], + [123, None], + ["string", None], + [native_pd.Interval(left=2, right=5, closed="both"), None], + ], +) +def test_get_snowpark_pandas_type_for_pandas_type(pandas_obj, snowpark_pandas_type): + pandas_type = pandas_obj if isinstance(pandas_obj, np.dtype) else type(pandas_obj) + res = SnowparkPandasType.get_snowpark_pandas_type_for_pandas_type(pandas_type) + if snowpark_pandas_type: + assert isinstance(res, snowpark_pandas_type) + else: + assert res is None