diff --git a/llama-index-core/llama_index/core/instrumentation/dispatcher.py b/llama-index-core/llama_index/core/instrumentation/dispatcher.py index 8e892e17162aa..4f5683430402f 100644 --- a/llama-index-core/llama_index/core/instrumentation/dispatcher.py +++ b/llama-index-core/llama_index/core/instrumentation/dispatcher.py @@ -1,3 +1,5 @@ +import asyncio +from functools import partial from contextlib import contextmanager from contextvars import ContextVar, Token from typing import Any, Callable, Generator, List, Optional, Dict, Protocol @@ -261,20 +263,53 @@ def wrapper(func: Callable, instance: Any, args: list, kwargs: dict) -> Any: parent_id=parent_id, tags=tags, ) + + def handle_future_result(future, span_id, bound_args, instance): + try: + result = future.result() + self.span_exit( + id_=span_id, + bound_args=bound_args, + instance=instance, + result=result, + ) + return result + except BaseException as e: + self.event(SpanDropEvent(span_id=span_id, err_str=str(e))) + self.span_drop( + id_=span_id, bound_args=bound_args, instance=instance, err=e + ) + raise + finally: + active_span_id.reset(token) + try: result = func(*args, **kwargs) + if isinstance(result, asyncio.Future): + # If the result is a Future, wrap it + new_future = asyncio.ensure_future(result) + new_future.add_done_callback( + partial( + handle_future_result, + span_id=id_, + bound_args=bound_args, + instance=instance, + ) + ) + return new_future + else: + # For non-Future results, proceed as before + self.span_exit( + id_=id_, bound_args=bound_args, instance=instance, result=result + ) + return result except BaseException as e: self.event(SpanDropEvent(span_id=id_, err_str=str(e))) self.span_drop(id_=id_, bound_args=bound_args, instance=instance, err=e) raise - else: - self.span_exit( - id_=id_, bound_args=bound_args, instance=instance, result=result - ) - return result finally: - # clean up - active_span_id.reset(token) + if not isinstance(result, asyncio.Future): + active_span_id.reset(token) @wrapt.decorator async def async_wrapper(