Skip to content

Commit

Permalink
SNOW-1637932 Refactor quoted_identifier_to_snowflake_type (#2159)
Browse files Browse the repository at this point in the history
<!---
Please answer these questions before creating your pull request. Thanks!
--->

1. Which Jira issue is this PR addressing? Make sure that there is an
accompanying issue to your PR.

   <!---
   In this section, please add a Snowflake Jira issue number.
   
Note that if a corresponding GitHub issue exists, you should still
include
   the Snowflake Jira issue number. For example, for GitHub issue
#1400, you should
   add "SNOW-1335071" here.
    --->

   Fixes SNOW-1637932

2. Fill out the following pre-review checklist:

- [ ] I am adding a new automated test(s) to verify correctness of my
new code
- [ ] If this test skips Local Testing mode, I'm requesting review from
@snowflakedb/local-testing
   - [ ] I am adding new logging messages
   - [ ] I am adding a new telemetry message
   - [ ] I am adding new credentials
   - [ ] I am adding a new dependency
- [ ] If this is a new feature/behavior, I'm adding the Local Testing
parity changes.

3. Please describe how your code solves the related issue.

Please write a short description of how your code change solves the
related issue.

The main purpose for this refactoring is to avoid calling schema when
cached type is available, e.g., avoid calling schema when we know the
type is Timedelta in binary ops.

This pull request includes several changes to the Snowflake Snowpark
Modin plugin, focusing on refactoring the way Snowflake types are
retrieved and used. The main changes involve replacing the
`quoted_identifier_to_snowflake_type` method with a new
`get_snowflake_type` method, which simplifies type retrieval by allowing
it to accept a single identifier or a list of identifiers.

### Refactoring Type Retrieval

*
[`src/snowflake/snowpark/modin/plugin/_internal/aggregation_utils.py`](diffhunk://#diff-036a8cce05771914c03d260ad7fb1ab74a1578b353ff5156a65fbe546788872cL1044-L1047):
Updated `generate_column_agg_info` to use `get_snowflake_type` instead
of `quoted_identifier_to_snowflake_type` for retrieving Snowflake types.
[[1]](diffhunk://#diff-036a8cce05771914c03d260ad7fb1ab74a1578b353ff5156a65fbe546788872cL1044-L1047)
[[2]](diffhunk://#diff-036a8cce05771914c03d260ad7fb1ab74a1578b353ff5156a65fbe546788872cR1067-R1070)
[[3]](diffhunk://#diff-036a8cce05771914c03d260ad7fb1ab74a1578b353ff5156a65fbe546788872cL1109-R1109)

*
[`src/snowflake/snowpark/modin/plugin/_internal/binary_op_utils.py`](diffhunk://#diff-dd6dcb779b1e636fa0bc9541f9c0f8f0e18227367ef40a779146c5a6108676ebL631-R631):
Modified `prepare_binop_pairs_between_dataframe_and_dataframe` to use
`get_snowflake_type` for type mapping.
[[1]](diffhunk://#diff-dd6dcb779b1e636fa0bc9541f9c0f8f0e18227367ef40a779146c5a6108676ebL631-R631)
[[2]](diffhunk://#diff-dd6dcb779b1e636fa0bc9541f9c0f8f0e18227367ef40a779146c5a6108676ebL649-R649)
[[3]](diffhunk://#diff-dd6dcb779b1e636fa0bc9541f9c0f8f0e18227367ef40a779146c5a6108676ebL671-R671)

*
[`src/snowflake/snowpark/modin/plugin/_internal/frame.py`](diffhunk://#diff-dc59d6fb5be73824e72c1e84ca671739e68c28f651a164e96af7f19a3f732edeL360-R417):
Added `get_snowflake_type` method and updated existing methods to use
it.
[[1]](diffhunk://#diff-dc59d6fb5be73824e72c1e84ca671739e68c28f651a164e96af7f19a3f732edeL360-R417)
[[2]](diffhunk://#diff-dc59d6fb5be73824e72c1e84ca671739e68c28f651a164e96af7f19a3f732edeL518-R564)

### Updating Indexing Utilities

*
[`src/snowflake/snowpark/modin/plugin/_internal/indexing_utils.py`](diffhunk://#diff-524607b71f519819352dae1467474661e35164db39180ad106e30e7bf2e3265eL386-R388):
Replaced `quoted_identifier_to_snowflake_type` with `get_snowflake_type`
in multiple functions for checking data types.
[[1]](diffhunk://#diff-524607b71f519819352dae1467474661e35164db39180ad106e30e7bf2e3265eL386-R388)
[[2]](diffhunk://#diff-524607b71f519819352dae1467474661e35164db39180ad106e30e7bf2e3265eL1241-R1243)
[[3]](diffhunk://#diff-524607b71f519819352dae1467474661e35164db39180ad106e30e7bf2e3265eL1655-R1655)
[[4]](diffhunk://#diff-524607b71f519819352dae1467474661e35164db39180ad106e30e7bf2e3265eL1742-R1743)
[[5]](diffhunk://#diff-524607b71f519819352dae1467474661e35164db39180ad106e30e7bf2e3265eL2682-R2682)

### Enhancing Join and Where Utilities

*
[`src/snowflake/snowpark/modin/plugin/_internal/join_utils.py`](diffhunk://#diff-67e1df8ec1e45b14cf51e35c6f67ac04982e41f85580cdfff391e35e025546d0L1070-R1071):
Updated `convert_incompatible_types_to_variant` to use
`get_snowflake_type` for type mapping.

*
[`src/snowflake/snowpark/modin/plugin/_internal/where_utils.py`](diffhunk://#diff-ddf55ef822ec0c6f0e5406f498175dce3ae3a2177c036884d7356961d5b15015L19-R26):
Refactored `validate_expected_boolean_data_columns` to use
`get_snowflake_type` for type validation.

### Compiler Adjustments

*
[`src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py`](diffhunk://#diff-834ee069919510e7e410c503a8afa455154c40e65389769c08d35b0ec3f8ec03L473-R477):
Refactored several methods to use `get_snowflake_type` for type
retrieval and mapping.
[[1]](diffhunk://#diff-834ee069919510e7e410c503a8afa455154c40e65389769c08d35b0ec3f8ec03L473-R477)
[[2]](diffhunk://#diff-834ee069919510e7e410c503a8afa455154c40e65389769c08d35b0ec3f8ec03L497-R502)
[[3]](diffhunk://#diff-834ee069919510e7e410c503a8afa455154c40e65389769c08d35b0ec3f8ec03L1500)
[[4]](diffhunk://#diff-834ee069919510e7e410c503a8afa455154c40e65389769c08d35b0ec3f8ec03L1545-R1546)
[[5]](diffhunk://#diff-834ee069919510e7e410c503a8afa455154c40e65389769c08d35b0ec3f8ec03L1710-R1711)
[[6]](diffhunk://#diff-834ee069919510e7e410c503a8afa455154c40e65389769c08d35b0ec3f8ec03L1870-R1877)
[[7]](diffhunk://#diff-834ee069919510e7e410c503a8afa455154c40e65389769c08d35b0ec3f8ec03L1924)
  • Loading branch information
sfc-gh-azhan authored Aug 26, 2024
1 parent f643840 commit fc09a7a
Showing 14 changed files with 160 additions and 307 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -63,6 +63,10 @@
- Added support for `Index.is_boolean`, `Index.is_integer`, `Index.is_floating`, `Index.is_numeric`, and `Index.is_object`.
- Added support for `DatetimeIndex.round`, `DatetimeIndex.floor` and `DatetimeIndex.ceil`.

#### Improvements

- Refactored `quoted_identifier_to_snowflake_type` to avoid making metadata queries if the types have been cached locally.

#### Bug Fixes

- Stopped ignoring nanoseconds in `pd.Timedelta` scalars.
Original file line number Diff line number Diff line change
@@ -1041,10 +1041,6 @@ def generate_column_agg_info(
List[Hashable]
The new index data column index names for the dataframe after aggregation
"""

quoted_identifier_to_snowflake_type: dict[
str, DataType
] = internal_frame.quoted_identifier_to_snowflake_type()
num_levels: int = internal_frame.num_index_levels(axis=1)
# reserve all index column name and ordering column names
identifiers_to_exclude: list[str] = (
@@ -1068,6 +1064,10 @@ def generate_column_agg_info(
not agg_func_level_included or not include_agg_func_only_in_result_label
)

identifier_to_snowflake_type = internal_frame.quoted_identifier_to_snowflake_type(
[pair.snowflake_quoted_identifier for pair in column_to_agg_func.keys()]
)

for pandas_label_to_identifier, agg_func in column_to_agg_func.items():
pandas_label, quoted_identifier = pandas_label_to_identifier
agg_func_list = (
@@ -1106,7 +1106,7 @@ def generate_column_agg_info(
column_agg_ops.append(
AggregateColumnOpParameters(
snowflake_quoted_identifier=agg_func_col,
data_type=quoted_identifier_to_snowflake_type[quoted_identifier],
data_type=identifier_to_snowflake_type[quoted_identifier],
agg_pandas_label=label,
agg_snowflake_quoted_identifier=identifier,
snowflake_agg_func=snowflake_agg_func,
Original file line number Diff line number Diff line change
@@ -628,7 +628,7 @@ def prepare_binop_pairs_between_dataframe_and_dataframe(
List of BinaryOperationPair.
"""
# construct list of pairs which label belongs to which quoted identifier
type_map = aligned_rhs_and_lhs.result_frame.quoted_identifier_to_snowflake_type()
type_map = aligned_rhs_and_lhs.result_frame.get_snowflake_type
left_right_pairs = []
for label in combined_data_labels:
left_identifier, right_identifier = None, None
@@ -646,7 +646,7 @@ def prepare_binop_pairs_between_dataframe_and_dataframe(
left = col(left_identifier)
# To avoid referencing always the last right_identifier in the loop, use functools.partial
left_typer = functools.partial(
lambda identifier: type_map[identifier], left_identifier
lambda identifier: type_map(identifier), left_identifier
) # noqa: E731
except ValueError:
# lhs label not in list.
@@ -668,7 +668,7 @@ def prepare_binop_pairs_between_dataframe_and_dataframe(
right = col(right_identifier)
# To avoid referencing always the last right_identifier in the loop, use functools.partial
right_typer = functools.partial(
lambda identifier: type_map[identifier], right_identifier
lambda identifier: type_map(identifier), right_identifier
) # noqa: E731
except ValueError:
# rhs label not in list
68 changes: 56 additions & 12 deletions src/snowflake/snowpark/modin/plugin/_internal/frame.py
Original file line number Diff line number Diff line change
@@ -357,18 +357,64 @@ def data_column_snowflake_quoted_identifiers(self) -> list[str]:
]
]

def quoted_identifier_to_snowflake_type(self) -> dict[str, DataType]:
identifier_to_type = {}
for f in self.ordered_dataframe.schema.fields:
def get_snowflake_type(
self, identifier: Union[str, list[str]]
) -> Union[DataType, list[DataType]]:
"""
Get the Snowflake type.
Args:
identifier: one or a list of Snowflake quoted identifiers
Returns:
The one or a list of Snowflake types.
"""
if isinstance(identifier, list):
return list(self.quoted_identifier_to_snowflake_type(identifier).values())
return list(self.quoted_identifier_to_snowflake_type([identifier]).values())[0]

def quoted_identifier_to_snowflake_type(
self, identifiers: Optional[list[str]] = None
) -> dict[str, DataType]:
"""
Get a map from Snowflake quoted identifier to Snowflake types.
Args:
identifiers: if identifiers is given, only return the mapping for those inputs. Otherwise, the map will
include all identifiers in the frame.
Return:
A mapping from Snowflake quoted identifier to Snowflake types.
"""
snowpark_pandas_type_mapping = (
self.snowflake_quoted_identifier_to_snowpark_pandas_type
)
if identifiers is not None:
# ordered dataframe may include columns that are not index or data
# columns of this InternalFrame, so don't assume that each
# identifier is in snowflake_quoted_identifier_to_snowflake_type.
cached_type = self.snowflake_quoted_identifier_to_snowpark_pandas_type.get(
f.column_identifier.quoted_name, None
)
identifier_to_type[f.column_identifier.quoted_name] = (
cached_type if cached_type is not None else f.datatype
)
cached_types = {
id: snowpark_pandas_type_mapping.get(id, None) for id in identifiers
}
if None not in cached_types.values():
# if all types are cached, then we don't need to call schema
return cached_types

all_identifier_to_type = {}

for f in self.ordered_dataframe.schema.fields:
id = f.column_identifier.quoted_name
cached_type = snowpark_pandas_type_mapping.get(id, None)
all_identifier_to_type[id] = cached_type or f.datatype

if identifiers is not None:
# Python dict's keys and values are iterated over in insertion order. This make sense result dict
# `identifier_to_type`'s order matches with the input `identifier`
identifier_to_type = {id: all_identifier_to_type[id] for id in identifiers}
else:
identifier_to_type = all_identifier_to_type

return identifier_to_type

@property
@@ -515,9 +561,7 @@ def index_columns_pandas_index(self, **kwargs: Any) -> native_pd.Index:
else:
# We have one index column. Fill in the type correctly.
index_identifier = self.index_column_snowflake_quoted_identifiers[0]
index_type = TypeMapper.to_pandas(
self.quoted_identifier_to_snowflake_type()[index_identifier]
)
index_type = TypeMapper.to_pandas(self.get_snowflake_type(index_identifier))
ret = native_pd.Index(
[row[0] for row in index_values],
name=self.index_column_pandas_labels[0],
102 changes: 7 additions & 95 deletions src/snowflake/snowpark/modin/plugin/_internal/indexing_utils.py
Original file line number Diff line number Diff line change
@@ -52,7 +52,6 @@
)
from snowflake.snowpark.modin.plugin._internal.type_utils import (
NUMERIC_SNOWFLAKE_TYPES_TUPLE,
is_numeric_snowpark_type,
)
from snowflake.snowpark.modin.plugin._internal.utils import (
DEFAULT_DATA_COLUMN_LABEL,
@@ -383,9 +382,9 @@ def get_frame_by_row_pos_frame(
The selected frame
"""
# check data type
key_datatype = key.quoted_identifier_to_snowflake_type()[
key_datatype = key.get_snowflake_type(
key.data_column_snowflake_quoted_identifiers[0]
]
)
# implicitly allow float types to be compatible with pandas
assert isinstance(
key_datatype, NUMERIC_SNOWFLAKE_TYPES_TUPLE
@@ -1238,9 +1237,9 @@ def get_frame_by_row_label(
), f"frontend should convert key to the supported types but got {type(key)}"

# check data type
key_datatype = key.quoted_identifier_to_snowflake_type()[
key_datatype = key.get_snowflake_type(
key.data_column_snowflake_quoted_identifiers[0]
]
)

# boolean indexer
if isinstance(key_datatype, BooleanType):
@@ -1652,9 +1651,7 @@ def _get_frame_by_row_label_non_boolean_frame(
# Otherwise, when key value is not array type, loc does prefix match, i.e., match the top level only
# e.g., if the internal frame has multiindex ["foo", "bar"]
if isinstance(
key.quoted_identifier_to_snowflake_type()[
key.data_column_snowflake_quoted_identifiers[0]
],
key.get_snowflake_type(key.data_column_snowflake_quoted_identifiers[0]),
ArrayType,
):
# if the key is array type, pandas performs exact match, so the value in the array needs to be exact
@@ -1705,91 +1702,6 @@ def _get_frame_by_row_label_non_boolean_frame(
)


def _get_frame_by_row_series_bool(
internal_frame: InternalFrame,
key: InternalFrame,
) -> InternalFrame:
"""
Helper function for `get_frame_2d_by_label_and_positional` by row with Series[bool] input.
key will be reindexed to match DataFrame index.
Return an InternalFrame with rows from `internal_frame` whose index are indexes in the boolean mask with True value.
Parameters
----------
internal_frame: the InternalFrame of the series calling loc
key: InternalFrame of Series[bool] input.
Returns
-------
Result InternalFrame from loc by row with Series boolean mask.
"""
# TODO: SNOW-884220 support Series[bool] with multiindex
# we only support single index for now.
key_index_identifier = key.index_column_snowflake_quoted_identifiers[0]

# validate no duplicate index in key
if not key.has_unique_index(axis=0):
raise UNALIGNABLE_INDEXING_ERROR
# raise error unless both are numeric or have the same type, otherwise isin will do Implicit Casting (“Coercion”)
# e.g. if key's index is Index([1, 2, 3]) and df's index is Index(["1", "2", "3"]) they do not match in pandas
# TODO SNOW-878592: if key has DateTimeIndex, and df has str index, it should be valid and supported.
# No need to support the other way around.
# e.g. `df = pd.DataFrame({'val': range(3)}, index=['2023-01-01', '2023-01-02', '2023-01-03', ])
# bool_series = pd.Series([False, False, True, ],index=pd.date_range('2023-01-01', periods=3, freq='D'))
# `df.loc[bool_series]` should return a dataframe with the third row from df
key_index_type = key.quoted_identifier_to_snowflake_type()[key_index_identifier]
df_index_type = internal_frame.quoted_identifier_to_snowflake_type()[
internal_frame.index_column_snowflake_quoted_identifiers[0]
]
if (
not (
is_numeric_snowpark_type(key_index_type)
and is_numeric_snowpark_type(df_index_type)
)
and df_index_type != key_index_type
):
raise UNALIGNABLE_INDEXING_ERROR

new_key = InternalFrame.create(
# filter key based on boolean mask
ordered_dataframe=key.ordered_dataframe.filter(
key.data_column_snowflake_quoted_identifiers[0]
),
data_column_pandas_labels=key.data_column_pandas_labels,
data_column_pandas_index_names=key.data_column_pandas_index_names,
data_column_snowflake_quoted_identifiers=key.data_column_snowflake_quoted_identifiers,
index_column_pandas_labels=key.index_column_pandas_labels,
index_column_snowflake_quoted_identifiers=key.index_column_snowflake_quoted_identifiers,
data_column_types=key.cached_data_column_snowpark_pandas_types,
index_column_types=key.cached_index_column_snowpark_pandas_types,
)

joined_frame, result_column_mapper = join(
left=new_key,
right=internal_frame,
left_on=key.index_column_snowflake_quoted_identifiers,
right_on=internal_frame.index_column_snowflake_quoted_identifiers,
how="inner",
inherit_join_index=InheritJoinIndex.FROM_RIGHT,
)
return InternalFrame.create(
ordered_dataframe=joined_frame.ordered_dataframe,
data_column_pandas_labels=internal_frame.data_column_pandas_labels,
data_column_pandas_index_names=internal_frame.data_column_pandas_index_names,
data_column_snowflake_quoted_identifiers=result_column_mapper.map_right_quoted_identifiers(
internal_frame.data_column_snowflake_quoted_identifiers
),
index_column_pandas_labels=internal_frame.index_column_pandas_labels,
index_column_snowflake_quoted_identifiers=result_column_mapper.map_right_quoted_identifiers(
internal_frame.index_column_snowflake_quoted_identifiers
),
data_column_types=internal_frame.cached_data_column_snowpark_pandas_types,
index_column_types=internal_frame.cached_index_column_snowpark_pandas_types,
)


def _propagate_last_row_if_columns_are_short(
frame: InternalFrame,
columns_to_ffill: list[str],
@@ -2679,9 +2591,9 @@ def set_frame_2d_positional(
-------
The result is a frame that has the indexed row and columns replaced with item values.
"""
index_data_type = index.quoted_identifier_to_snowflake_type()[
index_data_type = index.get_snowflake_type(
index.data_column_snowflake_quoted_identifiers[0]
]
)

# If index is a bool_indexer then convert to same-sized position index, False values will be null.
if isinstance(index_data_type, BooleanType):
4 changes: 2 additions & 2 deletions src/snowflake/snowpark/modin/plugin/_internal/join_utils.py
Original file line number Diff line number Diff line change
@@ -1071,8 +1071,8 @@ def convert_incompatible_types_to_variant(
right_ids
), f"ids len mismatch {len(left_ids)} vs. {len(right_ids)}"

left_id_to_type_map = left.quoted_identifier_to_snowflake_type()
right_id_to_type_map = right.quoted_identifier_to_snowflake_type()
left_id_to_type_map = left.quoted_identifier_to_snowflake_type(left_ids)
right_id_to_type_map = right.quoted_identifier_to_snowflake_type(right_ids)

left_to_variant = {}
right_to_variant = {}
Original file line number Diff line number Diff line change
@@ -292,7 +292,7 @@ def get_snowflake_quoted_identifier_for_resample_index_col(frame: InternalFrame)
)

index_col = index_cols[0]
sf_type = frame.quoted_identifier_to_snowflake_type()[index_col]
sf_type = frame.get_snowflake_type(index_col)

if not isinstance(sf_type, (TimestampType, DateType)):
raise TypeError("Only valid with DatetimeIndex.")
10 changes: 5 additions & 5 deletions src/snowflake/snowpark/modin/plugin/_internal/unpivot_utils.py
Original file line number Diff line number Diff line change
@@ -323,11 +323,11 @@ def _prepare_unpivot_internal(

# If the original frame had *all* the same data types, then we can preserve this here otherwise
# we need to default to variant.
frame_data_type_map = original_frame.quoted_identifier_to_snowflake_type()
original_data_types = {
frame_data_type_map.get(snowflake_quoted_identifier)
for snowflake_quoted_identifier in original_frame.data_column_snowflake_quoted_identifiers
}
original_data_types = set(
original_frame.get_snowflake_type(
original_frame.data_column_snowflake_quoted_identifiers
)
)
output_data_type = (
original_data_types.pop() if len(original_data_types) == 1 else VariantType()
)
12 changes: 4 additions & 8 deletions src/snowflake/snowpark/modin/plugin/_internal/where_utils.py
Original file line number Diff line number Diff line change
@@ -16,17 +16,13 @@ def validate_expected_boolean_data_columns(frame: InternalFrame) -> None:
Returns:
None
"""
frame_snowflake_identifier_to_data_type_map = (
frame.quoted_identifier_to_snowflake_type()
)

if not all(
isinstance(
frame_snowflake_identifier_to_data_type_map.get(
snowflake_quoted_identifier
),
t,
BooleanType,
)
for snowflake_quoted_identifier in frame.data_column_snowflake_quoted_identifiers
for t in frame.get_snowflake_type(
frame.data_column_snowflake_quoted_identifiers
)
):
raise ValueError("Boolean array expected for the condition, not object")
Loading

0 comments on commit fc09a7a

Please sign in to comment.