-
Notifications
You must be signed in to change notification settings - Fork 116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SNOW-1527717: Add profiler snowpark API #2252
Changes from 6 commits
0c1b6d9
9d911df
968ac2c
71967c6
3222459
e689d54
83b22a0
e1d314a
bf71cc2
b80f113
3320814
286cc31
791cf2a
b458a85
ba726ff
242e4de
bf4d169
e788125
c45ca1e
32a448c
72e0162
1d992f2
60c4aba
0657c2e
dcc6ad7
eda9559
d196ddd
6a7046d
fcac5fd
b2c6513
a27e64c
896c9cd
3e6a771
0d75b84
f9df3cd
de0d5c0
8ee830e
1e2d86c
4629d89
43c0f5a
76f5eaa
1a626f1
f5c246a
251126f
e67cca4
3c30f9a
233a9f4
aa90a19
bb4220d
d6ba864
6b265dc
63908d1
37fa336
393e98e
e538669
468872d
80f8635
51da343
60885bc
77b9457
5d299df
b335b77
1188594
157bc3d
9fb57ac
773e69e
e406a56
292834c
95cb15d
766d77e
776ba68
1324b1d
e89f6b9
375f11b
2906d0b
dfe2de8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
# | ||
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. | ||
# | ||
from contextlib import contextmanager | ||
from typing import List, Optional | ||
|
||
import snowflake.snowpark | ||
from snowflake.snowpark._internal.utils import validate_object_name | ||
|
||
|
||
class Profiler:
    """Configure and control the Python profiler of a Snowpark session.

    The profiler is driven entirely by session parameters: every setting is
    rendered into an ``alter session set ...`` statement by :meth:`prepare_sql`
    and executed through :attr:`session`.

    A session is required before any statement can be executed. It can be given
    to the constructor, or attached later (``Session.register_profiler`` assigns
    ``profiler.session``). While :attr:`session` is ``None`` the setters only
    record the new value and rebuild the SQL text.

    Known limitation: because the API works by setting session-level
    parameters, it cannot be used from an owner's-rights stored procedure,
    which is not able to set session-level parameters.

    Example::

        profiler = Profiler()
        profiler.register_profiler_modules(["my_module"])
        profiler.set_active_profiler("LINE")
        profiler.set_targeted_stage("my_db.my_schema.my_stage")
        session.register_profiler(profiler)

        profiler.enable_profiler()
        session.call("my_stored_procedure").collect()
        output = session.show_profiles()
        profiler.disable_profiler()

    Args:
        stage: Name of the stage that receives the profiler output.
        active_profiler: Which profiler to activate, ``"LINE"`` or ``"MEMORY"``.
        session: Session used to execute the statements; may be set later.
    """

    # Values accepted for the ACTIVE_PYTHON_PROFILER session parameter.
    _SUPPORTED_PROFILERS = ("LINE", "MEMORY")

    def __init__(
        self,
        stage: Optional[str] = "",
        active_profiler: Optional[str] = "LINE",
        session: Optional["snowflake.snowpark.Session"] = None,
    ) -> None:
        # The parameters are typed Optional, so fall back to the declared
        # defaults instead of failing later on ``None.upper()``.
        self.stage = "" if stage is None else stage
        self.active_profiler = "LINE" if active_profiler is None else active_profiler
        self.modules_to_register = []
        self.register_modules_sql = ""
        self.set_targeted_stage_sql = ""
        self.enable_profiler_sql = ""
        self.disable_profiler_sql = ""
        self.set_active_profiler_sql = ""
        self.session = session
        # Populated by Session.register_profiler / the profiler context manager.
        self.query_history = None
        self.prepare_sql()

    def prepare_sql(self):
        """Rebuild every SQL statement from the current attribute values."""
        modules = ",".join(self.modules_to_register)
        self.register_modules_sql = (
            f"alter session set python_profiler_modules='{modules}'"
        )
        self.set_targeted_stage_sql = (
            f'alter session set PYTHON_PROFILER_TARGET_STAGE ="{self.stage}"'
        )
        self.enable_profiler_sql = "alter session set ENABLE_PYTHON_PROFILER = true"
        self.disable_profiler_sql = "alter session set ENABLE_PYTHON_PROFILER = false"
        self.set_active_profiler_sql = f"alter session set ACTIVE_PYTHON_PROFILER = '{self.active_profiler.upper()}'"

    def register_profiler_modules(self, modules: List[str]):
        """Set the modules to profile; applied immediately if a session is attached.

        Args:
            modules: Module names to profile. An empty list clears the setting.
        """
        # Copy so later mutation of the caller's list cannot desynchronise
        # ``modules_to_register`` from the SQL text that was built from it.
        self.modules_to_register = [] if modules is None else list(modules)
        self.prepare_sql()
        if self.session is not None:
            self._register_modules()

    def set_targeted_stage(self, stage: str):
        """Set the stage that receives profiler output; applied immediately if a session is attached.

        Args:
            stage: Stage name; checked with ``validate_object_name`` first.
        """
        validate_object_name(stage)
        self.stage = stage
        self.prepare_sql()
        if self.session is not None:
            self._set_targeted_stage()

    def set_active_profiler(self, active_profiler: str):
        """Choose the active profiler; applied immediately if a session is attached.

        Args:
            active_profiler: ``"LINE"`` or ``"MEMORY"``. The comparison is
                case-insensitive because the generated SQL upper-cases the value.

        Raises:
            ValueError: If ``active_profiler`` is not a supported profiler. The
                previously configured profiler is left untouched in that case.
        """
        # Validate the *argument*, not the value currently stored on the instance.
        if (
            not isinstance(active_profiler, str)
            or active_profiler.upper() not in self._SUPPORTED_PROFILERS
        ):
            raise ValueError(
                f"active_profiler expect 'LINE' or 'MEMORY', got {active_profiler} instead"
            )
        self.active_profiler = active_profiler
        self.prepare_sql()
        if self.session is not None:
            self._set_active_profiler()

    def _register_modules(self):
        """Execute the statement that sets the profiled modules."""
        self.session.sql(self.register_modules_sql).collect()

    def _set_targeted_stage(self):
        """Execute the statement that sets the target stage."""
        self.session.sql(self.set_targeted_stage_sql).collect()

    def _set_active_profiler(self):
        """Execute the statement that selects the active profiler."""
        self.session.sql(self.set_active_profiler_sql).collect()

    def enable_profiler(self):
        """Turn the profiler on for the attached session (a session is required)."""
        self.session.sql(self.enable_profiler_sql).collect()

    def disable_profiler(self):
        """Turn the profiler off for the attached session (a session is required)."""
        self.session.sql(self.disable_profiler_sql).collect()

    def _get_last_query_id(self):
        """Return the query id of the most recent recorded stored-procedure call.

        Walks the recorded query history from newest to oldest and returns the
        first ``CALL <name>(...)`` whose name appears in ``show procedures``.

        NOTE(review): this is a best-effort lookup. If several stored procedures
        are called concurrently on the same session, "the last CALL in the
        history" is not guaranteed to be the call the user has in mind.

        Returns:
            The query id, or ``None`` when no history is being recorded or no
            matching call is found.
        """
        if self.query_history is None:
            return None
        procedures = self.session.sql("show procedures").collect()
        names = {row.name for row in procedures}
        for query in reversed(self.query_history.queries):
            text = query.sql_text.strip()
            # Match the keyword case-insensitively so "call sp()" is found too.
            if not text.upper().startswith("CALL"):
                continue
            parts = text.split(None, 1)
            if len(parts) < 2:
                continue
            sp_name = parts[1].split("(")[0].strip()
            if sp_name.upper() in names:
                return query.query_id
        return None

    def _fetch_profiler_output(self):
        """Fetch the profiler output for the last stored-procedure call.

        Raises:
            ValueError: If no stored-procedure call could be located, instead of
                querying the server with the literal id ``'None'``.
        """
        query_id = self._get_last_query_id()
        if query_id is None:
            raise ValueError(
                "no stored procedure call was found in the recorded query history, "
                "call a stored procedure after the profiler is registered"
            )
        sql = f"select snowflake.core.get_python_profiler_output('{query_id}');"
        return self.session.sql(sql).collect()[0][0]

    def show_profiles(self):
        """Print the profiler output of the last stored-procedure call and return it.

        Raises:
            ValueError: If no stored-procedure call could be located.
        """
        output = self._fetch_profiler_output()
        print(output)  # noqa: T201: we need to print here.
        return output

    def dump_profiles(self, dst_file: str):
        """Write the profiler output of the last stored-procedure call to a file.

        Args:
            dst_file: Path of the file to (over)write.

        Raises:
            ValueError: If no stored-procedure call could be located.
        """
        output = self._fetch_profiler_output()
        # Explicit encoding so the file does not depend on the platform locale.
        with open(dst_file, "w", encoding="utf-8") as f:
            f.write(str(output))
