Skip to content

Commit

Permalink
Merge branch 'main' into aalam-SNOW-1418523-make-internal-session-var…
Browse files Browse the repository at this point in the history
…iables-thread-safe
  • Loading branch information
sfc-gh-aalam committed Sep 17, 2024
2 parents f720701 + 6a3a77b commit eca13dc
Show file tree
Hide file tree
Showing 90 changed files with 4,483 additions and 4,413 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,14 @@
#### New Features

- Added support for `TimedeltaIndex.mean` method.
- Added support for some cases of aggregating `Timedelta` columns on `axis=0` with `agg` or `aggregate`.
- Added support for `by`, `left_by`, `right_by`, `left_index`, and `right_index` for `pd.merge_asof`.

#### Bug Fixes

- Fixed a bug where an `Index` object created from a `Series`/`DataFrame` incorrectly updates the `Series`/`DataFrame`'s index name after an inplace update has been applied to the original `Series`/`DataFrame`.
- Suppressed an unhelpful `SettingWithCopyWarning` that sometimes appeared when printing `Timedelta` columns.
- Fixed `inplace` argument for `Series` objects derived from other `Series` objects.

## 1.22.1 (2024-09-11)
This is a re-release of 1.22.0. Please refer to the 1.22.0 release notes for detailed release content.
Expand Down Expand Up @@ -124,6 +131,7 @@ This is a re-release of 1.22.0. Please refer to the 1.22.0 release notes for det
- Added support for `Series.dt.total_seconds` method.
- Added support for `DataFrame.apply(axis=0)`.
- Added support for `Series.dt.tz_convert` and `Series.dt.tz_localize`.
- Added support for `DatetimeIndex.tz_convert` and `DatetimeIndex.tz_localize`.

#### Improvements

