Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1527717: Add profiler snowpark API #2252

Merged
merged 76 commits into from
Oct 5, 2024
Merged
Show file tree
Hide file tree
Changes from 69 commits
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
0c1b6d9
add profiler
sfc-gh-yuwang Sep 7, 2024
9d911df
profiler finish
sfc-gh-yuwang Sep 9, 2024
968ac2c
t
sfc-gh-yuwang Sep 9, 2024
71967c6
t
sfc-gh-yuwang Sep 9, 2024
3222459
add test
sfc-gh-yuwang Sep 10, 2024
e689d54
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 10, 2024
83b22a0
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 11, 2024
e1d314a
address comment
sfc-gh-yuwang Sep 11, 2024
bf71cc2
fix get last query id
sfc-gh-yuwang Sep 11, 2024
b80f113
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 16, 2024
3320814
Update session.py
sfc-gh-yuwang Sep 16, 2024
286cc31
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 16, 2024
791cf2a
add docstring
sfc-gh-yuwang Sep 17, 2024
b458a85
add regx for anonymous procedure
sfc-gh-yuwang Sep 17, 2024
ba726ff
add docstring
sfc-gh-yuwang Sep 17, 2024
242e4de
skip in localtesting
sfc-gh-yuwang Sep 17, 2024
bf4d169
coverage test
sfc-gh-yuwang Sep 17, 2024
e788125
fix lint
sfc-gh-yuwang Sep 17, 2024
c45ca1e
lint fix
sfc-gh-yuwang Sep 17, 2024
32a448c
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 17, 2024
72e0162
fix test
sfc-gh-yuwang Sep 18, 2024
1d992f2
fix test
sfc-gh-yuwang Sep 18, 2024
60c4aba
fix test
sfc-gh-yuwang Sep 18, 2024
0657c2e
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 18, 2024
dcc6ad7
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 18, 2024
eda9559
add test
sfc-gh-yuwang Sep 18, 2024
d196ddd
add unit test
sfc-gh-yuwang Sep 18, 2024
6a7046d
fix test
sfc-gh-yuwang Sep 19, 2024
fcac5fd
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 19, 2024
b2c6513
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 19, 2024
a27e64c
make test robust
sfc-gh-yuwang Sep 19, 2024
896c9cd
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Sep 19, 2024
3e6a771
address comments
sfc-gh-yuwang Sep 20, 2024
0d75b84
fix test
sfc-gh-yuwang Sep 20, 2024
f9df3cd
address comments
sfc-gh-yuwang Sep 20, 2024
de0d5c0
align with doc
sfc-gh-yuwang Sep 27, 2024
8ee830e
fix test
sfc-gh-yuwang Sep 30, 2024
1e2d86c
fix doc
sfc-gh-yuwang Sep 30, 2024
4629d89
fix test
sfc-gh-yuwang Sep 30, 2024
43c0f5a
fix test
sfc-gh-yuwang Sep 30, 2024
76f5eaa
rename to stored procedure profiler
sfc-gh-yuwang Oct 1, 2024
1a626f1
rename to stored procedure profiler
sfc-gh-yuwang Oct 1, 2024
f5c246a
multi thread compatiable
sfc-gh-yuwang Oct 1, 2024
251126f
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Oct 2, 2024
e67cca4
multi thread support
sfc-gh-yuwang Oct 2, 2024
3c30f9a
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Oct 2, 2024
233a9f4
remove show and dump
sfc-gh-yuwang Oct 2, 2024
aa90a19
fix docstring
sfc-gh-yuwang Oct 2, 2024
bb4220d
address comments
sfc-gh-yuwang Oct 2, 2024
d6ba864
make prifler thread safe
sfc-gh-yuwang Oct 2, 2024
6b265dc
destroy query history when not in use
sfc-gh-yuwang Oct 2, 2024
63908d1
address comments
sfc-gh-yuwang Oct 2, 2024
37fa336
address comments
sfc-gh-yuwang Oct 3, 2024
393e98e
address comment
sfc-gh-yuwang Oct 3, 2024
e538669
address comments
sfc-gh-yuwang Oct 3, 2024
468872d
address comments
sfc-gh-yuwang Oct 3, 2024
80f8635
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Oct 3, 2024
51da343
fix doc test
sfc-gh-yuwang Oct 3, 2024
60885bc
fixtest
sfc-gh-yuwang Oct 3, 2024
77b9457
add type check
sfc-gh-yuwang Oct 3, 2024
5d299df
fix type check
sfc-gh-yuwang Oct 3, 2024
b335b77
address comments
sfc-gh-yuwang Oct 3, 2024
1188594
change logic of finding stage
sfc-gh-yuwang Oct 3, 2024
157bc3d
remove bracket
sfc-gh-yuwang Oct 3, 2024
9fb57ac
use existing pattern
sfc-gh-yuwang Oct 3, 2024
773e69e
change test
sfc-gh-yuwang Oct 3, 2024
e406a56
Merge branch 'main' into SNOW-1527717
sfc-gh-yuwang Oct 3, 2024
292834c
thread safe change
sfc-gh-yuwang Oct 4, 2024
95cb15d
address comments
sfc-gh-yuwang Oct 4, 2024
766d77e
remove link
sfc-gh-yuwang Oct 4, 2024
776ba68
add prpr tag
sfc-gh-yuwang Oct 4, 2024
1324b1d
add prpr tag
sfc-gh-yuwang Oct 4, 2024
e89f6b9
type check
sfc-gh-yuwang Oct 4, 2024
375f11b
add test for coverage
sfc-gh-yuwang Oct 4, 2024
2906d0b
fix test
sfc-gh-yuwang Oct 4, 2024
dfe2de8
fix test
sfc-gh-yuwang Oct 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
- Added support for constructing `Series` and `DataFrame` objects with the lazy `Index` object as `data`, `index`, and `columns` arguments.
- Added support for constructing `Series` and `DataFrame` objects with `index` and `column` values not present in `DataFrame`/`Series` `data`.
- Added `thread_id` to `QueryRecord` to track the thread id submitting the query history.
-
- Added support for `Session.stored_procedure_profiler`.

