Skip to content

Commit

Permalink
Fix instrumentation for workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
logan-markewich committed Sep 29, 2024
1 parent feabea0 commit 34d958a
Showing 1 changed file with 42 additions and 7 deletions.
49 changes: 42 additions & 7 deletions llama-index-core/llama_index/core/instrumentation/dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import asyncio
from functools import partial
from contextlib import contextmanager
from contextvars import ContextVar, Token
from typing import Any, Callable, Generator, List, Optional, Dict, Protocol
Expand Down Expand Up @@ -261,20 +263,53 @@ def wrapper(func: Callable, instance: Any, args: list, kwargs: dict) -> Any:
parent_id=parent_id,
tags=tags,
)

def handle_future_result(future, span_id, bound_args, instance):
try:
result = future.result()
self.span_exit(
id_=span_id,
bound_args=bound_args,
instance=instance,
result=result,
)
return result
except BaseException as e:
self.event(SpanDropEvent(span_id=span_id, err_str=str(e)))
self.span_drop(
id_=span_id, bound_args=bound_args, instance=instance, err=e
)
raise
finally:
active_span_id.reset(token)

try:
result = func(*args, **kwargs)
if isinstance(result, asyncio.Future):
# If the result is a Future, wrap it
new_future = asyncio.ensure_future(result)
new_future.add_done_callback(
partial(
handle_future_result,
span_id=id_,
bound_args=bound_args,
instance=instance,
)
)
return new_future
else:
# For non-Future results, proceed as before
self.span_exit(
id_=id_, bound_args=bound_args, instance=instance, result=result
)
return result
except BaseException as e:
self.event(SpanDropEvent(span_id=id_, err_str=str(e)))
self.span_drop(id_=id_, bound_args=bound_args, instance=instance, err=e)
raise
else:
self.span_exit(
id_=id_, bound_args=bound_args, instance=instance, result=result
)
return result
finally:
# clean up
active_span_id.reset(token)
if not isinstance(result, asyncio.Future):
active_span_id.reset(token)

@wrapt.decorator
async def async_wrapper(
Expand Down

0 comments on commit 34d958a

Please sign in to comment.