Expand Down
4 changes: 2 additions & 2 deletions docs/source/modin/supported/datetime_index_supported.rst
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ Methods
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``snap`` | N | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``tz_convert`` | N | | |
| ``tz_convert`` | Y | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``tz_localize`` | N | | |
| ``tz_localize`` | P | ``ambiguous``, ``nonexistent`` | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``round`` | P | ``ambiguous``, ``nonexistent`` | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
Expand Down
4 changes: 1 addition & 3 deletions docs/source/modin/supported/general_supported.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ Data manipulations
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``merge`` | P | ``validate`` | ``N`` if param ``validate`` is given |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``merge_asof`` | P | ``by``, ``left_by``, ``right_by``| ``N`` if param ``direction`` is ``nearest``. |
| | | , ``left_index``, ``right_index``| |
| | | , ``suffixes``, ``tolerance`` | |
| ``merge_asof`` | P | ``suffixes``, ``tolerance`` | ``N`` if param ``direction`` is ``nearest`` |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
| ``merge_ordered`` | N | | |
+-----------------------------+---------------------------------+----------------------------------+----------------------------------------------------+
Expand Down
5 changes: 1 addition & 4 deletions src/snowflake/snowpark/_internal/analyzer/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -956,10 +956,7 @@ def do_resolve_with_resolved_children(
schema_query = schema_query_for_values_statement(logical_plan.output)

if logical_plan.data:
if (
len(logical_plan.output) * len(logical_plan.data)
< ARRAY_BIND_THRESHOLD
):
if not logical_plan.is_large_local_data:
return self.plan_builder.query(
values_statement(logical_plan.output, logical_plan.data),
logical_plan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
#

from typing import AbstractSet, Optional
from typing import AbstractSet, List, Optional

from snowflake.snowpark._internal.analyzer.expression import (
Expression,
derive_dependent_columns,
derive_dependent_columns_with_duplication,
)
from snowflake.snowpark._internal.analyzer.query_plan_analysis_utils import (
PlanNodeCategory,
Expand All @@ -29,6 +30,9 @@ def __str__(self):
def dependent_column_names(self) -> Optional[AbstractSet[str]]:
return derive_dependent_columns(self.left, self.right)

def dependent_column_names_with_duplication(self) -> List[str]:
    """Columns referenced by either operand, one entry per occurrence."""
    operands = (self.left, self.right)
    return derive_dependent_columns_with_duplication(*operands)

@property
def plan_node_category(self) -> PlanNodeCategory:
return PlanNodeCategory.LOW_IMPACT
Expand Down
96 changes: 94 additions & 2 deletions src/snowflake/snowpark/_internal/analyzer/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@
def derive_dependent_columns(
*expressions: "Optional[Expression]",
) -> Optional[AbstractSet[str]]:
"""
Given a set of expressions, derive the set of columns that the expressions depend on.
Note that the returned dependent columns form a set without duplication. For example, given the
expression concat(col1, upper(col1), upper(col2)), the result will be {col1, col2} even though
col1 occurs in the given expression twice.
"""
result = set()
for exp in expressions:
if exp is not None:
Expand All @@ -48,6 +55,23 @@ def derive_dependent_columns(
return result


def derive_dependent_columns_with_duplication(
    *expressions: "Optional[Expression]",
) -> List[str]:
    """Collect every column the given expressions depend on, keeping duplicates.

    Unlike ``derive_dependent_columns``, which returns a de-duplicated set, the
    returned list has one entry per occurrence: for example,
    concat(col1, upper(col1), upper(col2)) yields [col1, col1, col2], where col1
    appears twice. ``None`` entries are skipped.
    """
    columns: List[str] = []
    for expression in expressions:
        if expression is None:
            continue
        columns += expression.dependent_column_names_with_duplication()
    return columns


class Expression:
"""Consider removing attributes, and adding properties and methods.
A subclass of Expression may have no child, one child, or multiple children.
Expand All @@ -68,6 +92,9 @@ def dependent_column_names(self) -> Optional[AbstractSet[str]]:
# TODO: consider adding it to __init__ or use cached_property.
return COLUMN_DEPENDENCY_EMPTY

def dependent_column_names_with_duplication(self) -> List[str]:
    # Base-class default: an expression with no children depends on no columns,
    # mirroring dependent_column_names above, which returns COLUMN_DEPENDENCY_EMPTY.
    return []

@property
def pretty_name(self) -> str:
"""Returns a user-facing string representation of this expression's name.
Expand Down Expand Up @@ -143,6 +170,9 @@ def __init__(self, plan: "SnowflakePlan") -> None:
def dependent_column_names(self) -> Optional[AbstractSet[str]]:
return COLUMN_DEPENDENCY_DOLLAR

def dependent_column_names_with_duplication(self) -> List[str]:
    # List form of the same sentinel dependency returned (as a set) by
    # dependent_column_names above; presumably the "$"-style positional
    # column dependency — TODO(review): confirm against COLUMN_DEPENDENCY_DOLLAR.
    return list(COLUMN_DEPENDENCY_DOLLAR)

@property
def individual_node_complexity(self) -> Dict[PlanNodeCategory, int]:
return self.plan.cumulative_node_complexity
Expand All @@ -156,6 +186,9 @@ def __init__(self, expressions: List[Expression]) -> None:
def dependent_column_names(self) -> Optional[AbstractSet[str]]:
return derive_dependent_columns(*self.expressions)

def dependent_column_names_with_duplication(self) -> List[str]:
    # Same columns as dependent_column_names above, but as a list that keeps
    # one entry per occurrence across self.expressions.
    return derive_dependent_columns_with_duplication(*self.expressions)

@property
def individual_node_complexity(self) -> Dict[PlanNodeCategory, int]:
return sum_node_complexities(
Expand All @@ -172,6 +205,9 @@ def __init__(self, columns: Expression, values: List[Expression]) -> None:
def dependent_column_names(self) -> Optional[AbstractSet[str]]:
return derive_dependent_columns(self.columns, *self.values)

def dependent_column_names_with_duplication(self) -> List[str]:
    # Dependencies of the tested column expression plus every value in the
    # IN-list, duplicates preserved.
    return derive_dependent_columns_with_duplication(self.columns, *self.values)

@property
def plan_node_category(self) -> PlanNodeCategory:
return PlanNodeCategory.IN
Expand Down Expand Up @@ -212,6 +248,9 @@ def __str__(self):
def dependent_column_names(self) -> Optional[AbstractSet[str]]:
return {self.name}

def dependent_column_names_with_duplication(self) -> List[str]:
    # A named column depends only on itself; list form of {self.name} above.
    return [self.name]

@property
def plan_node_category(self) -> PlanNodeCategory:
return PlanNodeCategory.COLUMN
Expand All @@ -235,6 +274,13 @@ def dependent_column_names(self) -> Optional[AbstractSet[str]]:
else COLUMN_DEPENDENCY_ALL
)

def dependent_column_names_with_duplication(self) -> List[str]:
    """Dependencies of the listed expressions, duplicates preserved."""
    if not self.expressions:
        # we currently do not handle * dependency
        return []
    return derive_dependent_columns_with_duplication(*self.expressions)

@property
def individual_node_complexity(self) -> Dict[PlanNodeCategory, int]:
complexity = {} if self.expressions else {PlanNodeCategory.COLUMN: 1}
Expand Down Expand Up @@ -278,6 +324,14 @@ def __hash__(self):
def dependent_column_names(self) -> Optional[AbstractSet[str]]:
return self._dependent_column_names

def dependent_column_names_with_duplication(self) -> List[str]:
    """Precomputed dependencies as a list; empty when unknown or all-columns."""
    deps = self._dependent_column_names
    # None == COLUMN_DEPENDENCY_ALL is never True, so checking None first is safe.
    if deps is None or deps == COLUMN_DEPENDENCY_ALL:
        return []
    return list(deps)

@property
def plan_node_category(self) -> PlanNodeCategory:
return PlanNodeCategory.COLUMN
Expand Down Expand Up @@ -371,6 +425,9 @@ def __init__(self, expr: Expression, pattern: Expression) -> None:
def dependent_column_names(self) -> Optional[AbstractSet[str]]:
return derive_dependent_columns(self.expr, self.pattern)

def dependent_column_names_with_duplication(self) -> List[str]:
    # Dependencies of both the matched expression and the pattern expression,
    # one entry per occurrence.
    return derive_dependent_columns_with_duplication(self.expr, self.pattern)

@property
def plan_node_category(self) -> PlanNodeCategory:
# expr LIKE pattern
Expand Down Expand Up @@ -400,6 +457,9 @@ def __init__(
def dependent_column_names(self) -> Optional[AbstractSet[str]]:
return derive_dependent_columns(self.expr, self.pattern)

def dependent_column_names_with_duplication(self) -> List[str]:
    # Dependencies of both the matched expression and the pattern expression,
    # one entry per occurrence.
    return derive_dependent_columns_with_duplication(self.expr, self.pattern)

@property
def plan_node_category(self) -> PlanNodeCategory:
# expr REG_EXP pattern
Expand All @@ -423,6 +483,9 @@ def __init__(self, expr: Expression, collation_spec: str) -> None:
def dependent_column_names(self) -> Optional[AbstractSet[str]]:
return derive_dependent_columns(self.expr)

def dependent_column_names_with_duplication(self) -> List[str]:
    # Only the wrapped expression contributes dependencies; the collation
    # spec is a plain string, not an expression.
    return derive_dependent_columns_with_duplication(self.expr)

@property
def plan_node_category(self) -> PlanNodeCategory:
# expr COLLATE collate_spec
Expand All @@ -444,6 +507,9 @@ def __init__(self, expr: Expression, field: str) -> None:
def dependent_column_names(self) -> Optional[AbstractSet[str]]:
return derive_dependent_columns(self.expr)

def dependent_column_names_with_duplication(self) -> List[str]:
    # Only the wrapped expression contributes dependencies; the accessed
    # field is a plain string, not an expression.
    return derive_dependent_columns_with_duplication(self.expr)

@property
def plan_node_category(self) -> PlanNodeCategory:
# the literal corresponds to the contribution from self.field
Expand All @@ -466,6 +532,9 @@ def __init__(self, expr: Expression, field: int) -> None:
def dependent_column_names(self) -> Optional[AbstractSet[str]]:
return derive_dependent_columns(self.expr)

def dependent_column_names_with_duplication(self) -> List[str]:
    # Only the wrapped expression contributes dependencies; the accessed
    # field is a plain int index, not an expression.
    return derive_dependent_columns_with_duplication(self.expr)

@property
def plan_node_category(self) -> PlanNodeCategory:
# the literal corresponds to the contribution from self.field
Expand Down Expand Up @@ -510,6 +579,9 @@ def sql(self) -> str:
def dependent_column_names(self) -> Optional[AbstractSet[str]]:
return derive_dependent_columns(*self.children)

def dependent_column_names_with_duplication(self) -> List[str]:
    # Aggregate the dependencies of every child expression, duplicates kept.
    return derive_dependent_columns_with_duplication(*self.children)

@property
def plan_node_category(self) -> PlanNodeCategory:
return PlanNodeCategory.FUNCTION
Expand All @@ -525,6 +597,9 @@ def __init__(self, expr: Expression, order_by_cols: List[Expression]) -> None:
def dependent_column_names(self) -> Optional[AbstractSet[str]]:
return derive_dependent_columns(self.expr, *self.order_by_cols)

def dependent_column_names_with_duplication(self) -> List[str]:
    # The aggregated expression plus every ORDER BY column contribute
    # dependencies, one entry per occurrence.
    return derive_dependent_columns_with_duplication(self.expr, *self.order_by_cols)

@property
def plan_node_category(self) -> PlanNodeCategory:
# expr WITHIN GROUP (ORDER BY cols)
Expand All @@ -549,13 +624,21 @@ def __init__(
self.branches = branches
self.else_value = else_value

def dependent_column_names(self) -> Optional[AbstractSet[str]]:
@property
def _child_expressions(self) -> List[Expression]:
exps = []
for exp_tuple in self.branches:
exps.extend(exp_tuple)
if self.else_value is not None:
exps.append(self.else_value)
return derive_dependent_columns(*exps)

return exps

def dependent_column_names(self) -> Optional[AbstractSet[str]]:
return derive_dependent_columns(*self._child_expressions)

def dependent_column_names_with_duplication(self) -> List[str]:
return derive_dependent_columns_with_duplication(*self._child_expressions)

@property
def plan_node_category(self) -> PlanNodeCategory:
Expand Down Expand Up @@ -602,6 +685,9 @@ def __init__(
def dependent_column_names(self) -> Optional[AbstractSet[str]]:
return derive_dependent_columns(*self.children)

def dependent_column_names_with_duplication(self) -> List[str]:
    # Aggregate the dependencies of every child expression, duplicates kept.
    return derive_dependent_columns_with_duplication(*self.children)

@property
def plan_node_category(self) -> PlanNodeCategory:
return PlanNodeCategory.FUNCTION
Expand All @@ -617,6 +703,9 @@ def __init__(self, col: Expression, delimiter: str, is_distinct: bool) -> None:
def dependent_column_names(self) -> Optional[AbstractSet[str]]:
return derive_dependent_columns(self.col)

def dependent_column_names_with_duplication(self) -> List[str]:
    # Only the aggregated column expression contributes dependencies; the
    # delimiter and distinct flag are plain values, not expressions.
    return derive_dependent_columns_with_duplication(self.col)

@property
def plan_node_category(self) -> PlanNodeCategory:
return PlanNodeCategory.FUNCTION
Expand All @@ -636,6 +725,9 @@ def __init__(self, exprs: List[Expression]) -> None:
def dependent_column_names(self) -> Optional[AbstractSet[str]]:
return derive_dependent_columns(*self.exprs)

def dependent_column_names_with_duplication(self) -> List[str]:
    # Aggregate the dependencies of every contained expression, duplicates kept.
    return derive_dependent_columns_with_duplication(*self.exprs)

@property
def individual_node_complexity(self) -> Dict[PlanNodeCategory, int]:
return sum_node_complexities(
Expand Down
8 changes: 8 additions & 0 deletions src/snowflake/snowpark/_internal/analyzer/grouping_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from snowflake.snowpark._internal.analyzer.expression import (
Expression,
derive_dependent_columns,
derive_dependent_columns_with_duplication,
)
from snowflake.snowpark._internal.analyzer.query_plan_analysis_utils import (
PlanNodeCategory,
Expand All @@ -23,6 +24,9 @@ def __init__(self, group_by_exprs: List[Expression]) -> None:
def dependent_column_names(self) -> Optional[AbstractSet[str]]:
return derive_dependent_columns(*self.group_by_exprs)

def dependent_column_names_with_duplication(self) -> List[str]:
    # Same columns as dependent_column_names above, but as a list that keeps
    # one entry per occurrence across the GROUP BY expressions.
    return derive_dependent_columns_with_duplication(*self.group_by_exprs)

@property
def plan_node_category(self) -> PlanNodeCategory:
return PlanNodeCategory.LOW_IMPACT
Expand All @@ -45,6 +49,10 @@ def dependent_column_names(self) -> Optional[AbstractSet[str]]:
flattened_args = [exp for sublist in self.args for exp in sublist]
return derive_dependent_columns(*flattened_args)

def dependent_column_names_with_duplication(self) -> List[str]:
    """Dependencies across all nested argument lists, duplicates preserved."""
    exprs: List[Expression] = []
    for arg_group in self.args:
        exprs.extend(arg_group)
    return derive_dependent_columns_with_duplication(*exprs)

@property
def individual_node_complexity(self) -> Dict[PlanNodeCategory, int]:
return sum_node_complexities(
Expand Down
19 changes: 18 additions & 1 deletion src/snowflake/snowpark/_internal/analyzer/snowflake_plan_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,27 @@ def __init__(
self.data = data
self.schema_query = schema_query

@property
def is_large_local_data(self) -> bool:
    # True when the total number of literal cells (rows x columns) reaches
    # ARRAY_BIND_THRESHOLD; per the comments in individual_node_complexity,
    # such plans are compiled into create-table/insert/select queries instead
    # of a single inline VALUES statement.
    # NOTE(review): imported at call time rather than module level — presumably
    # because analyzer.py imports this module and a top-level import would be
    # circular; confirm.
    from snowflake.snowpark._internal.analyzer.analyzer import ARRAY_BIND_THRESHOLD

    return len(self.data) * len(self.output) >= ARRAY_BIND_THRESHOLD

@property
def individual_node_complexity(self) -> Dict[PlanNodeCategory, int]:
if self.is_large_local_data:
# When the number of literals exceeds the threshold, we generate 3 queries:
# 1. create table query
# 2. insert into table query
# 3. select * from table query
# We only consider the complexity from the final select * query since other queries
# are built based on it.
return {
PlanNodeCategory.COLUMN: 1,
}

# If we stay under the threshold, we generate a single query:
# select $1, ..., $m FROM VALUES (r11, r12, ..., r1m), (rn1, ...., rnm)
# TODO: use ARRAY_BIND_THRESHOLD
return {
PlanNodeCategory.COLUMN: len(self.output),
PlanNodeCategory.LITERAL: len(self.data) * len(self.output),
Expand Down
Loading

0 comments on commit eca13dc

Please sign in to comment.