Skip to content

Commit

Permalink
[SNOW-1541092] Add RepeatedSubqueryElimination for the new compilation stage (#2006)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-yzou authored Aug 6, 2024
1 parent 5e360be commit b96234e
Show file tree
Hide file tree
Showing 13 changed files with 847 additions and 149 deletions.
9 changes: 6 additions & 3 deletions src/snowflake/snowpark/_internal/analyzer/cte_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import hashlib
import logging
from collections import defaultdict
from typing import TYPE_CHECKING, Any, Optional, Sequence, Set, Union
from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Set, Tuple, Union

from snowflake.snowpark._internal.analyzer.analyzer_utils import (
SPACE,
Expand All @@ -24,7 +24,9 @@
TreeNode = Union[SnowflakePlan, Selectable]


def find_duplicate_subtrees(root: "TreeNode") -> Set["TreeNode"]:
def find_duplicate_subtrees(
root: "TreeNode",
) -> Tuple[Set["TreeNode"], Dict["TreeNode", Set["TreeNode"]]]:
"""
Returns the set of all duplicate subtrees in the query plan tree, together with the map from each node to its set of parent nodes.
The root of a duplicate subtree is defined as a duplicate node, if
Expand Down Expand Up @@ -79,7 +81,8 @@ def is_duplicate_subtree(node: "TreeNode") -> bool:
return False

traverse(root)
return {node for node in node_count_map if is_duplicate_subtree(node)}
duplicated_node = {node for node in node_count_map if is_duplicate_subtree(node)}
return duplicated_node, node_parents_map


def create_cte_query(root: "TreeNode", duplicate_plan_set: Set["TreeNode"]) -> str:
Expand Down
45 changes: 43 additions & 2 deletions src/snowflake/snowpark/_internal/analyzer/select_statement.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ def get_snowflake_plan(self, skip_schema_query) -> SnowflakePlan:
df_aliased_col_name_to_real_col_name=self.df_aliased_col_name_to_real_col_name,
source_plan=self,
placeholder_query=self.placeholder_query,
referenced_ctes=self.referenced_ctes,
)
# set api_calls on self._snowflake_plan outside of the above constructor
# because the constructor copies api_calls.
Expand Down Expand Up @@ -373,6 +374,12 @@ def column_states(self, value: ColumnStateDict):
"""
self._column_states = deepcopy(value)

@property
@abstractmethod
def referenced_ctes(self) -> Set[str]:
    """Return the set of CTEs referenced by the whole selectable subtree, including this node itself and its children."""
    pass


class SelectableEntity(Selectable):
"""Query from a table, view, or any other Snowflake objects.
Expand All @@ -385,7 +392,8 @@ def __init__(
*,
analyzer: "Analyzer",
) -> None:
# currently only selecting from a table is supported for this class
# currently only selecting from a table or cte is supported
# to read as entity
assert isinstance(entity, SnowflakeTable)
super().__init__(analyzer)
self.entity = entity
Expand Down Expand Up @@ -421,6 +429,12 @@ def plan_node_category(self) -> PlanNodeCategory:
def query_params(self) -> Optional[Sequence[Any]]:
return None

@property
def referenced_ctes(self) -> Set[str]:
    """Return an empty set.

    A SelectableEntity only allows selecting from a base table, so no
    CTE table is ever referenced.
    """
    no_ctes: Set[str] = set()
    return no_ctes


class SelectSQL(Selectable):
"""Query from a SQL. Mainly used by session.sql()"""
Expand Down Expand Up @@ -518,6 +532,12 @@ def to_subqueryable(self) -> "SelectSQL":
new._api_calls = self._api_calls
return new

@property
def referenced_ctes(self) -> Set[str]:
    """Return an empty set.

    SelectSQL runs the given SQL query directly, so it references no
    automatically created CTE tables.
    """
    result: Set[str] = set()
    return result


class SelectSnowflakePlan(Selectable):
"""Wrap a SnowflakePlan to a subclass of Selectable."""
Expand Down Expand Up @@ -578,6 +598,10 @@ def query_params(self) -> Optional[Sequence[Any]]:
def individual_node_complexity(self) -> Dict[PlanNodeCategory, int]:
return self.snowflake_plan.individual_node_complexity

@property
def referenced_ctes(self) -> Set[str]:
    """Return the CTEs referenced by the wrapped snowflake plan."""
    wrapped_plan = self._snowflake_plan
    return wrapped_plan.referenced_ctes


class SelectStatement(Selectable):
"""The main logic plan to be used by a DataFrame.
Expand Down Expand Up @@ -712,7 +736,11 @@ def sql_query(self) -> str:
self._sql_query = self.from_.sql_query
return self._sql_query
from_clause = self.from_.sql_in_subquery
if self.analyzer.session._cte_optimization_enabled and self.from_._id:
if (
self.analyzer.session._cte_optimization_enabled
and (not self.analyzer.session._query_compilation_stage_enabled)
and self.from_._id
):
placeholder = f"{analyzer_utils.LEFT_PARENTHESIS}{self.from_._id}{analyzer_utils.RIGHT_PARENTHESIS}"
self._sql_query = self.placeholder_query.replace(placeholder, from_clause)
else:
Expand Down Expand Up @@ -844,6 +872,10 @@ def individual_node_complexity(self) -> Dict[PlanNodeCategory, int]:
)
return complexity

@property
def referenced_ctes(self) -> Set[str]:
    """Return the CTEs referenced by this statement's ``from_`` child."""
    source = self.from_
    return source.referenced_ctes

def to_subqueryable(self) -> "Selectable":
"""When this SelectStatement's subquery is not subqueryable (can't be used in `from` clause of the sql),
convert it to subqueryable and create a new SelectStatement with from_ being the new subqueryable.
Expand Down Expand Up @@ -1169,6 +1201,10 @@ def query_params(self) -> Optional[Sequence[Any]]:
def individual_node_complexity(self) -> Dict[PlanNodeCategory, int]:
return self.snowflake_plan.individual_node_complexity

@property
def referenced_ctes(self) -> Set[str]:
    """Return the CTEs referenced by the underlying snowflake plan."""
    underlying_plan = self._snowflake_plan
    return underlying_plan.referenced_ctes


class SetOperand:
def __init__(self, selectable: Selectable, operator: Optional[str] = None) -> None:
Expand Down Expand Up @@ -1261,6 +1297,11 @@ def individual_node_complexity(self) -> Dict[PlanNodeCategory, int]:
# we add #set_operands - 1 additional operators in sql query
return {PlanNodeCategory.SET_OPERATION: len(self.set_operands) - 1}

@property
def referenced_ctes(self) -> Set[str]:
    """Return the union of the CTEs referenced by every child node."""
    merged: Set[str] = set()
    for child in self._nodes:
        # Fold each child's referenced CTEs into the running union.
        merged.update(child.referenced_ctes)
    return merged


class DeriveColumnDependencyError(Exception):
"""When deriving column dependencies from the subquery."""
Expand Down
Loading

0 comments on commit b96234e

Please sign in to comment.