Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1527717: Add profiler snowpark API #2252

Merged
merged 76 commits into from
Oct 5, 2024
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
0c1b6d9
add profiler
sfc-gh-yuwang Sep 7, 2024
9d911df
profiler finish
sfc-gh-yuwang Sep 9, 2024
968ac2c
t
sfc-gh-yuwang Sep 9, 2024
71967c6
t
sfc-gh-yuwang Sep 9, 2024
3222459
add test
sfc-gh-yuwang Sep 10, 2024
e689d54
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 10, 2024
83b22a0
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 11, 2024
e1d314a
address comment
sfc-gh-yuwang Sep 11, 2024
bf71cc2
fix get last query id
sfc-gh-yuwang Sep 11, 2024
b80f113
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 16, 2024
3320814
Update session.py
sfc-gh-yuwang Sep 16, 2024
286cc31
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 16, 2024
791cf2a
add docstring
sfc-gh-yuwang Sep 17, 2024
b458a85
add regx for anonymous procedure
sfc-gh-yuwang Sep 17, 2024
ba726ff
add docstring
sfc-gh-yuwang Sep 17, 2024
242e4de
skip in localtesting
sfc-gh-yuwang Sep 17, 2024
bf4d169
coverage test
sfc-gh-yuwang Sep 17, 2024
e788125
fix lint
sfc-gh-yuwang Sep 17, 2024
c45ca1e
lint fix
sfc-gh-yuwang Sep 17, 2024
32a448c
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 17, 2024
72e0162
fix test
sfc-gh-yuwang Sep 18, 2024
1d992f2
fix test
sfc-gh-yuwang Sep 18, 2024
60c4aba
fix test
sfc-gh-yuwang Sep 18, 2024
0657c2e
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 18, 2024
dcc6ad7
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 18, 2024
eda9559
add test
sfc-gh-yuwang Sep 18, 2024
d196ddd
add unit test
sfc-gh-yuwang Sep 18, 2024
6a7046d
fix test
sfc-gh-yuwang Sep 19, 2024
fcac5fd
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 19, 2024
b2c6513
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 19, 2024
a27e64c
make test robust
sfc-gh-yuwang Sep 19, 2024
896c9cd
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 19, 2024
3e6a771
address comments
sfc-gh-yuwang Sep 20, 2024
0d75b84
fix test
sfc-gh-yuwang Sep 20, 2024
f9df3cd
address comments
sfc-gh-yuwang Sep 20, 2024
de0d5c0
align with doc
sfc-gh-yuwang Sep 27, 2024
8ee830e
fix test
sfc-gh-yuwang Sep 30, 2024
1e2d86c
fix doc
sfc-gh-yuwang Sep 30, 2024
4629d89
fix test
sfc-gh-yuwang Sep 30, 2024
43c0f5a
fix test
sfc-gh-yuwang Sep 30, 2024
76f5eaa
rename to stored procedure profiler
sfc-gh-yuwang Oct 1, 2024
1a626f1
rename to stored procedure profiler
sfc-gh-yuwang Oct 1, 2024
f5c246a
multi thread compatiable
sfc-gh-yuwang Oct 1, 2024
251126f
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Oct 2, 2024
e67cca4
multi thread support
sfc-gh-yuwang Oct 2, 2024
3c30f9a
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Oct 2, 2024
233a9f4
remove show and dump
sfc-gh-yuwang Oct 2, 2024
aa90a19
fix docstring
sfc-gh-yuwang Oct 2, 2024
bb4220d
address comments
sfc-gh-yuwang Oct 2, 2024
d6ba864
make prifler thread safe
sfc-gh-yuwang Oct 2, 2024
6b265dc
destroy query history when not in use
sfc-gh-yuwang Oct 2, 2024
63908d1
address comments
sfc-gh-yuwang Oct 2, 2024
37fa336
address comments
sfc-gh-yuwang Oct 3, 2024
393e98e
address comment
sfc-gh-yuwang Oct 3, 2024
e538669
address comments
sfc-gh-yuwang Oct 3, 2024
468872d
address comments
sfc-gh-yuwang Oct 3, 2024
80f8635
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Oct 3, 2024
51da343
fix doc test
sfc-gh-yuwang Oct 3, 2024
60885bc
fixtest
sfc-gh-yuwang Oct 3, 2024
77b9457
add type check
sfc-gh-yuwang Oct 3, 2024
5d299df
fix type check
sfc-gh-yuwang Oct 3, 2024
b335b77
address comments
sfc-gh-yuwang Oct 3, 2024
1188594
change logic of finding stage
sfc-gh-yuwang Oct 3, 2024
157bc3d
remove bracket
sfc-gh-yuwang Oct 3, 2024
9fb57ac
use existing pattern
sfc-gh-yuwang Oct 3, 2024
773e69e
change test
sfc-gh-yuwang Oct 3, 2024
e406a56
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Oct 3, 2024
292834c
thread safe change
sfc-gh-yuwang Oct 4, 2024
95cb15d
address comments
sfc-gh-yuwang Oct 4, 2024
766d77e
remove link
sfc-gh-yuwang Oct 4, 2024
776ba68
add prpr tag
sfc-gh-yuwang Oct 4, 2024
1324b1d
add prpr tag
sfc-gh-yuwang Oct 4, 2024
e89f6b9
type check
sfc-gh-yuwang Oct 4, 2024
375f11b
add test for coverage
sfc-gh-yuwang Oct 4, 2024
2906d0b
fix test
sfc-gh-yuwang Oct 4, 2024
dfe2de8
fix test
sfc-gh-yuwang Oct 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ This is a re-release of 1.22.0. Please refer to the 1.22.0 release notes for det
- Added the following new functions in `snowflake.snowpark.functions`:
- `array_remove`
- `ln`
- Added snowpark python API for profiler.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

