SNOW-1663726 make session config updates thread safe #2302

Merged
Changes from 57 commits
Commits
72 commits
56fb566
init
sfc-gh-aalam Sep 11, 2024
66003d1
make udf/sproc related files thread-safe
sfc-gh-aalam Sep 11, 2024
0e58205
Merge branch 'main' into aalam-SNOW-1418523-make-udf-sproc-thread-safe
sfc-gh-aalam Sep 11, 2024
e75dde1
init
sfc-gh-aalam Sep 11, 2024
68a8c1c
make query listener thread-safe
sfc-gh-aalam Sep 11, 2024
31a5734
Fix query_tag and last_action_id
sfc-gh-aalam Sep 11, 2024
b4dadda
core updates done
sfc-gh-aalam Sep 11, 2024
b8c6496
Add tests
sfc-gh-aalam Sep 12, 2024
f39837e
Fix local tests
sfc-gh-aalam Sep 12, 2024
31a196f
Merge branch 'main' into aalam-SNOW-1418523-make-analyzer-server_conn…
sfc-gh-aalam Sep 12, 2024
723bdf7
Merge branch 'aalam-SNOW-1418523-make-internal-session-variables-thre…
sfc-gh-aalam Sep 12, 2024
37c0419
add file IO tests
sfc-gh-aalam Sep 12, 2024
8a2d433
Merge branch 'aalam-SNOW-1418523-concurrent-file-operations' into aal…
sfc-gh-aalam Sep 12, 2024
a083989
make session._runtime_version_from_requirement safe
sfc-gh-aalam Sep 13, 2024
947d384
add sp/udf concurrent tests
sfc-gh-aalam Sep 13, 2024
fd51720
fix broken test
sfc-gh-aalam Sep 13, 2024
3077853
add udtf/udaf tests
sfc-gh-aalam Sep 13, 2024
65c3186
fix broken test
sfc-gh-aalam Sep 13, 2024
94412cf
sql_simplifier, cte_optimization, eliminate_numeric, query_compilatio…
sfc-gh-aalam Sep 13, 2024
638dd09
cover more configs
sfc-gh-aalam Sep 17, 2024
7ae2c33
fix SnowflakePlan copy
sfc-gh-aalam Sep 17, 2024
1689ebf
minor update
sfc-gh-aalam Sep 17, 2024
5e8a2d2
add description
sfc-gh-aalam Sep 17, 2024
1c83ef2
use _package_lock to protect Session._packages
sfc-gh-aalam Sep 17, 2024
a649761
undo refactor
sfc-gh-aalam Sep 17, 2024
f03d618
undo refactor
sfc-gh-aalam Sep 17, 2024
5f398d5
fix test
sfc-gh-aalam Sep 17, 2024
3807087
fix test
sfc-gh-aalam Sep 17, 2024
4eef3e9
Merge branch 'aalam-SNOW-1418523-make-internal-session-variables-thre…
sfc-gh-aalam Sep 17, 2024
df3263c
add file IO tests
sfc-gh-aalam Sep 12, 2024
6769c54
merge with base
sfc-gh-aalam Sep 17, 2024
af86f67
merge with base
sfc-gh-aalam Sep 17, 2024
a737f33
fix test
sfc-gh-aalam Sep 17, 2024
8ca2730
protect complexity bounds setter with lock
sfc-gh-aalam Sep 17, 2024
81417a3
add config context
sfc-gh-aalam Sep 19, 2024
e340567
add tests
sfc-gh-aalam Sep 19, 2024
30952bb
update documentation
sfc-gh-aalam Sep 20, 2024
03f25b5
use config context in plan compiler
sfc-gh-aalam Sep 20, 2024
6deb402
add comments
sfc-gh-aalam Sep 20, 2024
8e1dfe0
minor refactor
sfc-gh-aalam Sep 20, 2024
10bfeb4
fix test
sfc-gh-aalam Sep 20, 2024
879940a
update documentation
sfc-gh-aalam Sep 20, 2024
5aad2d9
simplify context config
sfc-gh-aalam Sep 25, 2024
669eb91
merge with base
sfc-gh-aalam Sep 25, 2024
a85a144
add config context to repeated subquery elimination resolution stage
sfc-gh-aalam Sep 25, 2024
a79ffb4
fix tests
sfc-gh-aalam Sep 26, 2024
4420350
refactor
sfc-gh-aalam Sep 26, 2024
5f1eaa6
remove do_analyze
sfc-gh-aalam Sep 27, 2024
9d62017
fix
sfc-gh-aalam Sep 27, 2024
b58aa8b
fix
sfc-gh-aalam Sep 27, 2024
db37033
fix
sfc-gh-aalam Sep 27, 2024
dddd15f
fix unit tests
sfc-gh-aalam Sep 27, 2024
57ee9e8
simplify
sfc-gh-aalam Sep 27, 2024
809a86e
simplify
sfc-gh-aalam Sep 27, 2024
6021ab8
simplify
sfc-gh-aalam Sep 27, 2024
43986f6
simplify
sfc-gh-aalam Sep 27, 2024
0430e92
simplify
sfc-gh-aalam Sep 27, 2024
095b04e
remove config context
sfc-gh-aalam Sep 30, 2024
32707f9
min-diff
sfc-gh-aalam Sep 30, 2024
3bf678d
min-diff
sfc-gh-aalam Sep 30, 2024
3eade1a
min-diff
sfc-gh-aalam Sep 30, 2024
1850d5d
Merge branch 'aalam-SNOW-1418523-make-internal-session-variables-thre…
sfc-gh-aalam Oct 2, 2024
1fa6ad2
add warnings
sfc-gh-aalam Oct 2, 2024
7c85432
Merge branch 'aalam-SNOW-1418523-make-internal-session-variables-thre…
sfc-gh-aalam Oct 3, 2024
f994842
address feedback
sfc-gh-aalam Oct 3, 2024
4621836
address feedback
sfc-gh-aalam Oct 3, 2024
e1c68f3
fix string
sfc-gh-aalam Oct 3, 2024
e5b48dd
ignore on multi-thread
sfc-gh-aalam Oct 3, 2024
496e2be
undo ignore
sfc-gh-aalam Oct 3, 2024
980d3b7
update warning message
sfc-gh-aalam Oct 4, 2024
54a6b5d
address comments
sfc-gh-aalam Oct 4, 2024
67609e8
address comments
sfc-gh-aalam Oct 4, 2024
422 changes: 337 additions & 85 deletions src/snowflake/snowpark/_internal/analyzer/analyzer.py

Large diffs are not rendered by default.

37 changes: 37 additions & 0 deletions src/snowflake/snowpark/_internal/analyzer/config_context.py
@@ -0,0 +1,37 @@
#
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
#

from typing import Any


class ConfigContext:
Collaborator:

With this, I think that for the case you mentioned earlier about accessing sql_simplifier_enabled, we should also use this ConfigContext if sql_simplifier_enabled is accessed multiple times within a function call.

"""Class to help reading a snapshot of configuration attributes from a session object.

On instantiation, this object stores the configuration from the session object
and returns the stored configuration attributes when requested.
"""

def __init__(self, session) -> None:
self.session = session
self.configs = {
"_query_compilation_stage_enabled",
"cte_optimization_enabled",
Collaborator:

Instead of doing this, it might be clearer to provide a property interface on the session that takes a snapshot, so the interface lives closer to the properties themselves.

Given that we keep adding more and more configs, it might be worth a refactoring to centralize all configs and wrap them in a single class, to reduce the possibility of developer mistakes.

"eliminate_numeric_sql_value_cast_enabled",
"large_query_breakdown_complexity_bounds",
"large_query_breakdown_enabled",
}
self._create_snapshot()

def __getattr__(self, name: str) -> Any:
if name in self.configs:
return getattr(self.session, name)
raise AttributeError(f"ConfigContext has no attribute {name}")

def _create_snapshot(self) -> "ConfigContext":
"""Reads the configuration attributes from the session object and stores them
in the context object.
"""
for name in self.configs:
setattr(self, name, getattr(self.session, name))
return self
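
A minimal usage sketch of this snapshot pattern, in Python. FakeSession and the example bound values are hypothetical stand-ins; only the ConfigContext import comes from this PR's diff.

# Sketch only: FakeSession and its values are hypothetical, not Snowpark objects.
from snowflake.snowpark._internal.analyzer.config_context import ConfigContext

class FakeSession:
    _query_compilation_stage_enabled = True
    cte_optimization_enabled = False
    eliminate_numeric_sql_value_cast_enabled = False
    large_query_breakdown_complexity_bounds = (300, 600)  # arbitrary example bounds
    large_query_breakdown_enabled = True

session = FakeSession()
ctx = ConfigContext(session)

# _create_snapshot() copied the values onto the context instance, so a
# concurrent change to the session is not observed through the context.
session.cte_optimization_enabled = True
assert ctx.cte_optimization_enabled is False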
61 changes: 46 additions & 15 deletions src/snowflake/snowpark/_internal/analyzer/snowflake_plan.py
@@ -23,6 +23,7 @@
Union,
)

from snowflake.snowpark._internal.analyzer.config_context import ConfigContext
from snowflake.snowpark._internal.analyzer.query_plan_analysis_utils import (
PlanNodeCategory,
sum_node_complexities,
@@ -312,14 +313,16 @@ def children_plan_nodes(self) -> List[Union["Selectable", "SnowflakePlan"]]:
else:
return []

def replace_repeated_subquery_with_cte(self) -> "SnowflakePlan":
def replace_repeated_subquery_with_cte(
self, config_context: ConfigContext
) -> "SnowflakePlan":
# parameter protection
# the common subquery elimination will only be applied if cte_optimization is enabled
# and the new compilation stage is not enabled. When the new compilation stage is enabled,
# the common subquery elimination will be done through the new plan transformation.
if (
not self.session._cte_optimization_enabled
or self.session._query_compilation_stage_enabled
not config_context.cte_optimization_enabled
or config_context._query_compilation_stage_enabled
):
return self

@@ -355,8 +358,16 @@ def replace_repeated_subquery_with_cte(self) -> "SnowflakePlan":
# create CTE query
final_query = create_cte_query(self, duplicate_plan_set)

with self.session._lock:
# copy depends on the cte_optimization_enabled value. We should keep it
# consistent with the current context.
original_cte_optimization = self.session.cte_optimization_enabled
self.session.cte_optimization_enabled = (
config_context.cte_optimization_enabled
)
plan = copy.copy(self)
self.session.cte_optimization_enabled = original_cte_optimization
# all other parts of query are unchanged, but just replace the original query
plan = copy.copy(self)
plan.queries[-1].sql = final_query
return plan

@@ -531,6 +542,10 @@ def __init__(
# on the optimized plan. During the final query generation, no schema query is needed,
# this helps reduces un-necessary overhead for the describing call.
self._skip_schema_query = skip_schema_query
self._config_context: Optional[ConfigContext] = None
Collaborator:

The config context here should probably be handled the same way we did for analyze and resolve: passed as a parameter to each builder method.


def set_config_context(self, config_context: ConfigContext) -> None:
self._config_context = config_context

@SnowflakePlan.Decorator.wrap_exception
def build(
@@ -564,9 +579,10 @@ def build(
), "No schema query is available in child SnowflakePlan"
new_schema_query = schema_query or sql_generator(child.schema_query)

config_context = self._config_context or ConfigContext(self.session)
Collaborator:

Each build function can take a new snapshot here, but multiple builds may be called during a single resolve, and they are all supposed to see the same snapshot. It would be safer and cleaner to simply extend each interface to take the config context.

Contributor Author:

I intend to remove config_context from the plan builder once the old CTE implementation is gone; that is the only reason the plan builder needs this information, and I can own that task. Without it, the refactor would be too large and more error-prone, which I want to avoid.

placeholder_query = (
sql_generator(select_child._id)
if self.session._cte_optimization_enabled and select_child._id is not None
if config_context.cte_optimization_enabled and select_child._id is not None
else None
)

@@ -603,9 +619,10 @@ def build_binary(
right_schema_query = schema_value_statement(select_right.attributes)
schema_query = sql_generator(left_schema_query, right_schema_query)

config_context = self._config_context or ConfigContext(self.session)
placeholder_query = (
sql_generator(select_left._id, select_right._id)
if self.session._cte_optimization_enabled
if config_context.cte_optimization_enabled
and select_left._id is not None
and select_right._id is not None
else None
@@ -637,8 +654,8 @@

referenced_ctes: Set[str] = set()
if (
self.session.cte_optimization_enabled
and self.session._query_compilation_stage_enabled
config_context.cte_optimization_enabled
and config_context._query_compilation_stage_enabled
):
# When the cte optimization and the new compilation stage is enabled,
# the referred cte tables are propagated from left and right can have
@@ -928,7 +945,9 @@ def save_as_table(
column_definition_with_hidden_columns,
)

child = child.replace_repeated_subquery_with_cte()
child = child.replace_repeated_subquery_with_cte(
self._config_context or ConfigContext(self.session)
)

def get_create_table_as_select_plan(child: SnowflakePlan, replace, error):
return self.build(
@@ -1116,7 +1135,9 @@ def create_or_replace_view(
if not is_sql_select_statement(child.queries[0].sql.lower().strip()):
raise SnowparkClientExceptionMessages.PLAN_CREATE_VIEWS_FROM_SELECT_ONLY()

child = child.replace_repeated_subquery_with_cte()
child = child.replace_repeated_subquery_with_cte(
self._config_context or ConfigContext(self.session)
)
return self.build(
lambda x: create_or_replace_view_statement(name, x, is_temp, comment),
child,
@@ -1159,7 +1180,9 @@ def create_or_replace_dynamic_table(
# should never reach here
raise ValueError(f"Unknown create mode: {create_mode}") # pragma: no cover

child = child.replace_repeated_subquery_with_cte()
child = child.replace_repeated_subquery_with_cte(
self._config_context or ConfigContext(self.session)
)
return self.build(
lambda x: create_or_replace_dynamic_table_statement(
name=name,
@@ -1462,7 +1485,9 @@ def copy_into_location(
header: bool = False,
**copy_options: Optional[Any],
) -> SnowflakePlan:
query = query.replace_repeated_subquery_with_cte()
query = query.replace_repeated_subquery_with_cte(
self._config_context or ConfigContext(self.session)
)
return self.build(
lambda x: copy_into_location(
query=x,
@@ -1489,7 +1514,9 @@ def update(
source_plan: Optional[LogicalPlan],
) -> SnowflakePlan:
if source_data:
source_data = source_data.replace_repeated_subquery_with_cte()
source_data = source_data.replace_repeated_subquery_with_cte(
self._config_context or ConfigContext(self.session)
)
return self.build(
lambda x: update_statement(
table_name,
Expand Down Expand Up @@ -1520,7 +1547,9 @@ def delete(
source_plan: Optional[LogicalPlan],
) -> SnowflakePlan:
if source_data:
source_data = source_data.replace_repeated_subquery_with_cte()
source_data = source_data.replace_repeated_subquery_with_cte(
self._config_context or ConfigContext(self.session)
)
return self.build(
lambda x: delete_statement(
table_name,
@@ -1549,7 +1578,9 @@ def merge(
clauses: List[str],
source_plan: Optional[LogicalPlan],
) -> SnowflakePlan:
source_data = source_data.replace_repeated_subquery_with_cte()
source_data = source_data.replace_repeated_subquery_with_cte(
self._config_context or ConfigContext(self.session)
)
return self.build(
lambda x: merge_statement(table_name, x, join_expr, clauses),
source_data,
Original file line number Diff line number Diff line change
@@ -113,17 +113,14 @@ def __init__(
session: Session,
query_generator: QueryGenerator,
logical_plans: List[LogicalPlan],
complexity_bounds: Tuple[int, int],
) -> None:
self.session = session
self._query_generator = query_generator
self.logical_plans = logical_plans
self._parent_map = defaultdict(set)
self.complexity_score_lower_bound = (
session.large_query_breakdown_complexity_bounds[0]
)
self.complexity_score_upper_bound = (
session.large_query_breakdown_complexity_bounds[1]
)
self.complexity_score_lower_bound = complexity_bounds[0]
self.complexity_score_upper_bound = complexity_bounds[1]

def apply(self) -> List[LogicalPlan]:
if is_active_transaction(self.session):
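
The constructor change above switches from reading session.large_query_breakdown_complexity_bounds inside the optimizer to receiving the bounds explicitly from the caller's snapshot. A rough Python illustration of that dependency-injection pattern; BoundsUser, in_window, and the values are invented for this sketch and are not the real optimizer:

# Hypothetical sketch: inject snapshotted bounds instead of reading mutable
# session state inside the optimization pass.
from typing import Tuple

class BoundsUser:
    def __init__(self, complexity_bounds: Tuple[int, int]) -> None:
        # The caller passes values captured in its config snapshot, so a
        # concurrent change to the session cannot shift the window mid-pass.
        self.lower, self.upper = complexity_bounds

    def in_window(self, score: int) -> bool:
        return self.lower < score <= self.upper

snapshot_bounds = (300, 600)  # e.g. taken from a config snapshot
print(BoundsUser(snapshot_bounds).in_window(450))  # True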
30 changes: 18 additions & 12 deletions src/snowflake/snowpark/_internal/compiler/plan_compiler.py
@@ -6,6 +6,7 @@
import time
from typing import Dict, List

from snowflake.snowpark._internal.analyzer.config_context import ConfigContext
from snowflake.snowpark._internal.analyzer.query_plan_analysis_utils import (
get_complexity_score,
)
@@ -46,6 +47,7 @@ class PlanCompiler:

def __init__(self, plan: SnowflakePlan) -> None:
self._plan = plan
self._config_context = ConfigContext(self._plan.session)
Collaborator:

That is good; we are only using one snapshot during the whole compilation stage.


def should_start_query_compilation(self) -> bool:
"""
@@ -65,10 +67,10 @@ def should_start_query_compilation(self) -> bool:
return (
not isinstance(current_session._conn, MockServerConnection)
and (self._plan.source_plan is not None)
and current_session._query_compilation_stage_enabled
and self._config_context._query_compilation_stage_enabled
and (
current_session.cte_optimization_enabled
or current_session.large_query_breakdown_enabled
self._config_context.cte_optimization_enabled
or self._config_context.large_query_breakdown_enabled
)
)

@@ -84,12 +86,12 @@ def compile(self) -> Dict[PlanQueryType, List[Query]]:
deep_copy_end_time = time.time()

# 2. create a code generator with the original plan
query_generator = create_query_generator(self._plan)
query_generator = create_query_generator(self._plan, self._config_context)

# 3. apply each optimizations if needed
# CTE optimization
cte_start_time = time.time()
if self._plan.session.cte_optimization_enabled:
if self._config_context.cte_optimization_enabled:
repeated_subquery_eliminator = RepeatedSubqueryElimination(
logical_plans, query_generator
)
@@ -102,9 +104,12 @@ def compile(self) -> Dict[PlanQueryType, List[Query]]:
]

# Large query breakdown
if self._plan.session.large_query_breakdown_enabled:
if self._config_context.large_query_breakdown_enabled:
large_query_breakdown = LargeQueryBreakdown(
self._plan.session, query_generator, logical_plans
self._plan.session,
query_generator,
logical_plans,
self._config_context.large_query_breakdown_complexity_bounds,
)
logical_plans = large_query_breakdown.apply()

@@ -124,9 +129,9 @@ def compile(self) -> Dict[PlanQueryType, List[Query]]:
total_time = time.time() - start_time
session = self._plan.session
summary_value = {
TelemetryField.CTE_OPTIMIZATION_ENABLED.value: session.cte_optimization_enabled,
TelemetryField.LARGE_QUERY_BREAKDOWN_ENABLED.value: session.large_query_breakdown_enabled,
CompilationStageTelemetryField.COMPLEXITY_SCORE_BOUNDS.value: session.large_query_breakdown_complexity_bounds,
TelemetryField.CTE_OPTIMIZATION_ENABLED.value: self._config_context.cte_optimization_enabled,
TelemetryField.LARGE_QUERY_BREAKDOWN_ENABLED.value: self._config_context.large_query_breakdown_enabled,
CompilationStageTelemetryField.COMPLEXITY_SCORE_BOUNDS.value: self._config_context.large_query_breakdown_complexity_bounds,
CompilationStageTelemetryField.TIME_TAKEN_FOR_COMPILATION.value: total_time,
CompilationStageTelemetryField.TIME_TAKEN_FOR_DEEP_COPY_PLAN.value: deep_copy_time,
CompilationStageTelemetryField.TIME_TAKEN_FOR_CTE_OPTIMIZATION.value: cte_time,
@@ -143,8 +148,9 @@ def compile(self) -> Dict[PlanQueryType, List[Query]]:
return queries
else:
final_plan = self._plan
if self._plan.session.cte_optimization_enabled:
final_plan = final_plan.replace_repeated_subquery_with_cte()
final_plan = final_plan.replace_repeated_subquery_with_cte(
self._config_context
)
return {
PlanQueryType.QUERIES: final_plan.queries,
PlanQueryType.POST_ACTIONS: final_plan.post_actions,
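
Why the single snapshot taken in PlanCompiler.__init__ matters: if compile() read the live session flags at several points, a concurrent config update could be observed partway through the compilation. A toy Python illustration of that race; compile_with and the dict-based configs are stand-ins, not Snowpark APIs:

# Toy illustration of the race a per-compilation snapshot avoids.
import threading
import time

def compile_with(cfg: dict) -> tuple:
    first_read = cfg["cte_optimization_enabled"]        # read at the start
    time.sleep(0.05)                                     # ...long compilation work...
    second_read = cfg["large_query_breakdown_enabled"]   # read near the end
    return first_read, second_read

live = {"cte_optimization_enabled": True, "large_query_breakdown_enabled": True}
snapshot = dict(live)  # analogous to ConfigContext(self._plan.session) in __init__

flipper = threading.Thread(
    target=lambda: live.update(large_query_breakdown_enabled=False)
)
flipper.start()
# Reading from `live` may mix pre- and post-update values; reading from
# `snapshot` always yields the pair that was current when compilation began.
print(compile_with(snapshot))
flipper.join()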
19 changes: 16 additions & 3 deletions src/snowflake/snowpark/_internal/compiler/query_generator.py
@@ -8,6 +8,7 @@
from snowflake.snowpark._internal.analyzer.expression import Attribute
from snowflake.snowpark._internal.analyzer.select_statement import Selectable
from snowflake.snowpark._internal.analyzer.snowflake_plan import (
ConfigContext,
CreateViewCommand,
PlanQueryType,
Query,
@@ -49,6 +50,7 @@ class QueryGenerator(Analyzer):
def __init__(
self,
session: Session,
config_context: ConfigContext,
snowflake_create_table_plan_info: Optional[SnowflakeCreateTablePlanInfo] = None,
) -> None:
super().__init__(session)
@@ -58,6 +60,7 @@ def __init__(
self._snowflake_create_table_plan_info: Optional[
SnowflakeCreateTablePlanInfo
] = snowflake_create_table_plan_info
self._config_context = config_context
# Records the definition of all the with query blocks encountered during the code generation.
# This information will be used to generate the final query of a SnowflakePlan with the
# correct CTE definition.
@@ -105,18 +108,22 @@ def generate_queries(
PlanQueryType.POST_ACTIONS: post_actions,
}

def resolve(self, logical_plan: LogicalPlan) -> SnowflakePlan:
return super().resolve(logical_plan, self._config_context)

def do_resolve_with_resolved_children(
self,
logical_plan: LogicalPlan,
resolved_children: Dict[LogicalPlan, SnowflakePlan],
df_aliased_col_name_to_real_col_name: DefaultDict[str, Dict[str, str]],
config_context: ConfigContext,
) -> SnowflakePlan:
if isinstance(logical_plan, SnowflakePlan):
if logical_plan.queries is None:
assert logical_plan.source_plan is not None
# when encounter a SnowflakePlan with no queries, try to re-resolve
# the source plan to construct the result
res = self.do_resolve(logical_plan.source_plan)
res = self.do_resolve(logical_plan.source_plan, config_context)
resolved_children[logical_plan] = res
resolved_plan = res
else:
@@ -204,7 +211,10 @@ def do_resolve_with_resolved_children(
copied_resolved_child.queries = final_queries[PlanQueryType.QUERIES]
resolved_children[logical_plan.children[0]] = copied_resolved_child
resolved_plan = super().do_resolve_with_resolved_children(
logical_plan, resolved_children, df_aliased_col_name_to_real_col_name
logical_plan,
resolved_children,
df_aliased_col_name_to_real_col_name,
config_context,
)

elif isinstance(logical_plan, Selectable):
@@ -228,7 +238,10 @@

else:
resolved_plan = super().do_resolve_with_resolved_children(
logical_plan, resolved_children, df_aliased_col_name_to_real_col_name
logical_plan,
resolved_children,
df_aliased_col_name_to_real_col_name,
config_context,
)

resolved_plan._is_valid_for_replacement = True
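
The resolve changes above thread the config context through each recursive call rather than reading it from shared state. A minimal Python sketch of that parameter-threading pattern; the Node, Context, and resolve names are invented for illustration and are not the Snowpark resolver:

# Hypothetical sketch: pass one context object down a recursive resolver so
# every node resolved within a single call sees the same configuration.
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Node:
    name: str
    children: List["Node"]

@dataclass(frozen=True)
class Context:
    cte_optimization_enabled: bool

def resolve(node: Node, ctx: Optional[Context] = None) -> str:
    # Take the snapshot once at the entry point, then pass it down unchanged,
    # mirroring the `self._config_context or ConfigContext(...)` fallback above.
    ctx = ctx or Context(cte_optimization_enabled=True)
    resolved_children = [resolve(child, ctx) for child in node.children]
    marker = "+cte" if ctx.cte_optimization_enabled else "-cte"
    return f"{node.name}[{marker}]({', '.join(resolved_children)})"

tree = Node("root", [Node("left", []), Node("right", [])])
print(resolve(tree))  # root[+cte](left[+cte](), right[+cte]())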