-
Notifications
You must be signed in to change notification settings - Fork 116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SNOW-1663726 make session config updates thread safe #2302
SNOW-1663726 make session config updates thread safe #2302
Conversation
…ection-thread-safe
…ad-safe' into aalam-SNOW-1418523-make-analyzer-server_connection-thread-safe
SNOW-1418523-make-udf-sproc-thread-safe
…n_stage, plan_builder
@@ -172,13 +173,21 @@ def analyze( | |||
expr: Union[Expression, NamedExpression], | |||
df_aliased_col_name_to_real_col_name: DefaultDict[str, Dict[str, str]], | |||
parse_local_name=False, | |||
config_context: Optional[ConfigContext] = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i would prefer to make this non-optional, so people can think explicitly when they are calling the interface
from typing import Any | ||
|
||
|
||
class ConfigContext: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
with this, i think for the one you mentioned before for the access sql_simplifier_enabled, we should also use this configContext if it is accessing the sql_simplifier_enabled multiple times in the function call
self.session = session | ||
self.configs = { | ||
"_query_compilation_stage_enabled", | ||
"cte_optimization_enabled", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of doing this, it might be more clear to provide a property interface in the session that teaks a snapshot. so it is the interface is at a closer place with the properties.
Given the fact that we are adding more and more configs, i am thinking it might worth to do a refactoring to centralize all configs, and wrap all of them into a class to reduce the possiblility for developer make mistakes.
@@ -531,6 +542,10 @@ def __init__( | |||
# on the optimized plan. During the final query generation, no schema query is needed, | |||
# this helps reduces un-necessary overhead for the describing call. | |||
self._skip_schema_query = skip_schema_query | |||
self._config_context: Optional[ConfigContext] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The config context here should probably been done in the same way as what we have done for analyze and resolve as parameter for each builder.
@@ -564,9 +579,10 @@ def build( | |||
), "No schema query is available in child SnowflakePlan" | |||
new_schema_query = schema_query or sql_generator(child.schema_query) | |||
|
|||
config_context = self._config_context or ConfigContext(self.session) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
each build function seems can take a new snapshot, but multiple build can be called during the whole resolve, and they are suppose to get the same snapshot, it is safer and cleaner to simply extend each interface to take the config context.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I intend to remove config_context
from plan_builder once old CTE implementation is gone. That is the only reason why plan builder needs this info. I can own this task. Without that, the refactor will be too big and it would be prone to more errors. I want to avoid that.
@@ -46,6 +47,7 @@ class PlanCompiler: | |||
|
|||
def __init__(self, plan: SnowflakePlan) -> None: | |||
self._plan = plan | |||
self._config_context = ConfigContext(self._plan.session) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is good, we are only using one snapshot during the whole compilation stage
session.sql_simplifier_enabled = sql_simplifier_enabled | ||
with session._lock: | ||
sql_simplifier_enabled = session.sql_simplifier_enabled | ||
session.sql_simplifier_enabled = False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that code here is hacky and bad. do you know what is the ticket SNOW-1646883 referring for, i think we should fix that and remove this code, we are not suppose to change the config in our code like this.
…ad-safe' into aalam-SNOW-1663726-make-session-config-updates-thread-safe
…ad-safe' into aalam-SNOW-1663726-make-session-config-updates-thread-safe
@@ -49,6 +49,13 @@ class PlanCompiler: | |||
|
|||
def __init__(self, plan: SnowflakePlan) -> None: | |||
self._plan = plan | |||
session = plan.session | |||
self.cte_optimization_enabled = session.cte_optimization_enabled |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we are restricting the change to config runtime, this is not necessary anymore, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we are only putting restriction on cte_optimization_enabled
. Since for other, we still need to take a snapshot, I'm taking a snapshot for cte param for completeness.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since now we are putting protection on all config variables, i think we don't need to do that anymore
src/snowflake/snowpark/session.py
Outdated
self._session_id | ||
if threading.active_count() > 1: | ||
# TODO (SNOW-1541096): Remove the limitation once old cte implementation is removed. | ||
_logger.warning( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think we could simply disallow updating of any config value when there are multiple active thread to be consistent everywhere
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For example, i actually uses large_query_breakdown when deciding whether the complexity can be merged or not.
src/snowflake/snowpark/session.py
Outdated
@@ -827,12 +868,18 @@ def large_query_breakdown_enabled(self, value: bool) -> None: | |||
materialize the partitions, and then combine them to execute the query to improve | |||
overall performance. | |||
""" | |||
if threading.active_count() > 1: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can probably have a small utility function and call that for all configs with the config name as a parameter
@@ -49,6 +49,13 @@ class PlanCompiler: | |||
|
|||
def __init__(self, plan: SnowflakePlan) -> None: | |||
self._plan = plan | |||
session = plan.session | |||
self.cte_optimization_enabled = session.cte_optimization_enabled |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since now we are putting protection on all config variables, i think we don't need to do that anymore
@@ -151,8 +154,7 @@ def compile(self) -> Dict[PlanQueryType, List[Query]]: | |||
return queries | |||
else: | |||
final_plan = self._plan | |||
if self._plan.session.cte_optimization_enabled: | |||
final_plan = final_plan.replace_repeated_subquery_with_cte() | |||
final_plan = final_plan.replace_repeated_subquery_with_cte() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is that intended? i think we still need the condition here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes. replace_repeated_subquery_with_cte()
does this check automatically.
if threading.active_count() > 1: | ||
logger.warning( | ||
"You might have more than one threads sharing the Session object trying to update " | ||
f"{config}. This is currently not thread-safe and may cause unexpected behavior. " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the sense here sounds little bit wired, and updating the config its-self is thread safe, it just make the other operations unsafe. maybe say updating f"{config}" while other tasks are running can potentially cause unexpected behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
src/snowflake/snowpark/session.py
Outdated
@@ -637,6 +639,16 @@ def _analyzer(self) -> Analyzer: | |||
) | |||
return self._thread_store.analyzer | |||
|
|||
@property | |||
def _plan_builder(self): | |||
if not hasattr(self._thread_store, "plan_builder"): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, i didn't notice we have plan builder here, i thought plan builder is just part of the analyzer, what is the plan builder used here?, and what is the reason that we moved it to a property
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and the plan builder seems thread local to me
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This plan builder is different from analyzer's plan builder. Only used by
- DataFramReader to create read_file plan
- FileOperation to create file_operation_plan
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added it as part of old changes. reverted back to original now since plan builder should be thread-safe with restricted config updates
5672a1d
into
aalam-SNOW-1418523-make-internal-session-variables-thread-safe
Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR.
Fixes SNOW-1663726
Fill out the following pre-review checklist:
Please describe how your code solves the related issue.
In this PR we make the following changes:
_plan_builder
to isolate usage.ConfigContext
to set a context and read a snapshot of config values from session. Use theConfigContext
object forAnalyzer
,PlanCompiler
andSnowflakePlanBuilder
.do_analyzer
and_compile
that runs using a singleConfigContext