snowpark python -> Snowpark Python


#### Improvements

Expand Down
212 changes: 212 additions & 0 deletions src/snowflake/snowpark/profiler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
#
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
#
import re
from contextlib import contextmanager
from typing import List, Optional

import snowflake.snowpark
from snowflake.snowpark._internal.utils import validate_object_name


class Profiler:
"""
Setup profiler to receive profiles of stored procedures.
Copy link
Contributor

@sfc-gh-mabrennan sfc-gh-mabrennan Sep 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setup -> Set up


Note:
This feature cannot be used in owner's right SP because owner's right SP will not be able to set session-level parameters.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

owner's right SP -> owner's rights stored procedure
x 2

"""

def __init__(
self,
stage: Optional[str] = "",
active_profiler: Optional[str] = "LINE",
session: Optional["snowflake.snowpark.Session"] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what should customer expect to happen if session is None?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

session is required for profiler to work.
There is two way to use profiler as described in the design doc.
In context manager, session needed to be provided when create the context manager.
In profiler class, session is set when registering profiler.
This is why I make it optional here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add docstrings and example code for them

) -> None:
self.stage = stage
self.active_profiler = active_profiler
self.modules_to_register = []
self._register_modules_sql = ""
self._set_targeted_stage_sql = ""
self._enable_profiler_sql = ""
self._disable_profiler_sql = ""
self._set_active_profiler_sql = ""
self.pattern = r"WITH\s+.*?\s+AS\s+PROCEDURE\s+.*?\s+CALL\s+.*"
self.session = session
self._prepare_sql()
self.query_history = None

def _prepare_sql(self):
self._register_modules_sql = f"alter session set python_profiler_modules='{','.join(self.modules_to_register)}'"
self._set_targeted_stage_sql = (
f'alter session set PYTHON_PROFILER_TARGET_STAGE ="{self.stage}"'
)
self._disable_profiler_sql = "alter session set ACTIVE_PYTHON_PROFILER = ''"
self._set_active_profiler_sql = f"alter session set ACTIVE_PYTHON_PROFILER = '{self.active_profiler.upper()}'"

def register_profiler_modules(self, stored_procedures: List[str]):
"""
Register stored procedures to generate profiles for them.

Note:
Registered nodules will be overwritten by this function,
use this function with an empty string will remove registered modules.
Args:
stored_procedures: List of names of stored procedures.
"""
self.modules_to_register = stored_procedures
self._prepare_sql()
if self.session is not None:
self._register_modules()

def set_targeted_stage(self, stage: str):
"""
Set targeted stage for profiler output.

Note:
The stage name must be a fully qualified name.

Args:
stage: String of fully qualified name of targeted stage
"""
validate_object_name(stage)
self.stage = stage
self._prepare_sql()
if self.session is not None:
if (
len(self.session.sql(f"show stages like '{self.stage}'").collect()) == 0
and len(
self.session.sql(
f"show stages like '{self.stage.split('.')[-1]}'"
).collect()
)
== 0
):
self.session.sql(
f"create temp stage if not exists {self.stage} FILE_FORMAT = (RECORD_DELIMITER = NONE FIELD_DELIMITER = NONE )"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

temp stage gets dropped at the end of the session which means we might lose all the profiler data.
when are users going to access the file, after the session or during the session?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If user want to access it after the session, user would want to use a permanent stage they created.
when user give a stage name that does not exist, we expect user want to create a temp stage.

).collect()
self._set_targeted_stage()

