-
Notifications
You must be signed in to change notification settings - Fork 116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SNOW-1819531: propagate referenced ctes to all root nodes #2670
base: main
Are you sure you want to change the base?
Changes from 20 commits
d2c924b
7b7ad95
ce30000
09c9752
3ca612a
8a48d9b
f7fe2eb
2fad709
850294d
8f6ca76
16909dc
4db6421
78d52aa
ffcb577
eadcd8f
28ed69b
ca9be3a
bfb1412
d826c21
2f1e1d7
5daf97e
4528959
8f0a23b
9ff53d8
fb83d0e
ee472b8
1623ca6
340769e
49072b9
dd6b2f9
4e913f8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -169,6 +169,7 @@ def do_resolve_with_resolved_children( | |
iceberg_config=logical_plan.iceberg_config, | ||
table_exists=logical_plan.table_exists, | ||
) | ||
resolved_plan.referenced_ctes = resolved_child.referenced_ctes | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it seems we missed the cte reference propagation in the plan builder for some case, we should double check the PlanBuilder code in snowflake_plan, instead of doing the fix here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After some thoughts, let's update the referenced_ctes definition to the cte referenced in the plan tree, and let's propagate the ctes reference for all nodes, in other words, let's deprecate this parameter https://github.com/snowflakedb/snowpark-python/blob/main/src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py#L538, and make sure the referenced ctes is propagated correctly for all plan builder, please double check all places that directly create SnowflakePlan to make sure the referenced_ctes is propagated correctly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the correction of code generation, we can use the way you have now, but please make sure we comment it out clearly about why things is done in such way There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the cte reference is propagated correctly during plan builder, you shouldn't need to re-propagate here.
sfc-gh-aalam marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
elif isinstance( | ||
logical_plan, | ||
|
@@ -197,6 +198,7 @@ def do_resolve_with_resolved_children( | |
resolved_plan = super().do_resolve_with_resolved_children( | ||
logical_plan, resolved_children, df_aliased_col_name_to_real_col_name | ||
) | ||
resolved_plan.referenced_ctes = resolved_child.referenced_ctes | ||
|
||
elif isinstance(logical_plan, Selectable): | ||
# overwrite the Selectable resolving to make sure we are triggering | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,7 +34,10 @@ | |
TableMerge, | ||
TableUpdate, | ||
) | ||
from snowflake.snowpark._internal.analyzer.unary_plan_node import UnaryNode | ||
from snowflake.snowpark._internal.analyzer.unary_plan_node import ( | ||
CreateViewCommand, | ||
UnaryNode, | ||
) | ||
from snowflake.snowpark._internal.compiler.query_generator import ( | ||
QueryGenerator, | ||
SnowflakeCreateTablePlanInfo, | ||
|
@@ -121,7 +124,9 @@ def to_selectable(plan: LogicalPlan, query_generator: QueryGenerator) -> Selecta | |
return plan | ||
|
||
snowflake_plan = query_generator.resolve(plan) | ||
return SelectSnowflakePlan(snowflake_plan, analyzer=query_generator) | ||
selectable = SelectSnowflakePlan(snowflake_plan, analyzer=query_generator) | ||
selectable._is_valid_for_replacement = snowflake_plan._is_valid_for_replacement | ||
return selectable | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
if not parent._is_valid_for_replacement: | ||
raise ValueError(f"parent node {parent} is not valid for replacement.") | ||
|
@@ -306,7 +311,28 @@ def get_snowflake_plan_queries( | |
|
||
plan_queries = plan.queries | ||
post_action_queries = plan.post_actions | ||
if len(plan.referenced_ctes) > 0: | ||
# If the plan has referenced ctes, we need to add the cte definition before | ||
# the final query. This is done for all source plan except for the following | ||
# cases: | ||
# - SnowflakeCreateTable | ||
# - CreateViewCommand | ||
# - TableUpdate | ||
# - TableDelete | ||
# - TableMerge | ||
# - CopyIntoLocationNode | ||
# because the generated_queries by QueryGenerator for these nodes already include the cte | ||
# definition. Adding the cte definition before the query again will cause a syntax error. | ||
if len(plan.referenced_ctes) > 0 and not isinstance( | ||
plan.source_plan, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. adda a comment here about why are we doing this here |
||
( | ||
SnowflakeCreateTable, | ||
CreateViewCommand, | ||
TableUpdate, | ||
TableDelete, | ||
TableMerge, | ||
CopyIntoLocationNode, | ||
), | ||
): | ||
# make a copy of the original query to avoid any update to the | ||
# original query object | ||
plan_queries = copy.deepcopy(plan.queries) | ||
|
@@ -397,6 +423,9 @@ def get_name(node: Optional[LogicalPlan]) -> str: | |
name = f"{name} :: ({'| '.join(properties)})" | ||
|
||
score = get_complexity_score(node) | ||
num_ref_ctes = "nil" | ||
if isinstance(node, (SnowflakePlan, Selectable)): | ||
num_ref_ctes = len(node.referenced_ctes) | ||
sql_text = "" | ||
if isinstance(node, Selectable): | ||
sql_text = node.sql_query | ||
|
@@ -405,7 +434,7 @@ def get_name(node: Optional[LogicalPlan]) -> str: | |
sql_size = len(sql_text) | ||
sql_preview = sql_text[:50] | ||
|
||
return f"{name=}\n{score=}, {sql_size=}\n{sql_preview=}" | ||
return f"{name=}\n{score=}, {num_ref_ctes=}, {sql_size=}\n{sql_preview=}" | ||
|
||
g = graphviz.Graph(format="png") | ||
|
||
|
@@ -415,7 +444,8 @@ def get_name(node: Optional[LogicalPlan]) -> str: | |
next_level = [] | ||
for node in curr_level: | ||
node_id = hex(id(node)) | ||
g.node(node_id, get_stat(node)) | ||
color = "lightblue" if node._is_valid_for_replacement else "red" | ||
g.node(node_id, get_stat(node), color=color) | ||
if isinstance(node, (Selectable, SnowflakePlan)): | ||
children = node.children_plan_nodes | ||
else: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the difference between "relaxed pipeline breaker" and "pipeline breaker"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pipeline breaker are all nodes like sort, pivot etc which are listed in the is_pipeline_breaker function.
relaxed pipeline breaker are those nodes which are not pipeline breakers but can be used to cut if no valid pipeline breaker is found. For now
SelectStatement
is the only relaxed pipeline breaker.