Skip to content

Commit

Permalink
SNOW-1625468 Support indexing with Timedelta data columns
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-azhan committed Aug 21, 2024
1 parent 449ce0c commit f5536b0
Show file tree
Hide file tree
Showing 13 changed files with 640 additions and 83 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@
- Added limited support for the `Timedelta` type, including
- support `copy`, `cache_result`, `shift`, `sort_index`.
- `NotImplementedError` will be raised for the rest of the methods that do not support `Timedelta`.
- support for subtracting two timestamps to get a Timedelta.
- support indexing with Timedelta data columns.
- Added support for index's arithmetic and comparison operators.
- Added support for `Series.dt.round`.
- Added documentation pages for `DatetimeIndex`.
- Added support for `Index.name`, `Index.names`, `Index.rename`, and `Index.set_names`.
- Added support for `Index.__repr__`.
- Added support for `DatetimeIndex.month_name` and `DatetimeIndex.day_name`.
- Added support for `Series.dt.weekday`, `Series.dt.time`, and `DatetimeIndex.time`.
- Added support for subtracting two timestamps to get a Timedelta.

#### Bug Fixes

Expand Down
16 changes: 15 additions & 1 deletion src/snowflake/snowpark/modin/plugin/_internal/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,21 @@ def _create_snowflake_quoted_identifier_to_snowpark_pandas_type(
f"The length of data_column_types {data_column_types} is different from the length of "
f"data_column_snowflake_quoted_identifiers {data_column_snowflake_quoted_identifiers}"
)
for t in data_column_types:
assert t is None or isinstance(
t, SnowparkPandasType
), f"wrong data_column_types value {t}"
if index_column_types is not None:
assert len(index_column_types) == len(
index_column_snowflake_quoted_identifiers
), (
f"The length of index_column_types {index_column_types} is different from the length of "
f"index_column_snowflake_quoted_identifiers {index_column_snowflake_quoted_identifiers}"
)
for t in index_column_types:
assert t is None or isinstance(
t, SnowparkPandasType
), f"wrong index_column_types value {t}"