def set_active_profiler(self, active_profiler: str):
"""
Set active profiler.

Note:
Active profiler must be either 'LINE' or 'MEMORY' (case-sensitive),
active profiler is 'LINE' by default.
Args:
active_profiler: String that represent active_profiler, must be either 'LINE' or 'MEMORY' (case-sensitive).

"""
if active_profiler not in ["LINE", "MEMORY"]:
raise ValueError(
f"active_profiler expect 'LINE' or 'MEMORY', got {active_profiler} instead"
)
self.active_profiler = active_profiler
self._prepare_sql()
if self.session is not None:
self._set_active_profiler()

def _register_modules(self):
self.session.sql(self._register_modules_sql).collect()

def _set_targeted_stage(self):
self.session.sql(self._set_targeted_stage_sql).collect()

def _set_active_profiler(self):
self.session.sql(self._set_active_profiler_sql).collect()

def enable(self):
"""
Enable profiler. Profiles will be generated until profiler is disabled.
"""
if self.active_profiler == "":
self.active_profiler = "LINE"
self._prepare_sql()
self._set_active_profiler()

def disable(self):
"""
Disable profiler.
"""
self.session.sql(self._disable_profiler_sql).collect()

def _is_sp_call(self, query):
return re.match(self.pattern, query, re.DOTALL) is not None

def _get_last_query_id(self):
for query in self.query_history.queries[::-1]:
if query.sql_text.upper().startswith("CALL") or self._is_sp_call(
query.sql_text
):
return query.query_id
sfc-gh-yuwang marked this conversation as resolved.
Show resolved Hide resolved
return None

def show_profiles(self) -> str:
"""
Return and show the profiles of last executed stored procedure.

Note:
This function must be called right after the execution of stored procedure you want to profile.
"""
query_id = self._get_last_query_id()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is non-deterministic. A session will be thread-safe so multiple SPs may be called concurrently.

sql = f"select snowflake.core.get_python_profiler_output('{query_id}')"
res = self.session.sql(sql).collect()
print(res[0][0]) # noqa: T201: we need to print here.
return res[0][0]

def dump_profiles(self, dst_file: str):
"""
Write the profiles of last executed stored procedure to given file.

Note:
This function must be called right after the execution of stored procedure you want to profile.

Args:
dst_file: String of file name that you want to store the profiles.
"""
query_id = self._get_last_query_id()
sql = f"select snowflake.core.get_python_profiler_output('{query_id}')"
res = self.session.sql(sql).collect()
with open(dst_file, "w") as f:
f.write(str(res[0][0]))


@contextmanager
def profiler(
stage: str,
active_profiler: str,
session: "snowflake.snowpark.Session",
modules: Optional[List[str]] = None,
):
internal_profiler = Profiler(stage, active_profiler, session)
session.profiler = internal_profiler
internal_profiler.query_history = session.query_history()
modules = modules or []
try:
# create stage if not exist
if (
len(session.sql(f"show stages like '{internal_profiler.stage}'").collect())
== 0
and len(
session.sql(
f"show stages like '{internal_profiler.stage.split('.')[-1]}'"
).collect()
)
== 0
):
session.sql(
f"create temp stage if not exists {internal_profiler.stage} FILE_FORMAT = (RECORD_DELIMITER = NONE FIELD_DELIMITER = NONE )"
).collect()
# set up phase
internal_profiler._set_targeted_stage()
internal_profiler._set_active_profiler()

