Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1341730:Emit spans for UDxF registrations #1648

Merged
merged 26 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#### Improvements

- Added open telemetry tracing on UDxF functions in snowpark.

#### New Features

- Added support for `to_boolean` function.
Expand Down
83 changes: 76 additions & 7 deletions src/snowflake/snowpark/_internal/open_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,20 @@
from typing import Tuple

logger = getLogger(__name__)
target_class = ["dataframe.py", "dataframe_writer.py"]
target_modules = [
"dataframe.py",
"dataframe_writer.py",
"udf.py",
"udtf.py",
"udaf.py",
"functions.py",
]
registration_modules = ["udf.py", "udtf.py", "udaf.py"]
# this parameter make sure no error when open telemetry is not installed
open_telemetry_found = True
try:
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

except ImportError:
open_telemetry_found = False
Expand All @@ -30,11 +39,7 @@ def open_telemetry_context_manager(func, dataframe):
if open_telemetry_found:
class_name = func.__qualname__
name = func.__name__
tracer = (
trace.get_tracer(f"snow.snowpark.{class_name.split('.')[0].lower()}")
if "." in class_name
else class_name
)
tracer = trace.get_tracer(extract_tracer_name(class_name))
with tracer.start_as_current_span(name) as cur_span:
try:
if cur_span.is_recording():
Expand All @@ -56,6 +61,59 @@ def open_telemetry_context_manager(func, dataframe):
yield


@contextmanager
def open_telemetry_udf_context_manager(func, parameters):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why using parameters pattern rather than passing the individual parameters?

# trace when required package is installed
if open_telemetry_found:
class_name = func.__qualname__
name = func.__name__
tracer = trace.get_tracer(extract_tracer_name(class_name))
with tracer.start_as_current_span(name) as cur_span:
try:
# first try to get func if it is udf, then try to get handler if it is udtf/udaf, if still None, means it is
# loading from file
udf_func = (
parameters.get("func")
if parameters.get("func")
else parameters.get("handler")
)
# if udf_func is not None, meaning it is a udf function or udf handler class, get handler_name from it, otherwise find
# function name or handler name from parameter
handler_name = (
udf_func.__name__
if udf_func
else (
parameters.get("func_name")
if parameters.get("func_name")
else parameters.get("handler_name")
)
)
if cur_span.is_recording():
# store execution location in span
filename, lineno = context_manager_code_location(
inspect.stack(), func
)
cur_span.set_attribute("code.filepath", f"{filename}")
cur_span.set_attribute("code.lineno", lineno)
cur_span.set_attribute(
"snow.executable.name", parameters.get("name")
)
cur_span.set_attribute("snow.executable.handler", handler_name)
cur_span.set_attribute(
"snow.executable.filepath", parameters.get("file_path")
)
except Exception as e:
logger.warning(f"Error when acquiring span attributes. {e}")
sfc-gh-yuwang marked this conversation as resolved.
Show resolved Hide resolved
finally:
try:
yield
except Exception as e:
cur_span.set_status(Status(StatusCode.ERROR, str(e)))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can use trace.Status instead and avoid import as well. It is clearer which status you are referring to.

raise e
else:
yield


def decorator_count(func):
count = 0
current_func = func
Expand All @@ -66,12 +124,15 @@ def decorator_count(func):


def context_manager_code_location(frame_info, func) -> Tuple[str, int]:
# we know what function we are tracking, with this information, we can locate where target function is called
decorator_number = decorator_count(func)
target_index = -1
for i, frame in enumerate(frame_info):
file_name = os.path.basename(frame.filename)
if file_name in target_class:
if file_name in target_modules:
target_index = i + decorator_number + 1
if file_name in registration_modules:
continue
break
frame = frame_info[target_index]
return frame.filename, frame.lineno
Expand All @@ -87,3 +148,11 @@ def build_method_chain(api_calls, name) -> str:
method_chain = f"{method_chain}{method_name}()."
method_chain = f"{method_chain}{name.split('.')[-1]}()"
return method_chain


