Skip to content

Commit

Permalink
SNOW-1473726: Dataframe tracing should record exceptions during actio…
Browse files Browse the repository at this point in the history
…n functions (#1755)
  • Loading branch information
sfc-gh-yuwang authored Jun 26, 2024
1 parent c96417b commit 7bf34e1
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 6 deletions.
9 changes: 3 additions & 6 deletions src/snowflake/snowpark/_internal/open_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ def open_telemetry_context_manager(func, dataframe):
logger.warning(f"Error when acquiring span attributes. {e}")
finally:
yield

else:
yield

Expand Down Expand Up @@ -101,11 +100,9 @@ def open_telemetry_udf_context_manager(
except Exception as e:
logger.warning(f"Error when acquiring span attributes. {e}")
finally:
try:
yield
except Exception as e:
cur_span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
raise e
yield
else:
yield


def decorator_count(func):
Expand Down
32 changes: 32 additions & 0 deletions tests/integ/test_open_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

import snowflake
from snowflake.snowpark.exceptions import SnowparkSQLException
from snowflake.snowpark.functions import sproc, udaf, udf, udtf
from snowflake.snowpark.types import (
BinaryType,
Expand Down Expand Up @@ -54,6 +55,37 @@ def dict_exporter():
dict_exporter.shutdown()


def test_catch_error_during_action_function(session, dict_exporter):
df = session.sql("select 1/0")
with pytest.raises(SnowparkSQLException, match="Division by zero"):
df.collect()
spans = spans_to_dict(dict_exporter.get_finished_spans())
assert "collect" in spans
span = spans["collect"]
assert span.status.status_code == trace.status.StatusCode.ERROR
assert "Division by zero" in span.status.description
dict_exporter.clear()


def test_catch_error_during_registration_function(session, dict_exporter):
with pytest.raises(ValueError, match="does not exist"):
session.udf.register_from_file(
"empty file",
"mod5",
name="mod5_function",
return_type=IntegerType(),
input_types=[IntegerType()],
replace=True,
immutable=True,
)
spans = spans_to_dict(dict_exporter.get_finished_spans())
span = spans["register_from_file"]
assert span.status.status_code == trace.status.StatusCode.ERROR
assert "does not exist" in span.status.description

dict_exporter.clear()


@pytest.mark.skipif(
IS_IN_STORED_PROC,
reason="Cannot create session in SP",
Expand Down
24 changes: 24 additions & 0 deletions tests/unit/test_open_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,30 @@ def dict_exporter():
dict_exporter.shutdown()


def test_without_open_telemetry(monkeypatch, dict_exporter):
from snowflake.snowpark._internal import open_telemetry

monkeypatch.setattr(open_telemetry, "open_telemetry_found", False)
mock_connection = mock.create_autospec(ServerConnection)
mock_connection._conn = mock.MagicMock()
session = snowflake.snowpark.session.Session(mock_connection)
_add_session(session)
session._conn._telemetry_client = mock.MagicMock()
session.create_dataframe([1, 2, 3, 4]).to_df("a").collect()

spans = spans_to_dict(dict_exporter.get_finished_spans())
assert len(spans) == 0
dict_exporter.clear()

@udf(name="minus", session=session)
def minus_udf(x: int, y: int) -> int:
return x - y

spans = spans_to_dict(dict_exporter.get_finished_spans())
assert len(spans) == 0
dict_exporter.clear()


def test_register_udaf_from_file(dict_exporter):
test_file = os.path.normpath(
os.path.join(
Expand Down

0 comments on commit 7bf34e1

Please sign in to comment.