internal_profiler.register_profiler_modules(modules)
internal_profiler._register_modules()
internal_profiler.enable()
finally:
yield
internal_profiler.register_profiler_modules([])
internal_profiler._register_modules()
internal_profiler.disable()
76 changes: 76 additions & 0 deletions src/snowflake/snowpark/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@
from snowflake.snowpark.mock._plan_builder import MockSnowflakePlanBuilder
from snowflake.snowpark.mock._stored_procedure import MockStoredProcedureRegistration
from snowflake.snowpark.mock._udf import MockUDFRegistration
from snowflake.snowpark.profiler import Profiler
from snowflake.snowpark.query_history import QueryHistory
from snowflake.snowpark.row import Row
from snowflake.snowpark.stored_procedure import StoredProcedureRegistration
Expand Down Expand Up @@ -603,6 +604,8 @@ def __init__(
self._conf = self.RuntimeConfig(self, options or {})
self._runtime_version_from_requirement: str = None
self._temp_table_auto_cleaner: TempTableAutoCleaner = TempTableAutoCleaner(self)
self.profiler = None

_logger.info("Snowpark Session information: %s", self._session_info)

def __enter__(self):
Expand Down Expand Up @@ -3456,6 +3459,79 @@ def flatten(
set_api_call_source(df, "Session.flatten")
return df

def register_profiler(self, profiler: Profiler):
"""Register a profiler to a session, all action are actually executed during this function"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

action -> actions
Also, end sentences with a period.

if (
profiler.session is not None
and profiler.session._session_id != self._session_id
):
raise ValueError("A profiler can only be registered to one session.")
self.profiler = profiler
self.profiler.session = self
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we pass a profiler which is created a different session, is the overwriting here on purpose?
or another way to ask the question is should one profiler only be linked with one session?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good question, I think one profiler link to only one session is a good idea since the new session could have different setting.

if (
len(self.sql(f"show stages like '{profiler.stage}'").collect()) == 0
and len(
self.sql(
f"show stages like '{profiler.stage.split('.')[-1]}'"
).collect()
)
== 0
):
self.sql(
f"create temp stage if not exists {profiler.stage} FILE_FORMAT = (RECORD_DELIMITER = NONE FIELD_DELIMITER = NONE )"
).collect()
self.profiler._register_modules()
self.profiler._set_targeted_stage()
self.profiler._set_active_profiler()
self.profiler.query_history = self.query_history()

def show_profiles(self) -> str:
"""
Return and show the profiles of last executed stored procedure.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

last executed -> the last executed


Note:
This function must be called right after the execution of stored procedure you want to profile.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stored procedure -> the stored procedure

"""
if self.profiler is not None and isinstance(self.profiler, Profiler):
return self.profiler.show_profiles()
else:
raise ValueError(
"profiler is not set, use session.register_profiler or profiler context manager"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

profiler is not set, use session.register_profiler or profiler context manager
->
Profiler is not set. Use session.register_profiler or profiler context manager.

)

def dump_profiles(self, dst_file: str):
"""
Write the profiles of last executed stored procedure to given file.

Note:
This function must be called right after the execution of stored procedure you want to profile.

Args:
dst_file: String of file name that you want to store the profiles.
"""
if self.profiler is not None and isinstance(self.profiler, Profiler):
self.profiler.dump_profiles(dst_file=dst_file)
else:
raise ValueError(
"profiler is not set, use session.register_profiler or profiler context manager"
)

def register_profiler_modules(self, stored_procedures: List[str]):
"""
Register stored procedures to generate profiles for them.

Note:
Registered nodules will be overwritten by this function,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nodules -> modules

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Problem: Comma splice.
Resolution: Change comma to period and capitalize the first word of the second sentence.

In other words, change it to this:
Registered modules will be overwritten by this function. Using this function with an empty string will remove registered modules.

use this function with an empty string will remove registered modules.
Args:
stored_procedures: List of names of stored procedures.
"""
if self.profiler is not None and isinstance(self.profiler, Profiler):
self.profiler.register_profiler_modules(stored_procedures)
else:
sql_statement = f"alter session set python_profiler_modules='{','.join(stored_procedures)}'"
self.sql(sql_statement).collect()

def query_history(self, include_describe: bool = False) -> QueryHistory:
"""Create an instance of :class:`QueryHistory` as a context manager to record queries that are pushed down to the Snowflake database.

Expand Down
33 changes: 33 additions & 0 deletions tests/integ/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,39 @@ def session(
session.close()


@pytest.fixture(scope="function")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need this. I don't see any profiler specific actions here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need the session to be function level, otherwise the test will affect each other because one session will only have one profiler

def profiler_session(
db_parameters,
resources_path,
sql_simplifier_enabled,
local_testing_mode,
cte_optimization_enabled,
):
rule1 = f"rule1{Utils.random_alphanumeric_str(10)}"
rule2 = f"rule2{Utils.random_alphanumeric_str(10)}"
key1 = f"key1{Utils.random_alphanumeric_str(10)}"
key2 = f"key2{Utils.random_alphanumeric_str(10)}"
integration1 = f"integration1{Utils.random_alphanumeric_str(10)}"
integration2 = f"integration2{Utils.random_alphanumeric_str(10)}"
session = (
Session.builder.configs(db_parameters)
.config("local_testing", local_testing_mode)
.create()
)
session.sql_simplifier_enabled = sql_simplifier_enabled
session._cte_optimization_enabled = cte_optimization_enabled
if os.getenv("GITHUB_ACTIONS") == "true" and not local_testing_mode:
set_up_external_access_integration_resources(
session, rule1, rule2, key1, key2, integration1, integration2
)
yield session
if os.getenv("GITHUB_ACTIONS") == "true" and not local_testing_mode:
clean_up_external_access_integration_resources(
session, rule1, rule2, key1, key2, integration1, integration2
)
session.close()


@pytest.fixture(scope="function")
def temp_schema(connection, session, local_testing_mode) -> None:
"""Set up and tear down a temp schema for cross-schema test.
Expand Down
Loading
Loading