return MappingProxyType(
{
Expand Down Expand Up @@ -988,6 +996,7 @@ def project_columns(
self,
pandas_labels: list[Hashable],
column_objects: list[SnowparkColumn],
column_types: Optional[list[Optional[SnowparkPandasType]]] = None,
) -> "InternalFrame":
"""
Project new columns with column_objects as the new data columns for the new Internal Frame.
Expand All @@ -1001,10 +1010,15 @@ def project_columns(
Args:
pandas_labels: The pandas labels for the newly projected data columns
column_objects: the Snowpark columns used to project the new data columns
column_types: The optional list of SnowparkPandasTypes for the new columns, one entry per label in pandas_labels.
Returns:
A new InternalFrame with the newly projected columns as data column
"""
if column_types is None:
column_types = [None] * len(pandas_labels)
else:
assert len(column_types) == len(pandas_labels)
new_column_identifiers = (
self.ordered_dataframe.generate_snowflake_quoted_identifiers(
pandas_labels=pandas_labels,
Expand All @@ -1020,7 +1034,7 @@ def project_columns(
data_column_pandas_index_names=self.data_column_pandas_index_names,
index_column_pandas_labels=self.index_column_pandas_labels,
index_column_snowflake_quoted_identifiers=self.index_column_snowflake_quoted_identifiers,
data_column_types=None,
data_column_types=column_types,
index_column_types=self.cached_index_column_snowpark_pandas_types,
)

Expand Down
115 changes: 82 additions & 33 deletions src/snowflake/snowpark/modin/plugin/_internal/indexing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
join,
)
from snowflake.snowpark.modin.plugin._internal.ordered_dataframe import OrderingColumn
from snowflake.snowpark.modin.plugin._internal.snowpark_pandas_types import (
SnowparkPandasColumn,
SnowparkPandasType,
)
from snowflake.snowpark.modin.plugin._internal.type_utils import (
NUMERIC_SNOWFLAKE_TYPES_TUPLE,
is_numeric_snowpark_type,
Expand Down Expand Up @@ -2436,58 +2440,63 @@ def set_frame_2d_labels(
)

def generate_updated_expr_for_existing_col(
col_pos: int,
) -> Column:
col_pos: int, origin_col_type: Optional[SnowparkPandasType]
) -> SnowparkPandasColumn:
"""
Helper function to generate the updated existing column based on the item value.
Args:
col_pos: the existing column position from internal_frame
origin_col_type: the original column type
Returns:
The updated column
The updated SnowparkPandasColumn
"""
original_col = col(
result_frame.data_column_snowflake_quoted_identifiers[col_pos]
)
# col_pos can be any column in the original frame, i.e., internal_frame. So if it is not in
# existing_column_positions, we can just return the original column
if col_pos not in col_info.existing_column_positions:
return original_col
return SnowparkPandasColumn(original_col, origin_col_type)

col_label = result_frame.data_column_pandas_labels[col_pos]
# col will be updated
col_obj_type = None
if item_is_scalar:
col_obj = pandas_lit(item)
col_obj_type = SnowparkPandasType.get_snowpark_pandas_type_for_pandas_type(
type(item)
)
elif item_column_values:
col_obj = pandas_lit(
item_column_values[item_data_col_label_to_pos_map[col_label]]
item_val = item_column_values[item_data_col_label_to_pos_map[col_label]]
col_obj = pandas_lit(item_val)
col_obj_type = SnowparkPandasType.get_snowpark_pandas_type_for_pandas_type(
type(item_val)
)
elif not matching_item_columns_by_label:
# columns in item is matched by position not label here. E.g., assuming df as A, B, C three columns,
# df[["A", "B"]] = item will treat the first item's column as A and the second as B. However,
# df[["B", "A"]] = item will treat the first item's column as B and the second as A. Also, if column key
# contains duplicates, e.g, df[["A", "A"]] = item, then only the right index matters, i.e., the second
# column will be treated as A.
col_obj = col(
result_frame.data_column_snowflake_quoted_identifiers[
item_data_col_offset
+ rindex(col_info.existing_column_positions, col_pos)
]
offset = item_data_col_offset + rindex(
col_info.existing_column_positions, col_pos
)
col_obj = col(result_frame.data_column_snowflake_quoted_identifiers[offset])
col_obj_type = result_frame.cached_data_column_snowpark_pandas_types[offset]
elif (
matching_item_columns_by_label
and col_label in item_data_col_label_to_pos_map
):
# col may have value in item, e.g., column "X"
col_obj = col(
result_frame.data_column_snowflake_quoted_identifiers[
item_data_col_offset + item_data_col_label_to_pos_map[col_label]
]
)
offset = item_data_col_offset + item_data_col_label_to_pos_map[col_label]
col_obj = col(result_frame.data_column_snowflake_quoted_identifiers[offset])
col_obj_type = result_frame.cached_data_column_snowpark_pandas_types[offset]
else:
# e.g., columns "E", i.e., column exists in the column key but not in item
col_obj = pandas_lit(None)

if index_is_scalar:
col_obj = iff(
result_frame_index_col.equal_null(pandas_lit(index)),
Expand All @@ -2499,39 +2508,51 @@ def generate_updated_expr_for_existing_col(
col_obj = iff(index_data_col, col_obj, original_col)
elif index_is_frame:
col_obj = iff(index_data_col.is_null(), original_col, col_obj)
return col_obj

def generate_updated_expr_for_new_col(col_label: Hashable) -> Column:
col_obj_type = col_obj_type if col_obj_type == origin_col_type else None

return SnowparkPandasColumn(col_obj, col_obj_type)

def generate_updated_expr_for_new_col(
col_label: Hashable,
) -> SnowparkPandasColumn:
"""
Helper function to generate the newly added column.
Args:
col_label: the label of the new column
Returns:
The new column
The new SnowparkPandasColumn
"""
col_obj_type = None
if item_is_scalar:
new_column = pandas_lit(item)
col_obj_type = SnowparkPandasType.get_snowpark_pandas_type_for_pandas_type(
type(item)
)
elif item_column_values:
new_column = item_column_values[item_data_col_label_to_pos_map[col_label]]
col_obj_type = SnowparkPandasType.get_snowpark_pandas_type_for_pandas_type(
type(new_column)
)
elif not matching_item_columns_by_label:
offset = item_data_col_offset + item_data_col_label_to_pos_map[col_label]
new_column = col(
result_frame.data_column_snowflake_quoted_identifiers[
item_data_col_offset + item_data_col_label_to_pos_map[col_label]
]
result_frame.data_column_snowflake_quoted_identifiers[offset]
)
col_obj_type = result_frame.cached_data_column_snowpark_pandas_types[offset]
elif (
matching_item_columns_by_label
and col_label in item_data_col_label_to_pos_map
):
offset = item_data_col_offset + item_data_col_label_to_pos_map[col_label]
new_column = col(
result_frame.data_column_snowflake_quoted_identifiers[
item_data_col_offset + item_data_col_label_to_pos_map[col_label]
]
result_frame.data_column_snowflake_quoted_identifiers[offset]
)
col_obj_type = result_frame.cached_data_column_snowpark_pandas_types[offset]
else:
return pandas_lit(None)
return SnowparkPandasColumn(pandas_lit(None))
if index_is_scalar:
new_column = iff(
result_frame_index_col.equal_null(pandas_lit(index)),
Expand All @@ -2543,14 +2564,14 @@ def generate_updated_expr_for_new_col(col_label: Hashable) -> Column:
new_column = iff(index_data_col, new_column, pandas_lit(None))
elif index_is_frame:
new_column = iff(index_data_col.is_null(), pandas_lit(None), new_column)
return new_column
return SnowparkPandasColumn(new_column, col_obj_type)

# The rest of code is to generate the list of project columns and labels for a loc set operation that can involve
# replacing existing column values as well as adding new columns. The caller must provide the callables to select
# the appropriate column replacement or new column logic.
#
# We use project_columns methods to handle all columns in one go
project_labels, project_columns = [], []
project_labels, project_columns, project_column_types = [], [], []

num_data_columns = len(internal_frame.data_column_pandas_labels)
duplicate_data_column_pos_to_count_map = (
Expand All @@ -2573,23 +2594,31 @@ def generate_updated_expr_for_new_col(col_label: Hashable) -> Column:
for col_pos in range(num_data_columns):
col_label = result_frame.data_column_pandas_labels[col_pos]
project_labels.append(col_label)
col_obj = generate_updated_expr_for_existing_col(col_pos)
origin_col_type = result_frame.cached_data_column_snowpark_pandas_types[col_pos]
col_obj, col_type = generate_updated_expr_for_existing_col(
col_pos, origin_col_type
)
project_columns.append(col_obj)
project_column_types.append(col_type)

# When duplicate is needed, pandas will duplicate the column right after the original columns.
if col_pos in duplicate_data_column_pos_to_count_map:
cnt = duplicate_data_column_pos_to_count_map[col_pos]
project_labels += [col_label] * cnt
project_columns += [col_obj] * cnt
project_column_types += [col_type] * cnt

# Last, append new columns
for col_label in new_data_column_pandas_labels_to_append:
new_column = generate_updated_expr_for_new_col(col_label)
new_column, new_column_type = generate_updated_expr_for_new_col(col_label)
project_labels.append(col_label)
project_columns.append(new_column)
project_column_types.append(new_column_type)

return result_frame.project_columns(
pandas_labels=project_labels, column_objects=project_columns
pandas_labels=project_labels,
column_objects=project_columns,
column_types=project_column_types,
)


Expand Down Expand Up @@ -2674,7 +2703,10 @@ def set_frame_2d_positional(
kv_frame = get_kv_frame_from_index_and_item_frames(index, item)
item_data_columns_len = len(item.data_column_snowflake_quoted_identifiers)
else:
kv_frame = index.append_column(ITEM_VALUE_LABEL, pandas_lit(item))
item_type = SnowparkPandasType.get_snowpark_pandas_type_for_pandas_type(
type(item)
)
kv_frame = index.append_column(ITEM_VALUE_LABEL, pandas_lit(item), item_type)
item_data_columns_len = 1

# Next we join the key-value frame with the original frame based on row_position and row key
Expand All @@ -2693,6 +2725,7 @@ def set_frame_2d_positional(

frame = internal_frame.ensure_row_position_column()
kv_frame = kv_frame.ensure_row_position_column()
item_type = kv_frame.cached_data_column_snowpark_pandas_types[-1]

# To match the columns of the original dataframe (see [df] above) and the item values (see [item] above) we
# use the "columns" containing the column position indices. For example, if columns=[0, 2, 3] this would
Expand Down Expand Up @@ -2739,6 +2772,7 @@ def set_frame_2d_positional(

df_kv_frame = df_kv_frame.ensure_row_position_column()

new_data_column_types = []
for col_pos, snowflake_quoted_identifier_pair in enumerate(
zip(
frame.data_column_snowflake_quoted_identifiers,
Expand Down Expand Up @@ -2781,12 +2815,26 @@ def set_frame_2d_positional(
df_snowflake_quoted_identifier,
).as_(new_snowflake_quoted_identifier)
)
if (
frame.snowflake_quoted_identifier_to_snowpark_pandas_type[
original_snowflake_quoted_identifier
]
== item_type
):
new_data_column_types.append(item_type)
else:
new_data_column_types.append(None)
else:
select_list.append(
col(original_snowflake_quoted_identifier).as_(
new_snowflake_quoted_identifier
)
)
new_data_column_types.append(
frame.snowflake_quoted_identifier_to_snowpark_pandas_type[
original_snowflake_quoted_identifier
]
)

select_list.append(df_kv_frame.row_position_snowflake_quoted_identifier)
ordered_dataframe = df_kv_frame.ordered_dataframe.select(select_list)
Expand All @@ -2797,7 +2845,7 @@ def set_frame_2d_positional(
data_column_snowflake_quoted_identifiers=new_data_column_snowflake_quoted_identifiers,
index_column_pandas_labels=frame.index_column_pandas_labels,
index_column_snowflake_quoted_identifiers=frame.index_column_snowflake_quoted_identifiers,
data_column_types=frame.cached_data_column_snowpark_pandas_types,
data_column_types=new_data_column_types,
index_column_types=frame.cached_index_column_snowpark_pandas_types,
)

Expand Down Expand Up @@ -3014,6 +3062,7 @@ def get_item_series_as_single_row_frame(
last_value(col(item.data_column_snowflake_quoted_identifiers[0])).over(
Window.order_by(col(item.row_position_snowflake_quoted_identifier))
),
item.cached_data_column_snowpark_pandas_types[0],
)
last_value_snowflake_quoted_identifier = (
item_frame.data_column_snowflake_quoted_identifiers[-1]
Expand Down
Loading

0 comments on commit f5536b0

Please sign in to comment.