Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

SNOW-1625368 SNOW-1634393 Support indexing with Timedelta data columns #2141

Merged
merged 5 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@
- Added limited support for the `Timedelta` type, including
- support `copy`, `cache_result`, `shift`, `sort_index`.
- `NotImplementedError` will be raised for the rest of the methods that do not support `Timedelta`.
- support for subtracting two timestamps to get a Timedelta.
- support indexing with Timedelta data columns.
- Added support for index's arithmetic and comparison operators.
- Added support for `Series.dt.round`.
- Added documentation pages for `DatetimeIndex`.
- Added support for `Index.name`, `Index.names`, `Index.rename`, and `Index.set_names`.
- Added support for `Index.__repr__`.
- Added support for `DatetimeIndex.month_name` and `DatetimeIndex.day_name`.
- Added support for `Series.dt.weekday`, `Series.dt.time`, and `DatetimeIndex.time`.
- Added support for subtracting two timestamps to get a Timedelta.
- Added support for `Index.min` and `Index.max`.

#### Bug Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def compute_binary_op_between_snowpark_columns(
first_datatype,
)

binary_op_result_column, snowpark_pandas_result_type = None, None
binary_op_result_column = None

# some operators and the data types have to be handled specially to align with pandas
# However, it is difficult to fail early if the arithmetic operator is not compatible
Expand Down Expand Up @@ -335,7 +335,6 @@ def compute_binary_op_between_snowpark_columns(
):
# Timestamp - NULL or NULL - Timestamp raises SQL compilation error,
# but it's valid in pandas and returns NULL.
snowpark_pandas_result_type = NullType()
sfc-gh-azhan marked this conversation as resolved.
Show resolved Hide resolved
binary_op_result_column = pandas_lit(None)
elif (
op == "sub"
Expand All @@ -344,7 +343,6 @@ def compute_binary_op_between_snowpark_columns(
):
# Timestamp - NULL or NULL - Timestamp raises SQL compilation error,
# but it's valid in pandas and returns NULL.
snowpark_pandas_result_type = NullType()
binary_op_result_column = pandas_lit(None)
elif (
op == "sub"
Expand All @@ -365,7 +363,7 @@ def compute_binary_op_between_snowpark_columns(

return SnowparkPandasColumn(
snowpark_column=binary_op_result_column,
snowpark_pandas_type=snowpark_pandas_result_type,
snowpark_pandas_type=None,
)


Expand Down
16 changes: 15 additions & 1 deletion src/snowflake/snowpark/modin/plugin/_internal/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,21 @@ def _create_snowflake_quoted_identifier_to_snowpark_pandas_type(
f"The length of data_column_types {data_column_types} is different from the length of "
f"data_column_snowflake_quoted_identifiers {data_column_snowflake_quoted_identifiers}"
)
for t in data_column_types:
assert t is None or isinstance(
t, SnowparkPandasType
), f"wrong data_column_types value {t}"
if index_column_types is not None:
assert len(index_column_types) == len(
index_column_snowflake_quoted_identifiers
), (
f"The length of index_column_types {index_column_types} is different from the length of "
f"index_column_snowflake_quoted_identifiers {index_column_snowflake_quoted_identifiers}"
)
for t in index_column_types:
assert t is None or isinstance(
t, SnowparkPandasType
), f"wrong index_column_types value {t}"

return MappingProxyType(
{
Expand Down Expand Up @@ -988,6 +996,7 @@ def project_columns(
self,
pandas_labels: list[Hashable],
column_objects: list[SnowparkColumn],
column_types: Optional[list[Optional[SnowparkPandasType]]] = None,
) -> "InternalFrame":
"""
Project new columns with column_objects as the new data columns for the new Internal Frame.
Expand All @@ -1001,10 +1010,15 @@ def project_columns(
Args:
pandas_labels: The pandas labels for the newly projected data columns
column_objects: the Snowpark columns used to project the new data columns
column_types: The optional SnowparkPandasType for the new column.

Returns:
A new InternalFrame with the newly projected columns as data column
"""
if column_types is None:
column_types = [None] * len(pandas_labels)
else:
assert len(column_types) == len(pandas_labels)
new_column_identifiers = (
self.ordered_dataframe.generate_snowflake_quoted_identifiers(
pandas_labels=pandas_labels,
Expand All @@ -1020,7 +1034,7 @@ def project_columns(
data_column_pandas_index_names=self.data_column_pandas_index_names,
index_column_pandas_labels=self.index_column_pandas_labels,
index_column_snowflake_quoted_identifiers=self.index_column_snowflake_quoted_identifiers,
data_column_types=None,
data_column_types=column_types,
index_column_types=self.cached_index_column_snowpark_pandas_types,
)

Expand Down
120 changes: 84 additions & 36 deletions src/snowflake/snowpark/modin/plugin/_internal/indexing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
join,
)
from snowflake.snowpark.modin.plugin._internal.ordered_dataframe import OrderingColumn
from snowflake.snowpark.modin.plugin._internal.snowpark_pandas_types import (
SnowparkPandasColumn,
SnowparkPandasType,
)
from snowflake.snowpark.modin.plugin._internal.type_utils import (
NUMERIC_SNOWFLAKE_TYPES_TUPLE,
is_numeric_snowpark_type,
Expand Down Expand Up @@ -2436,58 +2440,63 @@ def set_frame_2d_labels(
)

def generate_updated_expr_for_existing_col(
col_pos: int,
) -> Column:
col_pos: int, origin_col_type: Optional[SnowparkPandasType]
) -> SnowparkPandasColumn:
"""
Helper function to generate the updated existing column based on the item value.

Args:
col_pos: the existing column position from internal_frame
origin_col_type: the original column type

Returns:
The updated column
The updated SnowparkPandasColumn
"""
original_col = col(
result_frame.data_column_snowflake_quoted_identifiers[col_pos]
)
# col_pos can be any column in the original frame, i.e., internal_frame. So if it is not in
# existing_column_positions, we can just return the original column
if col_pos not in col_info.existing_column_positions:
return original_col
return SnowparkPandasColumn(original_col, origin_col_type)

col_label = result_frame.data_column_pandas_labels[col_pos]
# col will be updated
col_obj_type = None
if item_is_scalar:
col_obj = pandas_lit(item)
col_obj_type = SnowparkPandasType.get_snowpark_pandas_type_for_pandas_type(
type(item)
)
elif item_column_values:
col_obj = pandas_lit(
item_column_values[item_data_col_label_to_pos_map[col_label]]
item_val = item_column_values[item_data_col_label_to_pos_map[col_label]]
col_obj = pandas_lit(item_val)
col_obj_type = SnowparkPandasType.get_snowpark_pandas_type_for_pandas_type(
type(item_val)
)
elif not matching_item_columns_by_label:
# columns in item is matched by position not label here. E.g., assuming df as A, B, C three columns,
# df[["A", "B"]] = item will treat the first item's column as A and the second as B. However,
# df[["B", "A"]] = item will treat the first item's column as B and the second as A. Also, if column key
# contains duplicates, e.g, df[["A", "A"]] = item, then only the right index matters, i.e., the second
# column will be treated as A.
col_obj = col(
result_frame.data_column_snowflake_quoted_identifiers[
item_data_col_offset
+ rindex(col_info.existing_column_positions, col_pos)
]
offset = item_data_col_offset + rindex(
col_info.existing_column_positions, col_pos
)
col_obj = col(result_frame.data_column_snowflake_quoted_identifiers[offset])
col_obj_type = result_frame.cached_data_column_snowpark_pandas_types[offset]
elif (
matching_item_columns_by_label
and col_label in item_data_col_label_to_pos_map
):
# col may have value in item, e.g., column "X"
col_obj = col(
result_frame.data_column_snowflake_quoted_identifiers[
item_data_col_offset + item_data_col_label_to_pos_map[col_label]
]
)
offset = item_data_col_offset + item_data_col_label_to_pos_map[col_label]
col_obj = col(result_frame.data_column_snowflake_quoted_identifiers[offset])
col_obj_type = result_frame.cached_data_column_snowpark_pandas_types[offset]
else:
# e.g., columns "E", i.e., column exists in the column key but not in item
col_obj = pandas_lit(None)

if index_is_scalar:
col_obj = iff(
result_frame_index_col.equal_null(pandas_lit(index)),
Expand All @@ -2499,39 +2508,50 @@ def generate_updated_expr_for_existing_col(
col_obj = iff(index_data_col, col_obj, original_col)
elif index_is_frame:
col_obj = iff(index_data_col.is_null(), original_col, col_obj)
return col_obj

def generate_updated_expr_for_new_col(col_label: Hashable) -> Column:
col_obj_type = col_obj_type if col_obj_type == origin_col_type else None

return SnowparkPandasColumn(col_obj, col_obj_type)

def generate_updated_expr_for_new_col(
col_label: Hashable,
) -> SnowparkPandasColumn:
"""
Helper function to generate the newly added column.

Args:
col_label: the label of the new column

Returns:
The new column
The new SnowparkPandasColumn
"""
if item_is_scalar:
new_column = pandas_lit(item)
col_obj_type = SnowparkPandasType.get_snowpark_pandas_type_for_pandas_type(
type(item)
)
elif item_column_values:
new_column = item_column_values[item_data_col_label_to_pos_map[col_label]]
col_obj_type = SnowparkPandasType.get_snowpark_pandas_type_for_pandas_type(
type(new_column)
)
elif not matching_item_columns_by_label:
offset = item_data_col_offset + item_data_col_label_to_pos_map[col_label]
new_column = col(
result_frame.data_column_snowflake_quoted_identifiers[
item_data_col_offset + item_data_col_label_to_pos_map[col_label]
]
result_frame.data_column_snowflake_quoted_identifiers[offset]
)
col_obj_type = result_frame.cached_data_column_snowpark_pandas_types[offset]
elif (
matching_item_columns_by_label
and col_label in item_data_col_label_to_pos_map
):
offset = item_data_col_offset + item_data_col_label_to_pos_map[col_label]
new_column = col(
result_frame.data_column_snowflake_quoted_identifiers[
item_data_col_offset + item_data_col_label_to_pos_map[col_label]
]
result_frame.data_column_snowflake_quoted_identifiers[offset]
)
col_obj_type = result_frame.cached_data_column_snowpark_pandas_types[offset]
else:
return pandas_lit(None)
return SnowparkPandasColumn(pandas_lit(None), None)
if index_is_scalar:
new_column = iff(
result_frame_index_col.equal_null(pandas_lit(index)),
Expand All @@ -2543,14 +2563,14 @@ def generate_updated_expr_for_new_col(col_label: Hashable) -> Column:
new_column = iff(index_data_col, new_column, pandas_lit(None))
elif index_is_frame:
new_column = iff(index_data_col.is_null(), pandas_lit(None), new_column)
return new_column
return SnowparkPandasColumn(new_column, col_obj_type)

# The rest of code is to generate the list of project columns and labels for a loc set operation that can involve
# replacing existing column values as well as adding new columns. The caller must provide the callables to select
# the appropriate column replacement or new column logic.
#
# We use project_columns methods to handle all columns in one go
project_labels, project_columns = [], []
project_labels, project_columns, project_column_types = [], [], []

num_data_columns = len(internal_frame.data_column_pandas_labels)
duplicate_data_column_pos_to_count_map = (
Expand All @@ -2573,23 +2593,31 @@ def generate_updated_expr_for_new_col(col_label: Hashable) -> Column:
for col_pos in range(num_data_columns):
col_label = result_frame.data_column_pandas_labels[col_pos]
project_labels.append(col_label)
col_obj = generate_updated_expr_for_existing_col(col_pos)
project_columns.append(col_obj)
origin_col_type = result_frame.cached_data_column_snowpark_pandas_types[col_pos]
snowpark_pandas_col = generate_updated_expr_for_existing_col(
col_pos, origin_col_type
)
project_columns.append(snowpark_pandas_col.snowpark_column)
project_column_types.append(snowpark_pandas_col.snowpark_pandas_type)

# When duplicate is needed, pandas will duplicate the column right after the original columns.
if col_pos in duplicate_data_column_pos_to_count_map:
cnt = duplicate_data_column_pos_to_count_map[col_pos]
project_labels += [col_label] * cnt
project_columns += [col_obj] * cnt
project_columns += [snowpark_pandas_col.snowpark_column] * cnt
project_column_types += [snowpark_pandas_col.snowpark_pandas_type] * cnt

# Last, append new columns
for col_label in new_data_column_pandas_labels_to_append:
new_column = generate_updated_expr_for_new_col(col_label)
new_snowpark_pandas_column = generate_updated_expr_for_new_col(col_label)
project_labels.append(col_label)
project_columns.append(new_column)
project_columns.append(new_snowpark_pandas_column.snowpark_column)
project_column_types.append(new_snowpark_pandas_column.snowpark_pandas_type)

return result_frame.project_columns(
pandas_labels=project_labels, column_objects=project_columns
pandas_labels=project_labels,
column_objects=project_columns,
column_types=project_column_types,
)


Expand Down Expand Up @@ -2674,7 +2702,10 @@ def set_frame_2d_positional(
kv_frame = get_kv_frame_from_index_and_item_frames(index, item)
item_data_columns_len = len(item.data_column_snowflake_quoted_identifiers)
else:
kv_frame = index.append_column(ITEM_VALUE_LABEL, pandas_lit(item))
item_type = SnowparkPandasType.get_snowpark_pandas_type_for_pandas_type(
type(item)
)
kv_frame = index.append_column(ITEM_VALUE_LABEL, pandas_lit(item), item_type)
item_data_columns_len = 1

# Next we join the key-value frame with the original frame based on row_position and row key
Expand All @@ -2693,6 +2724,7 @@ def set_frame_2d_positional(

frame = internal_frame.ensure_row_position_column()
kv_frame = kv_frame.ensure_row_position_column()
item_type = kv_frame.cached_data_column_snowpark_pandas_types[-1]

# To match the columns of the original dataframe (see [df] above) and the item values (see [item] above) we
# use the "columns" containing the column position indices. For example, if columns=[0, 2, 3] this would
Expand Down Expand Up @@ -2739,6 +2771,7 @@ def set_frame_2d_positional(

df_kv_frame = df_kv_frame.ensure_row_position_column()

new_data_column_types = []
for col_pos, snowflake_quoted_identifier_pair in enumerate(
zip(
frame.data_column_snowflake_quoted_identifiers,
Expand Down Expand Up @@ -2781,12 +2814,26 @@ def set_frame_2d_positional(
df_snowflake_quoted_identifier,
).as_(new_snowflake_quoted_identifier)
)
if (
frame.snowflake_quoted_identifier_to_snowpark_pandas_type[
original_snowflake_quoted_identifier
]
== item_type
):
new_data_column_types.append(item_type)
else:
new_data_column_types.append(None)
else:
select_list.append(
col(original_snowflake_quoted_identifier).as_(
new_snowflake_quoted_identifier
)
)
new_data_column_types.append(
frame.snowflake_quoted_identifier_to_snowpark_pandas_type[
original_snowflake_quoted_identifier
]
)

select_list.append(df_kv_frame.row_position_snowflake_quoted_identifier)
ordered_dataframe = df_kv_frame.ordered_dataframe.select(select_list)
Expand All @@ -2797,7 +2844,7 @@ def set_frame_2d_positional(
data_column_snowflake_quoted_identifiers=new_data_column_snowflake_quoted_identifiers,
index_column_pandas_labels=frame.index_column_pandas_labels,
index_column_snowflake_quoted_identifiers=frame.index_column_snowflake_quoted_identifiers,
data_column_types=frame.cached_data_column_snowpark_pandas_types,
data_column_types=new_data_column_types,
index_column_types=frame.cached_index_column_snowpark_pandas_types,
)

Expand Down Expand Up @@ -3014,6 +3061,7 @@ def get_item_series_as_single_row_frame(
last_value(col(item.data_column_snowflake_quoted_identifiers[0])).over(
Window.order_by(col(item.row_position_snowflake_quoted_identifier))
),
item.cached_data_column_snowpark_pandas_types[0],
)
last_value_snowflake_quoted_identifier = (
item_frame.data_column_snowflake_quoted_identifiers[-1]
Expand Down
Loading
Loading