#### Improvements

#### Bug Fixes
Expand Down
1 change: 1 addition & 0 deletions docs/source/snowpark/session.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Snowpark Session
Session.get_packages
Session.get_session_stage
Session.query_history
Session.stored_procedure_profiler
Session.range
Session.remove_import
Session.remove_package
Expand Down
14 changes: 14 additions & 0 deletions src/snowflake/snowpark/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
from snowflake.snowpark.query_history import QueryHistory
from snowflake.snowpark.row import Row
from snowflake.snowpark.stored_procedure import StoredProcedureRegistration
from snowflake.snowpark.stored_procedure_profiler import StoredProcedureProfiler
from snowflake.snowpark.table import Table
from snowflake.snowpark.table_function import (
TableFunctionCall,
Expand Down Expand Up @@ -601,6 +602,8 @@ def __init__(
self._conf = self.RuntimeConfig(self, options or {})
self._runtime_version_from_requirement: str = None
self._temp_table_auto_cleaner: TempTableAutoCleaner = TempTableAutoCleaner(self)
self._sp_profiler = StoredProcedureProfiler(session=self)

_logger.info("Snowpark Session information: %s", self._session_info)

def __enter__(self):
Expand Down Expand Up @@ -3242,6 +3245,17 @@ def sproc(self) -> StoredProcedureRegistration:
"""
return self._sp_registration

@property
def stored_procedure_profiler(self) -> StoredProcedureProfiler:
"""
Returns a :class:`stored_procedure_profiler.StoredProcedureProfiler` object that you can use to profile stored procedures.
See details of how to use this object in :class:`stored_procedure_profiler.StoredProcedureProfiler`.

See more details about stored procedure profiler at:
https://docs.snowflake.com/LIMITEDACCESS/stored-procedures-python-profiler
"""
return self._sp_profiler

def _infer_is_return_table(
self, sproc_name: str, *args: Any, log_on_exception: bool = False
) -> bool:
Expand Down
131 changes: 131 additions & 0 deletions src/snowflake/snowpark/stored_procedure_profiler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#
# Copyright (c) 2012-2024 Snowflake Computing Inc. All rights reserved.
#
import threading
from typing import List, Literal, Optional

import snowflake.snowpark
from snowflake.snowpark._internal.utils import (
SNOWFLAKE_ANONYMOUS_CALL_WITH_PATTERN,
parse_table_name,
)


class StoredProcedureProfiler:
"""
Set up profiler to receive profiles of stored procedures. This feature cannot be used in owner's right stored
procedure because owner's right stored procedure will not be able to set session-level parameters.

See more details about stored procedure profiler at:
https://docs.snowflake.com/LIMITEDACCESS/stored-procedures-python-profiler
"""

def __init__(
self,
session: "snowflake.snowpark.Session",
) -> None:
self._session = session
self._query_history = None
self._lock = threading.RLock()
self._active_profiler_number = 0

def register_modules(self, stored_procedures: Optional[List[str]] = None) -> None:
"""
Register stored procedures to generate profiles for them.

Args:
stored_procedures: List of names of stored procedures. Registered modules will be overwritten by this input.
Input None or an empty list will remove registered modules.
"""
sfc-gh-yuwang marked this conversation as resolved.
Show resolved Hide resolved
sp_string = ",".join(stored_procedures) if stored_procedures is not None else ""
sql_statement = f"alter session set python_profiler_modules='{sp_string}'"
self._session.sql(sql_statement)._internal_collect_with_tag_no_telemetry()

def set_target_stage(self, stage: str) -> None:
"""
Set targeted stage for profiler output.

Args:
stage: String of fully qualified name of targeted stage
"""
names = parse_table_name(stage)
if len(names) != 3:
raise ValueError(
f"stage name must be fully qualified name, got {stage} instead"
)
existing_stages = self._session.sql(
f"show stages like '{names[2]}' in schema {names[0]}.{names[1]}"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If names[2] is doubled quoted, you may need to remove the double quote first.

)._internal_collect_with_tag_no_telemetry()
if len(existing_stages) == 0:
self._session.sql(
f"create temp stage if not exists {stage} FILE_FORMAT = (RECORD_DELIMITER = NONE FIELD_DELIMITER = NONE )"
)._internal_collect_with_tag_no_telemetry()
sql_statement = f'alter session set PYTHON_PROFILER_TARGET_STAGE ="{stage}"'
self._session.sql(sql_statement)._internal_collect_with_tag_no_telemetry()

def set_active_profiler(
self, active_profiler_type: Literal["LINE", "MEMORY"] = "LINE"
) -> None:
"""
Set active profiler.

Args:
active_profiler_type: String that represent active_profiler, must be either 'LINE' or 'MEMORY'
(case-sensitive). Active profiler is 'LINE' by default.

"""
with self._lock:
self._active_profiler_number += 1
if active_profiler_type not in ["LINE", "MEMORY"]:
raise ValueError(
f"active_profiler expect 'LINE', 'MEMORY', got {active_profiler_type} instead"
)
sql_statement = (
f"alter session set ACTIVE_PYTHON_PROFILER = '{active_profiler_type}'"
)
self._session.sql(sql_statement)._internal_collect_with_tag_no_telemetry()
with self._lock:
if self._query_history is None:
self._query_history = self._session.query_history(
include_thread_id=True
)

def disable(self) -> None:
"""
Disable profiler.
"""
with self._lock:
self._active_profiler_number -= 1
if self._active_profiler_number == 0:
self._session._conn.remove_query_listener(self._query_history) # type: ignore
self._query_history = None
sql_statement = "alter session set ACTIVE_PYTHON_PROFILER = ''"
self._session.sql(sql_statement)._internal_collect_with_tag_no_telemetry()

@staticmethod
def _is_sp_call(query: str) -> bool:
query = query.upper().strip(" ")
return SNOWFLAKE_ANONYMOUS_CALL_WITH_PATTERN.match(
query
) is not None or query.startswith("CALL")

def _get_last_query_id(self) -> Optional[str]:
current_thread = threading.get_ident()
for query in self._query_history.queries[::-1]: # type: ignore
query_thread = getattr(query, "thread_id", None)
if query_thread == current_thread and self._is_sp_call(query.sql_text):
return query.query_id
return None

def get_output(self) -> str:
"""
Return the profiles of last executed stored procedure in current thread. If there is no previous
stored procedure call, an error will be raised.
Please call this function right after the stored procedure you want to profile to avoid any error.

"""
query_id = self._get_last_query_id()
if query_id is None:
raise ValueError("Last executed stored procedure does not exist")
sql = f"select snowflake.core.get_python_profiler_output('{query_id}')"
return self._session.sql(sql)._internal_collect_with_tag_no_telemetry()[0][0] # type: ignore
33 changes: 33 additions & 0 deletions tests/integ/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,39 @@ def session(
session.close()


@pytest.fixture(scope="function")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need this. I don't see any profiler specific actions here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need the session to be function level, otherwise the test will affect each other because one session will only have one profiler

def profiler_session(
db_parameters,
resources_path,
sql_simplifier_enabled,
local_testing_mode,
cte_optimization_enabled,
):
rule1 = f"rule1{Utils.random_alphanumeric_str(10)}"
rule2 = f"rule2{Utils.random_alphanumeric_str(10)}"
key1 = f"key1{Utils.random_alphanumeric_str(10)}"
key2 = f"key2{Utils.random_alphanumeric_str(10)}"
integration1 = f"integration1{Utils.random_alphanumeric_str(10)}"
integration2 = f"integration2{Utils.random_alphanumeric_str(10)}"
session = (
Session.builder.configs(db_parameters)
.config("local_testing", local_testing_mode)
.create()
)
session.sql_simplifier_enabled = sql_simplifier_enabled
session._cte_optimization_enabled = cte_optimization_enabled
if os.getenv("GITHUB_ACTIONS") == "true" and not local_testing_mode:
set_up_external_access_integration_resources(
session, rule1, rule2, key1, key2, integration1, integration2
)
yield session
if os.getenv("GITHUB_ACTIONS") == "true" and not local_testing_mode:
clean_up_external_access_integration_resources(
session, rule1, rule2, key1, key2, integration1, integration2
)
session.close()


@pytest.fixture(scope="function")
def temp_schema(connection, session, local_testing_mode) -> None:
"""Set up and tear down a temp schema for cross-schema test.
Expand Down
Loading
Loading