From 3015a4be674db5e498f895c2fb2ee08a8b0a1299 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Sat, 10 Aug 2024 20:38:28 +0000 Subject: [PATCH] async metrics --- examples/openai_completion_client.py | 2 +- vllm/engine/async_llm_engine.py | 14 ++++++++++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/examples/openai_completion_client.py b/examples/openai_completion_client.py index 58519f978d340..7548308c6d7a9 100644 --- a/examples/openai_completion_client.py +++ b/examples/openai_completion_client.py @@ -2,7 +2,7 @@ # Modify OpenAI's API key and API base to use vLLM's API server. openai_api_key = "EMPTY" -openai_api_base = "http://localhost:8000/v1" +openai_api_base = "http://localhost:8001/v1" client = OpenAI( # defaults to os.environ.get("OPENAI_API_KEY") diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py index 809eb6de9f173..a7bec1781133d 100644 --- a/vllm/engine/async_llm_engine.py +++ b/vllm/engine/async_llm_engine.py @@ -33,6 +33,7 @@ logger = init_logger(__name__) ENGINE_ITERATION_TIMEOUT_S = envs.VLLM_ENGINE_ITERATION_TIMEOUT_S +_running_tasks = set() class AsyncEngineDeadError(RuntimeError): pass @@ -251,6 +252,10 @@ def has_new_requests(self): class _AsyncLLMEngine(LLMEngine): """Extension of LLMEngine to add async methods.""" + async def do_log_stats_async(self, scheduler_outputs, model_output): + self.do_log_stats(scheduler_outputs, model_output) + + async def step_async( self, virtual_engine: int ) -> List[Union[RequestOutput, EmbeddingRequestOutput]]: @@ -289,7 +294,11 @@ async def step_async( scheduler_outputs.ignored_seq_groups, seq_group_metadata_list) # Log stats. - self.do_log_stats(scheduler_outputs, output) + log_task = asyncio.create_task(self.do_log_stats_async( + scheduler_outputs, output)) + _running_tasks.add(log_task) + log_task.add_done_callback(_running_tasks.discard) + # self.do_log_stats(scheduler_outputs, output) # Tracing self.do_tracing(scheduler_outputs) @@ -1068,7 +1077,8 @@ async def do_log_stats( await self.engine.do_log_stats.remote( # type: ignore scheduler_outputs, model_output) else: - self.engine.do_log_stats() + self.engine.do_log_stats(scheduler_outputs, + model_output) async def check_health(self) -> None: """Raises an error if engine is unhealthy."""