def extract_tracer_name(class_name):
return (
f"snow.snowpark.{class_name.split('.')[0].lower()}"
if "." in class_name
else class_name
)
141 changes: 76 additions & 65 deletions src/snowflake/snowpark/udaf.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
from snowflake.connector import ProgrammingError
from snowflake.snowpark._internal.analyzer.expression import Expression, SnowflakeUDF
from snowflake.snowpark._internal.error_message import SnowparkClientExceptionMessages
from snowflake.snowpark._internal.open_telemetry import (
open_telemetry_udf_context_manager,
)
from snowflake.snowpark._internal.type_utils import ColumnOrName, convert_sp_to_sf_type
from snowflake.snowpark._internal.udf_utils import (
UDFColumn,
Expand Down Expand Up @@ -413,43 +416,45 @@ def register(
- :func:`~snowflake.snowpark.functions.udaf`
- :meth:`register_from_file`
"""
if not isinstance(handler, type):
raise TypeError(
f"Invalid handler: expecting a class type, but get {type(handler)}"
)

check_register_args(
TempObjectType.AGGREGATE_FUNCTION,
name,
is_permanent,
stage_location,
parallel,
)
parameters = {"handler": handler, "name": name}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can use named arguments, I think it is better than creating a new dict and search in it.

with open_telemetry_udf_context_manager(self.register, parameters):
if not isinstance(handler, type):
raise TypeError(
f"Invalid handler: expecting a class type, but get {type(handler)}"
)

native_app_params = kwargs.get("native_app_params", None)
check_register_args(
TempObjectType.AGGREGATE_FUNCTION,
name,
is_permanent,
stage_location,
parallel,
)

# register udaf
return self._do_register_udaf(
handler,
return_type,
input_types,
name,
stage_location,
imports,
packages,
replace,
if_not_exists,
parallel,
statement_params=statement_params,
source_code_display=source_code_display,
api_call_source="UDAFRegistration.register",
is_permanent=is_permanent,
immutable=immutable,
external_access_integrations=external_access_integrations,
secrets=secrets,
comment=comment,
native_app_params=native_app_params,
)
native_app_params = kwargs.get("native_app_params", None)

# register udaf
return self._do_register_udaf(
handler,
return_type,
input_types,
name,
stage_location,
imports,
packages,
replace,
if_not_exists,
parallel,
statement_params=statement_params,
source_code_display=source_code_display,
api_call_source="UDAFRegistration.register",
is_permanent=is_permanent,
immutable=immutable,
external_access_integrations=external_access_integrations,
secrets=secrets,
comment=comment,
native_app_params=native_app_params,
)

def register_from_file(
self,
Expand Down Expand Up @@ -567,37 +572,43 @@ def register_from_file(
- :func:`~snowflake.snowpark.functions.udaf`
- :meth:`register`
"""
file_path = process_file_path(file_path)
check_register_args(
TempObjectType.AGGREGATE_FUNCTION,
name,
is_permanent,
stage_location,
parallel,
)
parameters = {
"file_path": file_path,
"handler_name": handler_name,
"name": name,
}
with open_telemetry_udf_context_manager(self.register_from_file, parameters):
file_path = process_file_path(file_path)
check_register_args(
TempObjectType.AGGREGATE_FUNCTION,
name,
is_permanent,
stage_location,
parallel,
)

# register udaf
return self._do_register_udaf(
(file_path, handler_name),
return_type,
input_types,
name,
stage_location,
imports,
packages,
replace,
if_not_exists,
parallel,
external_access_integrations=external_access_integrations,
secrets=secrets,
statement_params=statement_params,
source_code_display=source_code_display,
api_call_source="UDAFRegistration.register_from_file",
skip_upload_on_content_match=skip_upload_on_content_match,
is_permanent=is_permanent,
immutable=immutable,
comment=comment,
)
# register udaf
return self._do_register_udaf(
(file_path, handler_name),
return_type,
input_types,
name,
stage_location,
imports,
packages,
replace,
if_not_exists,
parallel,
external_access_integrations=external_access_integrations,
secrets=secrets,
statement_params=statement_params,
source_code_display=source_code_display,
api_call_source="UDAFRegistration.register_from_file",
skip_upload_on_content_match=skip_upload_on_content_match,
is_permanent=is_permanent,
immutable=immutable,
comment=comment,
)

def _do_register_udaf(
self,
Expand Down
Loading
Loading