|
||
|
||
@contextmanager
def profiler(
    stage: str,
    active_profiler: str,
    session: "snowflake.snowpark.Session",
    modules: Optional[List[str]] = None,
):
    """Enable the Python profiler on ``session`` for the duration of a ``with`` block.

    On entry a :class:`Profiler` is created, stored as ``session.profiler``, and
    the target stage, active profiler and profiled modules are applied before
    the profiler is enabled. On exit, including when the body raises, the
    module list is cleared and the profiler is disabled. ``session.profiler``
    stays set so that results can still be fetched after the block.

    Example::

        with profiler(stage="my_db.my_schema.my_stage", active_profiler="LINE", session=session):
            session.call("my_stored_procedure").collect()
            output = session.show_profiles()

    Args:
        stage: Name of the stage that receives the profiler output.
        active_profiler: ``"LINE"`` or ``"MEMORY"``.
        session: Session to profile; required.
        modules: Module names to profile; ``None`` means no modules.
    """
    internal_profiler = Profiler(stage, active_profiler, session)
    session.profiler = internal_profiler
    internal_profiler.query_history = session.query_history()
    modules = [] if modules is None else modules

    try:
        # set up phase
        internal_profiler._set_targeted_stage()
        internal_profiler._set_active_profiler()
        # register_profiler_modules already executes the statement because the
        # profiler has a session, so no extra _register_modules() call is needed.
        internal_profiler.register_profiler_modules(modules)
        internal_profiler.enable_profiler()
        yield
    finally:
        # Tear down in ``finally`` so the session is reset even when the body
        # (or a partially applied set-up) raised.
        internal_profiler.register_profiler_modules([])
        internal_profiler.disable_profiler()
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -147,6 +147,7 @@ | |
from snowflake.snowpark.mock._plan_builder import MockSnowflakePlanBuilder | ||
from snowflake.snowpark.mock._stored_procedure import MockStoredProcedureRegistration | ||
from snowflake.snowpark.mock._udf import MockUDFRegistration | ||
from snowflake.snowpark.profiler import Profiler | ||
from snowflake.snowpark.query_history import QueryHistory | ||
from snowflake.snowpark.row import Row | ||
from snowflake.snowpark.stored_procedure import StoredProcedureRegistration | ||
|
@@ -570,6 +571,7 @@ def __init__( | |
self._temp_table_auto_cleaner: TempTableAutoCleaner = TempTableAutoCleaner(self) | ||
if self._auto_clean_up_temp_table_enabled: | ||
self._temp_table_auto_cleaner.start() | ||
self.profiler = None | ||
|
||
_logger.info("Snowpark Session information: %s", self._session_info) | ||
|
||
|
@@ -3385,6 +3387,45 @@ def flatten( | |
set_api_call_source(df, "Session.flatten") | ||
return df | ||
|
||
def register_profiler(self, profiler: Profiler):
    """Register a profiler with this session and apply its settings immediately.

    The profiler is bound to this session (a profiler that was attached to a
    different session is re-pointed here, since one profiler is meant to be
    linked to exactly one session). If the profiler has a stage configured and
    ``show stages`` does not list it, a temporary stage with that name is
    created. The profiled modules, target stage and active profiler are then
    applied, and query-history recording is started so the profiled call can be
    located later.

    Args:
        profiler: The :class:`Profiler` to attach to this session.
    """
    self.profiler = profiler
    self.profiler.session = self

    # NOTE(review): ``show stages like`` is given the stage name exactly as
    # configured; confirm it behaves as intended for fully-qualified names.
    if profiler.stage and not self.sql(
        f"show stages like '{profiler.stage}'"
    ).collect():
        # ``or replace`` and ``if not exists`` cannot be combined, and the
        # statement only runs once an action such as collect() is invoked.
        self.sql(f"create temp stage if not exists {profiler.stage}").collect()

    self.profiler._register_modules()
    self.profiler._set_targeted_stage()
    self.profiler._set_active_profiler()
    self.profiler.query_history = self.query_history()
|
||
def show_profiles(self):
    """Return the registered profiler's result; the result is also printed to the console.

    Raises:
        ValueError: If no :class:`Profiler` is registered on this session.
    """
    registered = self.profiler
    if registered is None or not isinstance(registered, Profiler):
        raise ValueError(
            "profiler is not set, use session.register_profiler or profiler context manager"
        )
    return registered.show_profiles()
|
||
def dump_profiles(self, dst_file: str):
    """Write the registered profiler's result to ``dst_file``.

    Raises:
        ValueError: If no :class:`Profiler` is registered on this session.
    """
    registered = self.profiler
    if registered is None or not isinstance(registered, Profiler):
        raise ValueError(
            "profiler is not set, use session.register_profiler or profiler context manager"
        )
    registered.dump_profiles(dst_file=dst_file)
|
||
def register_profiler_modules(self, modules: List[str]):
    """Register the modules that should be profiled.

    Delegates to the registered :class:`Profiler` when there is one; otherwise
    the session parameter is set directly on this session.
    """
    registered = self.profiler
    if registered is None or not isinstance(registered, Profiler):
        joined_modules = ",".join(modules)
        self.sql(
            f"alter session set python_profiler_modules='{joined_modules}'"
        ).collect()
        return
    registered.register_profiler_modules(modules)
|
||
def query_history(self) -> QueryHistory: | ||
"""Create an instance of :class:`QueryHistory` as a context manager to record queries that are pushed down to the Snowflake database. | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
# | ||
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved. | ||
# | ||
|
||
import pytest | ||
|
||
import snowflake.snowpark | ||
from snowflake.snowpark import DataFrame | ||
from snowflake.snowpark.functions import sproc | ||
from snowflake.snowpark.profiler import Profiler, profiler | ||
from tests.utils import Utils | ||
|
||
tmp_stage_name = Utils.random_stage_name() | ||
|
||
|
||
@pytest.fixture(scope="module", autouse=True)
def setup(session, resources_path, local_testing_mode):
    """Module-wide, auto-used fixture that prepares the session for the profiler tests."""
    # The temporary stage is only created when not running in local testing mode.
    if not local_testing_mode:
        Utils.create_stage(session, tmp_stage_name, is_temporary=True)
    session.add_packages("snowflake-snowpark-python")
|
||
|
||
def test_profiler_with_context_manager(session, db_parameters):
    """Profile a stored procedure returning a DataFrame through the ``profiler`` context manager."""

    @sproc(name="table_sp", replace=True)
    def table_sp(session: snowflake.snowpark.Session) -> DataFrame:
        return session.sql("select 1")

    session.register_profiler_modules(["table_sp"])
    with profiler(
        stage=f"{db_parameters['database']}.{db_parameters['schema']}.{tmp_stage_name}",
        active_profiler="LINE",
        session=session,
    ):
        session.call("table_sp").collect()
        res = session.show_profiles()
    # Clear the module list so the setting does not leak into other tests.
    session.register_profiler_modules([])
    assert res is not None
    assert "Modules Profiled" in res
|
||
|
||
def test_profiler_with_profiler_class(session, db_parameters):
    """Profile a stored procedure by configuring a ``Profiler`` and registering it on the session."""

    @sproc(name="table_sp", replace=True)
    def table_sp(session: snowflake.snowpark.Session) -> DataFrame:
        return session.sql("select 1")

    # Note: this local name shadows the imported ``profiler`` context manager
    # for the rest of this function.
    profiler = Profiler()
    # The profiler has no session yet, so these setters only record the values.
    profiler.register_profiler_modules(["table_sp"])
    profiler.set_active_profiler("LINE")
    profiler.set_targeted_stage(
        f"{db_parameters['database']}.{db_parameters['schema']}.{tmp_stage_name}"
    )
    # Registering attaches the session and executes the recorded settings.
    session.register_profiler(profiler)

    profiler.enable_profiler()

    session.call("table_sp").collect()
    res = session.show_profiles()

    profiler.disable_profiler()

    # Clear the module list so the setting does not leak into other tests.
    profiler.register_profiler_modules([])
    assert res is not None
    assert "Modules Profiled" in res
|
||
|
||
def test_single_return_value_of_sp(session, db_parameters):
    """Profile a stored procedure that returns a single string value instead of a DataFrame."""

    @sproc(name="single_value_sp", replace=True)
    def single_value_sp(session: snowflake.snowpark.Session) -> str:
        return "success"

    # NOTE(review): this registers "table_sp" although the procedure called
    # below is "single_value_sp" -- confirm this is intended.
    session.register_profiler_modules(["table_sp"])
    with profiler(
        stage=f"{db_parameters['database']}.{db_parameters['schema']}.{tmp_stage_name}",
        active_profiler="LINE",
        session=session,
    ):
        session.call("single_value_sp")
        res = session.show_profiles()
    # Clear the module list so the setting does not leak into other tests.
    session.register_profiler_modules([])
    assert res is not None
    assert "Modules Profiled" in res
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What should the customer expect to happen if the session is None?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A session is required for the profiler to work.
There are two ways to use the profiler, as described in the design doc.
With the context manager, the session must be provided when the context manager is created.
With the Profiler class, the session is set when the profiler is registered.
This is why I made it optional here.