SNOW-1432019 Calculate subtree query complexity #1657
Conversation
Seems like your changes contain some Local Testing changes, please request review from @snowflakedb/local-testing
…b.com:snowflakedb/snowpark-python into aalam-SNOW-1432019-calculate-candidacy-scores
Neat!
from typing import AbstractSet, Optional

# collections.Counter does not pass type checker. Changes with appropriate type hints were made in 3.9+
if sys.version_info <= (3, 9):
Suggested change:
- if sys.version_info <= (3, 9):
+ if sys.version_info < (3, 9):
?
Can we also have this type annotation in one util file so we can import it from that place?
I can move it to a util file - that's a good idea.
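For illustration, a minimal sketch of what such a shared util module might look like; the module name and exact layout are assumptions, not the PR's actual file:

```python
# complexity_utils.py (hypothetical shared util module)
import collections
import sys
import typing

KT = typing.TypeVar("KT")

# collections.Counter is not subscriptable for type hints before Python 3.9,
# so provide a thin generic shim there; on 3.9+ the builtin works as-is.
if sys.version_info < (3, 9):

    class Counter(collections.Counter, typing.Counter[KT]):
        pass

else:
    Counter = collections.Counter
```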
@@ -80,6 +96,18 @@ def sql(self) -> str:
    )
    return f"{self.pretty_name}({children_sql})"

@property
Can we add some comments here or in complexity_stat.py to briefly talk about the main idea of the complexity score calculation (e.g., using the number of columns involved as a proxy, etc.)?
added
@property
def individual_complexity_stat(self) -> Counter[str]:
    # SELECT * FROM entity
    return Counter({ComplexityStat.COLUMN.value: 1})
`*` is regarded as one column instead of all columns?
Yes. In terms of compiling complexity, `*` is easier than `col1, ..., col100`, so we need to reflect that.
That is not exactly true: `*` will be easier for parsing, but once it passes the star expansion stage it is literally the same as putting all the columns there. However, when it is just a star expression, it might make some of the stages a little bit easier, like unnesting, etc. I think we can start with 1 for now; things can be adjusted eventually.
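To make the trade-off concrete, here is a tiny illustration of the two counting outcomes being discussed; the plain string key is a stand-in for `ComplexityStat.COLUMN.value`:

```python
from collections import Counter

COLUMN = "column"  # stand-in for ComplexityStat.COLUMN.value

# SELECT * FROM t                   -> counted as a single column reference
star_projection = Counter({COLUMN: 1})

# SELECT col1, ..., col100 FROM t   -> one count per explicitly projected column
explicit_projection = Counter({COLUMN: 100})
```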
def individual_complexity_stat(self) -> Counter[str]:
    # select $1, ..., $m FROM VALUES (r11, r12, ..., r1m), (rn1, ...., rnm)
    # TODO: use ARRAY_BIND_THRESHOLD
    return Counter(
Ah, then the complexity of create_dataframe will be very high compared with selecting from a table. If the table is large, then at least the execution of selecting from a table will be more "complex"?
That's right. Even if data created from `create_dataframe` may be smaller than a table and thus less expensive in terms of execution, this complexity is only concerned with compiler complexity, since we won't be able to estimate execution complexity without more data.
I think this is fine for solving compilation issues, but we should consider data size and other factors when choosing no action/cte/materialization for eliminating repeated subqueries, right?
@sfc-gh-aalam I think @sfc-gh-jdu has a good point here. I recall we typically just use variable binding for this, and if the data is coming from the client side I assume it could be small. Maybe we can simply count the columns, have a VALUE category, and set its count to 1 for the VALUES clause.
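A rough sketch of that suggestion, with a hypothetical VALUE category; this is the reviewer's proposal, not what the PR currently implements:

```python
from collections import Counter
from enum import Enum


class ComplexityStat(Enum):
    COLUMN = "column"
    VALUE = "value"  # hypothetical category for client-side VALUES data


def values_plan_complexity(num_columns: int) -> Counter:
    # Count only the projected columns and charge a flat 1 for the VALUES
    # clause, since client-side data is usually small and bound as variables.
    return Counter(
        {
            ComplexityStat.COLUMN.value: num_columns,
            ComplexityStat.VALUE.value: 1,
        }
    )
```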
estimate += Counter({ComplexityStat.LOW_IMPACT.value: 1})

get_complexity_stat = (
    lambda expr: expr.cumulative_complexity_stat
Suggested change:
- lambda expr: expr.cumulative_complexity_stat
+ getattr(expr, "cumulative_complexity_stat", Counter({ComplexityStat.COLUMN.value: 1}))
and no function is needed?
yep. updated
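For context, a sketch of how that `getattr` default reads when folded into a loop over child expressions; the helper name and surrounding code are assumed, not taken from the PR:

```python
from collections import Counter
from enum import Enum
from typing import Iterable


class ComplexityStat(Enum):
    COLUMN = "column"


def sum_children_complexity(children: Iterable) -> Counter:
    # Expressions that do not define cumulative_complexity_stat fall back to
    # a single COLUMN entry, so no separate lambda/helper is needed.
    total: Counter = Counter()
    for expr in children:
        total += getattr(
            expr,
            "cumulative_complexity_stat",
            Counter({ComplexityStat.COLUMN.value: 1}),
        )
    return total
```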
@@ -656,7 +656,7 @@ def get_result_and_metadata(

def get_result_query_id(self, plan: SnowflakePlan, **kwargs) -> str:
    # get the iterator such that the data is not fetched
    result_set, _ = self.get_result_set(plan, to_iter=True, **kwargs)
    result_set, _ = self.get_result_set(plan, ignore_results=True, **kwargs)
this might be in another PR
yeah. this was a mistake. thanks for catching it
@@ -187,3 +194,23 @@ def __init__(
@property
def sql(self) -> str:
    return self.join_type.sql

@property
def individual_complexity_stat(self) -> Counter[str]:
nit: I would prefer `score` instead of `stat`.
I am renaming these to `individual_node_complexity` and `cumulative_node_complexity`. Since the return value is a dictionary, adding `score` could also be misleading.
So when I look at those, they are just plan_statistic or plan_info to me, but I am fine with node_complexity if preferred. Score is definitely misleading since we are not really calculating any score here yet.
I earlier had `stat` as shorthand for statistic.
@@ -349,13 +351,21 @@ def test_async_batch_insert(session):
    reason="TODO(SNOW-932722): Cancel query is not allowed in stored proc",
)
def test_async_is_running_and_cancel(session):
    async_job = session.sql("select SYSTEM$WAIT(3)").collect_nowait()
    # creating a sproc here because describe query on SYSTEM$WAIT()
Was this commit checked in accidentally?
No, this is another one of the cases where a describe query is launched. In this case, the internal describe query on `SYSTEM$WAIT` actually blocks the query.
I see, but I mean should it be in another PR?
-1, this change seems irrelevant to this PR.
I will revert this change and fix SelectSQL implementation so we do not trigger analyze attributes for it.
# SELECT * FROM (left) AS left_alias join_type_sql JOIN (right) AS right_alias match_cond, using_cond, join_cond
estimate = Counter({ComplexityStat.JOIN.value: 1})
if isinstance(self.join_type, UsingJoin) and self.join_type.using_columns:
    estimate += Counter(
I think I am still confused here. The join seems to be a LogicalPlan node; if we do not plan to do it at the plan node level, where do we plan to calculate the accumulated complexity for joins then?
KT = typing.TypeVar("KT")

class Counter(collections.Counter, typing.Counter[KT]):
Hmm, if map1 + map2 is the only difference, can we simply use a map then? Right now the returned type is Counter[str], which is not straightforward to understand with a str; for example, why is complexity represented as a string? To reduce unnecessary confusion in the code, let's use a more obvious data structure for the representation.
COLUMN = "column"
FUNCTION = "function"
IN = "in"
LOW_IMPACT = "low_impact"
I think we can have both. Then we have a category for each node in the plan tree, and it would be easy for us to calculate the total number of nodes in the tree.
@sfc-gh-aalam thanks for accommodating the comments. The overall flow looks good to me, but I think I still have some major questions that I am kind of confused about, especially since the individual_complexity does not seem so useful to me. Maybe let's go over the major questions at Monday's meeting very quickly.
OTHERS = "others"


def sum_node_complexities(*node_complexities: Dict[str, int]) -> Dict[str, int]:
In fact, can the complexity just be Dict[PlanNodeCategory, int]?
OTHERS = "others"


def sum_node_complexities(*node_complexities: Dict[str, int]) -> Dict[str, int]:
Let's add a comment for this function, for example: this is a helper function for summing all given node complexities, where a node complexity is represented as a mapping between PlanNodeCategory and the total count of the corresponding category.
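A minimal sketch combining the two suggestions above, keying the map by `PlanNodeCategory` and documenting the helper; the actual implementation in the PR may differ:

```python
from collections import Counter
from enum import Enum
from typing import Dict


class PlanNodeCategory(Enum):
    COLUMN = "column"
    FUNCTION = "function"
    LOW_IMPACT = "low_impact"
    OTHERS = "others"


def sum_node_complexities(
    *node_complexities: Dict[PlanNodeCategory, int],
) -> Dict[PlanNodeCategory, int]:
    """Helper for summing node complexities: each argument maps a
    PlanNodeCategory to the count of nodes in that category, and the result
    is the element-wise sum across all arguments."""
    total: Counter = Counter()
    for complexity in node_complexities:
        total.update(complexity)
    return dict(total)
```

Summing `total.values()` over the result would also give the total number of nodes in the tree, which is the point raised earlier about having a category for every node.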
@@ -658,6 +688,61 @@ def schema_query(self) -> str:
def children_plan_nodes(self) -> List[Union["Selectable", SnowflakePlan]]:
    return [self.from_]

@property
def individual_node_complexity(self) -> Dict[str, int]:
    score = {}
Let's not call this `score`; it's kind of misleading.
PlanNodeCategory.ORDER_BY.value: 1,
PlanNodeCategory.LITERAL.value: 3,  # step, start, count
PlanNodeCategory.COLUMN.value: 1,  # id column
PlanNodeCategory.LOW_IMPACT.value: 2,  # ROW_NUMBER, GENERATOR
Shouldn't ROW_NUMBER and GENERATOR be counted as FUNCTION, since those are Snowflake functions?
yep, we should. I'll fix it
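For clarity, the corrected breakdown after that fix would presumably look like the following; this is inferred from the exchange above, not quoted from the PR:

```python
from enum import Enum


class PlanNodeCategory(Enum):
    ORDER_BY = "order_by"
    LITERAL = "literal"
    COLUMN = "column"
    FUNCTION = "function"


# Presumably the range/generator plan: ROW_NUMBER and GENERATOR now counted
# as FUNCTION instead of LOW_IMPACT.
range_plan_complexity = {
    PlanNodeCategory.ORDER_BY.value: 1,
    PlanNodeCategory.LITERAL.value: 3,  # step, start, count
    PlanNodeCategory.COLUMN.value: 1,  # id column
    PlanNodeCategory.FUNCTION.value: 2,  # ROW_NUMBER, GENERATOR
}
```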
def individual_node_complexity(self) -> Dict[str, int]:
    # SELECT * RENAME (before AS after, ...) FROM child
    return {
        PlanNodeCategory.COLUMN.value: 1 + 2 * len(self.column_map),
Why does it have a 2 * here? Shouldn't `before AS after` be counted as 1 column?
For each `col1 AS col2`, we are counting the complexity as col1.complexity + col2.complexity, which makes it 2 * len(column_map) in this case.
return {
    PlanNodeCategory.COLUMN.value: 1 + 2 * len(self.column_map),
    PlanNodeCategory.LOW_IMPACT.value: 1 + len(self.column_map),
}
What is the 1 value for? Is that for the alias name?
The 1 is for `*`, because the SQL expression is `SELECT * RENAME (before_col AS after_col, ...) FROM child`.
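As a concrete illustration of those two formulas, assuming a hypothetical two-column rename:

```python
# SELECT * RENAME (a AS x, b AS y) FROM child
column_map = {"a": "x", "b": "y"}

column_count = 1 + 2 * len(column_map)  # 1 for `*` plus before/after per pair -> 5
low_impact_count = 1 + len(column_map)  # presumably the RENAME clause plus one AS per pair -> 3
```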
{
    PlanNodeCategory.COLUMN.value: 3,
    PlanNodeCategory.LOW_IMPACT.value: 1,
    PlanNodeCategory.FUNCTION.value: 1,
Oh, where does the FUNCTION come from? Is that from an aliasing function?
it comes from AVG
@property
def cumulative_node_complexity(self) -> Dict[PlanNodeCategory, int]:
    """Returns the aggregate sum complexity statistic from the subtree rooted at this
    expression node. Statistic of current node is included in the final aggregate.
Add a comment here to mention that the node complexity is the sum of its individual complexity and its children's complexity, and please make sure to override individual_node_complexity properly for new nodes so the cumulative complexity stays correct.
added
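A stripped-down sketch of that relationship, i.e. cumulative complexity = this node's individual complexity plus the cumulative complexity of all children; the real classes in the PR carry much more state:

```python
from collections import Counter
from typing import Dict, List, Optional


class Expression:
    def __init__(self, children: Optional[List["Expression"]] = None) -> None:
        self.children = children or []

    @property
    def individual_node_complexity(self) -> Dict[str, int]:
        # Overridden per node type; new nodes must override this so the
        # cumulative complexity stays correct.
        return {}

    @property
    def cumulative_node_complexity(self) -> Dict[str, int]:
        # Sum of this node's own complexity and every child's subtree complexity.
        total: Counter = Counter(self.individual_node_complexity)
        for child in self.children:
            total.update(child.cumulative_node_complexity)
        return dict(total)
```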
Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR.
Fixes SNOW-1432019
Fill out the following pre-review checklist:
Please describe how your code solves the related issue.
see doc: https://docs.google.com/document/d/1IS8qyNmWecF_Lej723hlqXXKUVceATnfeBziyWbFlh0/edit