
Commit

Merge branch 'main' into mvashishtha/SNOW-1654730/refactor-aggregation-utils
sfc-gh-mvashishtha authored Sep 11, 2024
2 parents b44a816 + fe51d4d commit 966e5fa
Showing 26 changed files with 1,334 additions and 634 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,15 @@

## 1.23.0 (TBD)

### Snowpark pandas API Updates

#### Improvements

- Improved `to_pandas` to persist the original timezone offset for TIMESTAMP_TZ type.

#### New Features

- Added support for `TimedeltaIndex.mean` method.
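
A minimal sketch (not part of this commit) of the `to_pandas` improvement above, assuming an active Snowpark session configured for Snowpark pandas; the values and offsets are illustrative:

```python
import modin.pandas as pd
import snowflake.snowpark.modin.plugin  # noqa: F401  # registers the Snowpark pandas backend

# Mixed offsets are stored as TIMESTAMP_TZ in Snowflake.
s = pd.to_datetime(
    pd.Series(["2024-01-01 00:00:00 -0500", "2024-06-01 00:00:00 +0200"]),
    format="%Y-%m-%d %H:%M:%S %z",
)
# With this release, to_pandas() keeps the original -05:00 / +02:00 offsets
# instead of normalizing them to the session timezone.
print(s.to_pandas())
```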

## 1.22.0 (2024-09-10)

@@ -109,6 +118,7 @@
- Added support for string indexing with `Timedelta` objects.
- Added support for `Series.dt.total_seconds` method.
- Added support for `DataFrame.apply(axis=0)`.
- Added support for `Series.dt.tz_convert` and `Series.dt.tz_localize`.

#### Improvements

2 changes: 2 additions & 0 deletions docs/source/modin/series.rst
@@ -279,6 +279,8 @@ Series
Series.dt.seconds
Series.dt.microseconds
Series.dt.nanoseconds
Series.dt.tz_convert
Series.dt.tz_localize


.. rubric:: String accessor methods
5 changes: 3 additions & 2 deletions docs/source/modin/supported/series_dt_supported.rst
@@ -80,9 +80,10 @@ the method in the left column.
+-----------------------------+---------------------------------+----------------------------------------------------+
| ``to_pydatetime`` | N | |
+-----------------------------+---------------------------------+----------------------------------------------------+
| ``tz_localize`` | N | |
| ``tz_localize`` | P | ``N`` if `ambiguous` or `nonexistent` are set to a |
| | | non-default value. |
+-----------------------------+---------------------------------+----------------------------------------------------+
| ``tz_convert`` | N | |
| ``tz_convert`` | Y | |
+-----------------------------+---------------------------------+----------------------------------------------------+
| ``normalize`` | Y | |
+-----------------------------+---------------------------------+----------------------------------------------------+
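A hedged sketch of the two accessors updated in the table above, assuming an active Snowpark pandas session; the timestamps are illustrative:

```python
import modin.pandas as pd
import snowflake.snowpark.modin.plugin  # noqa: F401

aware = pd.to_datetime(
    pd.Series(["2024-03-01 12:00:00 -0500", "2024-03-02 12:00:00 -0500"]),
    format="%Y-%m-%d %H:%M:%S %z",
)
print(aware.dt.tz_convert("UTC"))  # fully supported (Y)

naive = pd.to_datetime(pd.Series(["2024-03-01 12:00:00", "2024-03-02 12:00:00"]))
# Partially supported (P): only the default `ambiguous`/`nonexistent` arguments are supported.
print(naive.dt.tz_localize("UTC"))
```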
2 changes: 1 addition & 1 deletion docs/source/modin/supported/timedelta_index_supported.rst
@@ -44,7 +44,7 @@ Methods
+-----------------------------+---------------------------------+----------------------------------+-------------------------------------------+
| ``ceil`` | Y | | |
+-----------------------------+---------------------------------+----------------------------------+-------------------------------------------+
| ``mean`` | N | | |
| ``mean`` | Y | | |
+-----------------------------+---------------------------------+----------------------------------+-------------------------------------------+
| ``total_seconds`` | Y | | |
+-----------------------------+---------------------------------+----------------------------------+-------------------------------------------+
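A minimal sketch of the newly supported `TimedeltaIndex.mean`, assuming an active Snowpark pandas session:

```python
import modin.pandas as pd
import snowflake.snowpark.modin.plugin  # noqa: F401

tdi = pd.TimedeltaIndex(["1 days", "2 days", "3 days", "4 days"])
print(tdi.mean())  # expected: Timedelta('2 days 12:00:00')
```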
66 changes: 63 additions & 3 deletions src/snowflake/snowpark/_internal/compiler/plan_compiler.py
@@ -3,21 +3,31 @@
#

import copy
import time
from typing import Dict, List

from snowflake.snowpark._internal.analyzer.query_plan_analysis_utils import (
get_complexity_score,
)
from snowflake.snowpark._internal.analyzer.snowflake_plan import (
PlanQueryType,
Query,
SnowflakePlan,
)
from snowflake.snowpark._internal.analyzer.snowflake_plan_node import LogicalPlan
from snowflake.snowpark._internal.compiler.large_query_breakdown import (
COMPLEXITY_SCORE_LOWER_BOUND,
COMPLEXITY_SCORE_UPPER_BOUND,
LargeQueryBreakdown,
)
from snowflake.snowpark._internal.compiler.repeated_subquery_elimination import (
RepeatedSubqueryElimination,
)
from snowflake.snowpark._internal.compiler.telemetry_constants import (
CompilationStageTelemetryField,
)
from snowflake.snowpark._internal.compiler.utils import create_query_generator
from snowflake.snowpark._internal.telemetry import TelemetryField
from snowflake.snowpark.mock._connection import MockServerConnection


@@ -68,24 +78,74 @@ def compile(self) -> Dict[PlanQueryType, List[Query]]:
if self.should_start_query_compilation():
# preparation for compilation
# 1. make a copy of the original plan
start_time = time.time()
complexity_score_before_compilation = get_complexity_score(
self._plan.cumulative_node_complexity
)
logical_plans: List[LogicalPlan] = [copy.deepcopy(self._plan)]
deep_copy_end_time = time.time()

# 2. create a code generator with the original plan
query_generator = create_query_generator(self._plan)

            # apply each optimization if needed
            # 3. apply each optimization if needed
# CTE optimization
cte_start_time = time.time()
if self._plan.session.cte_optimization_enabled:
repeated_subquery_eliminator = RepeatedSubqueryElimination(
logical_plans, query_generator
)
logical_plans = repeated_subquery_eliminator.apply()

cte_end_time = time.time()
complexity_scores_after_cte = [
get_complexity_score(logical_plan.cumulative_node_complexity)
for logical_plan in logical_plans
]

# Large query breakdown
if self._plan.session.large_query_breakdown_enabled:
large_query_breakdown = LargeQueryBreakdown(
self._plan.session, query_generator, logical_plans
)
logical_plans = large_query_breakdown.apply()

# do a final pass of code generation
return query_generator.generate_queries(logical_plans)
large_query_breakdown_end_time = time.time()
complexity_scores_after_large_query_breakdown = [
get_complexity_score(logical_plan.cumulative_node_complexity)
for logical_plan in logical_plans
]

# 4. do a final pass of code generation
queries = query_generator.generate_queries(logical_plans)

# log telemetry data
deep_copy_time = deep_copy_end_time - start_time
cte_time = cte_end_time - cte_start_time
large_query_breakdown_time = large_query_breakdown_end_time - cte_end_time
total_time = time.time() - start_time
session = self._plan.session
summary_value = {
TelemetryField.CTE_OPTIMIZATION_ENABLED.value: session.cte_optimization_enabled,
TelemetryField.LARGE_QUERY_BREAKDOWN_ENABLED.value: session.large_query_breakdown_enabled,
CompilationStageTelemetryField.COMPLEXITY_SCORE_BOUNDS.value: (
COMPLEXITY_SCORE_LOWER_BOUND,
COMPLEXITY_SCORE_UPPER_BOUND,
),
CompilationStageTelemetryField.TIME_TAKEN_FOR_COMPILATION.value: total_time,
CompilationStageTelemetryField.TIME_TAKEN_FOR_DEEP_COPY_PLAN.value: deep_copy_time,
CompilationStageTelemetryField.TIME_TAKEN_FOR_CTE_OPTIMIZATION.value: cte_time,
CompilationStageTelemetryField.TIME_TAKEN_FOR_LARGE_QUERY_BREAKDOWN.value: large_query_breakdown_time,
CompilationStageTelemetryField.COMPLEXITY_SCORE_BEFORE_COMPILATION.value: complexity_score_before_compilation,
CompilationStageTelemetryField.COMPLEXITY_SCORE_AFTER_CTE_OPTIMIZATION.value: complexity_scores_after_cte,
CompilationStageTelemetryField.COMPLEXITY_SCORE_AFTER_LARGE_QUERY_BREAKDOWN.value: complexity_scores_after_large_query_breakdown,
}
session._conn._telemetry_client.send_query_compilation_summary_telemetry(
session_id=session.session_id,
plan_uuid=self._plan.uuid,
compilation_stage_summary=summary_value,
)
return queries
else:
final_plan = self._plan
if self._plan.session.cte_optimization_enabled:
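The new `compile()` body above wraps each optimization stage with wall-clock timing and emits one summary payload at the end. A simplified, self-contained sketch of that timing pattern (hypothetical helper, not Snowpark code):

```python
import time
from typing import Any, Callable, Dict, List, Tuple


def timed_stages(stages: List[Tuple[str, Callable[[], Any]]]) -> Dict[str, float]:
    """Run stages in order, recording wall-clock seconds per stage and in total."""
    summary: Dict[str, float] = {}
    start = time.time()
    for name, run in stages:
        stage_start = time.time()
        run()
        summary[f"time_taken_for_{name}_sec"] = time.time() - stage_start
    summary["time_taken_for_compilation_sec"] = time.time() - start
    return summary


# Mirrors the deep copy -> CTE optimization -> large query breakdown order above.
print(timed_stages([
    ("deep_copy_plan", lambda: None),
    ("cte_optimization", lambda: None),
    ("large_query_breakdown", lambda: None),
]))
```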
15 changes: 15 additions & 0 deletions src/snowflake/snowpark/_internal/compiler/telemetry_constants.py
@@ -6,10 +6,25 @@


class CompilationStageTelemetryField(Enum):
# types
TYPE_LARGE_QUERY_BREAKDOWN_OPTIMIZATION_SKIPPED = (
"snowpark_large_query_breakdown_optimization_skipped"
)
TYPE_COMPILATION_STAGE_STATISTICS = "snowpark_compilation_stage_statistics"

# keys
KEY_REASON = "reason"
PLAN_UUID = "plan_uuid"
TIME_TAKEN_FOR_COMPILATION = "time_taken_for_compilation_sec"
TIME_TAKEN_FOR_DEEP_COPY_PLAN = "time_taken_for_deep_copy_plan_sec"
TIME_TAKEN_FOR_CTE_OPTIMIZATION = "time_taken_for_cte_optimization_sec"
TIME_TAKEN_FOR_LARGE_QUERY_BREAKDOWN = "time_taken_for_large_query_breakdown_sec"
COMPLEXITY_SCORE_BOUNDS = "complexity_score_bounds"
COMPLEXITY_SCORE_BEFORE_COMPILATION = "complexity_score_before_compilation"
COMPLEXITY_SCORE_AFTER_CTE_OPTIMIZATION = "complexity_score_after_cte_optimization"
COMPLEXITY_SCORE_AFTER_LARGE_QUERY_BREAKDOWN = (
"complexity_score_after_large_query_breakdown"
)


class SkipLargeQueryBreakdownCategory(Enum):
23 changes: 23 additions & 0 deletions src/snowflake/snowpark/_internal/telemetry.py
@@ -168,6 +168,11 @@ def wrap(*args, **kwargs):
]._session.sql_simplifier_enabled
try:
api_calls[0][TelemetryField.QUERY_PLAN_HEIGHT.value] = plan.plan_height
# The uuid for df._select_statement can be different from df._plan. Since plan
# can take both values, we cannot use plan.uuid. We always use df._plan.uuid
# to track the queries.
uuid = args[0]._plan.uuid
api_calls[0][CompilationStageTelemetryField.PLAN_UUID.value] = uuid
api_calls[0][
TelemetryField.QUERY_PLAN_NUM_DUPLICATE_NODES.value
] = plan.num_duplicate_nodes
@@ -428,6 +433,24 @@ def send_large_query_breakdown_telemetry(
}
self.send(message)

def send_query_compilation_summary_telemetry(
self,
session_id: int,
plan_uuid: str,
compilation_stage_summary: Dict[str, Any],
) -> None:
message = {
**self._create_basic_telemetry_data(
CompilationStageTelemetryField.TYPE_COMPILATION_STAGE_STATISTICS.value
),
TelemetryField.KEY_DATA.value: {
TelemetryField.SESSION_ID.value: session_id,
CompilationStageTelemetryField.PLAN_UUID.value: plan_uuid,
**compilation_stage_summary,
},
}
self.send(message)

def send_large_query_optimization_skipped_telemetry(
self, session_id: int, reason: str
) -> None:
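For orientation, a hedged sketch of the message that `send_query_compilation_summary_telemetry` assembles. The key names come from `CompilationStageTelemetryField` and `TelemetryField` above, but the outer wrapper structure and all values are simplified, illustrative assumptions rather than exact client output:

```python
example_message = {
    "type": "snowpark_compilation_stage_statistics",
    "data": {
        "session_id": 1234567890,
        "plan_uuid": "example-plan-uuid",
        "cte_optimization_enabled": True,
        "large_query_breakdown_enabled": True,
        "complexity_score_bounds": (300, 600),           # illustrative bounds
        "time_taken_for_compilation_sec": 0.42,
        "time_taken_for_deep_copy_plan_sec": 0.10,
        "time_taken_for_cte_optimization_sec": 0.05,
        "time_taken_for_large_query_breakdown_sec": 0.12,
        "complexity_score_before_compilation": 1250,
        "complexity_score_after_cte_optimization": [900],
        "complexity_score_after_large_query_breakdown": [450, 470],
    },
}
```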
14 changes: 7 additions & 7 deletions src/snowflake/snowpark/modin/pandas/general.py
@@ -1047,7 +1047,7 @@ def unique(values) -> np.ndarray:
>>> pd.unique([pd.Timestamp('2016-01-01', tz='US/Eastern')
... for _ in range(3)])
array([Timestamp('2015-12-31 21:00:00-0800', tz='America/Los_Angeles')],
array([Timestamp('2016-01-01 00:00:00-0500', tz='UTC-05:00')],
dtype=object)
>>> pd.unique([("a", "b"), ("b", "a"), ("a", "c"), ("b", "a")])
@@ -1750,35 +1750,35 @@ def to_datetime(
DatetimeIndex(['2018-10-26 12:00:00', '2018-10-26 13:00:15'], dtype='datetime64[ns]', freq=None)
>>> pd.to_datetime(['2018-10-26 12:00:00 -0500', '2018-10-26 13:00:00 -0500'])
DatetimeIndex(['2018-10-26 10:00:00-07:00', '2018-10-26 11:00:00-07:00'], dtype='datetime64[ns, America/Los_Angeles]', freq=None)
DatetimeIndex(['2018-10-26 12:00:00-05:00', '2018-10-26 13:00:00-05:00'], dtype='datetime64[ns, UTC-05:00]', freq=None)
- Use the right format to convert to a timezone-aware type (note that when calling the Snowpark
pandas API to_pandas(), the timezone-aware output will always be converted to the session timezone):
>>> pd.to_datetime(['2018-10-26 12:00:00 -0500', '2018-10-26 13:00:00 -0500'], format="%Y-%m-%d %H:%M:%S %z")
DatetimeIndex(['2018-10-26 10:00:00-07:00', '2018-10-26 11:00:00-07:00'], dtype='datetime64[ns, America/Los_Angeles]', freq=None)
DatetimeIndex(['2018-10-26 12:00:00-05:00', '2018-10-26 13:00:00-05:00'], dtype='datetime64[ns, UTC-05:00]', freq=None)
- Timezone-aware inputs *with mixed time offsets* (for example
issued from a timezone with daylight savings, such as Europe/Paris):
>>> pd.to_datetime(['2020-10-25 02:00:00 +0200', '2020-10-25 04:00:00 +0100'])
DatetimeIndex(['2020-10-24 17:00:00-07:00', '2020-10-24 20:00:00-07:00'], dtype='datetime64[ns, America/Los_Angeles]', freq=None)
DatetimeIndex([2020-10-25 02:00:00+02:00, 2020-10-25 04:00:00+01:00], dtype='object', freq=None)
>>> pd.to_datetime(['2020-10-25 02:00:00 +0200', '2020-10-25 04:00:00 +0100'], format="%Y-%m-%d %H:%M:%S %z")
DatetimeIndex(['2020-10-24 17:00:00-07:00', '2020-10-24 20:00:00-07:00'], dtype='datetime64[ns, America/Los_Angeles]', freq=None)
DatetimeIndex([2020-10-25 02:00:00+02:00, 2020-10-25 04:00:00+01:00], dtype='object', freq=None)
Setting ``utc=True`` makes sure to always convert to timezone-aware outputs:
- Timezone-naive inputs are *localized* based on the session timezone
>>> pd.to_datetime(['2018-10-26 12:00', '2018-10-26 13:00'], utc=True)
DatetimeIndex(['2018-10-26 05:00:00-07:00', '2018-10-26 06:00:00-07:00'], dtype='datetime64[ns, America/Los_Angeles]', freq=None)
DatetimeIndex(['2018-10-26 12:00:00+00:00', '2018-10-26 13:00:00+00:00'], dtype='datetime64[ns, UTC]', freq=None)
- Timezone-aware inputs are *converted* to session timezone
>>> pd.to_datetime(['2018-10-26 12:00:00 -0530', '2018-10-26 12:00:00 -0500'],
... utc=True)
DatetimeIndex(['2018-10-26 10:30:00-07:00', '2018-10-26 10:00:00-07:00'], dtype='datetime64[ns, America/Los_Angeles]', freq=None)
DatetimeIndex(['2018-10-26 17:30:00+00:00', '2018-10-26 17:00:00+00:00'], dtype='datetime64[ns, UTC]', freq=None)
"""
# TODO: SNOW-1063345: Modin upgrade - modin.pandas functions in general.py
raise_if_native_pandas_objects(arg)