From 8957fafb21b2bddf15071f769acf27556f4feaf6 Mon Sep 17 00:00:00 2001
From: "rshaw@neuralmagic.com" <rshaw@neuralmagic.com>
Date: Sat, 10 Aug 2024 21:36:36 +0000
Subject: [PATCH] example

---
 examples/openai_completion_client.py |  2 +-
 vllm/engine/async_llm_engine.py      | 37 +++++++++++++++++++++++-----
 vllm/engine/llm_engine.py            |  1 +
 3 files changed, 33 insertions(+), 7 deletions(-)

diff --git a/examples/openai_completion_client.py b/examples/openai_completion_client.py
index 7548308c6d7a9..58519f978d340 100644
--- a/examples/openai_completion_client.py
+++ b/examples/openai_completion_client.py
@@ -2,7 +2,7 @@
 
 # Modify OpenAI's API key and API base to use vLLM's API server.
 openai_api_key = "EMPTY"
-openai_api_base = "http://localhost:8001/v1"
+openai_api_base = "http://localhost:8000/v1"
 
 client = OpenAI(
     # defaults to os.environ.get("OPENAI_API_KEY")
diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py
index a7bec1781133d..79124ff9fa410 100644
--- a/vllm/engine/async_llm_engine.py
+++ b/vllm/engine/async_llm_engine.py
@@ -1,5 +1,8 @@
 import asyncio
 import time
+import zmq
+import zmq.asyncio
+import pickle
 from functools import partial
 from typing import (AsyncGenerator, Callable, Dict, Iterable, List, Mapping,
                     Optional, Set, Tuple, Type, Union)
@@ -252,9 +255,24 @@ def has_new_requests(self):
 
 class _AsyncLLMEngine(LLMEngine):
     """Extension of LLMEngine to add async methods."""
 
-    async def do_log_stats_async(self, scheduler_outputs, model_output):
-        self.do_log_stats(scheduler_outputs, model_output)
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.logger_ctx = zmq.asyncio.Context()
+        self.to_logger = self.logger_ctx.socket(zmq.constants.PUSH)
+        self.to_logger.bind("inproc://doesitwork")
+
+        self.from_engine = self.logger_ctx.socket(zmq.constants.PULL)
+        self.from_engine.connect("inproc://doesitwork")
+
+        self.logging_task = asyncio.create_task(self.run_logging_loop())
+
+
+    async def run_logging_loop(self):
+
+        while True:
+            data = await self.from_engine.recv_pyobj()
+            self.do_log_stats(**data)
 
     async def step_async(
         self, virtual_engine: int
@@ -294,15 +312,22 @@ async def step_async(
             scheduler_outputs.ignored_seq_groups, seq_group_metadata_list)
 
         # Log stats.
-        log_task = asyncio.create_task(self.do_log_stats_async(
-            scheduler_outputs, output))
-        _running_tasks.add(log_task)
-        log_task.add_done_callback(_running_tasks.discard)
+        # log_task = asyncio.create_task(self.do_log_stats_async(
+        #     scheduler_outputs, output))
+        # _running_tasks.add(log_task)
+        # log_task.add_done_callback(_running_tasks.discard)
         # self.do_log_stats(scheduler_outputs, output)
 
+        await self.to_logger.send_pyobj(
+            {
+                "scheduler_outputs": scheduler_outputs,
+                "model_output": output
+            }
+        )
 
         # Tracing
         self.do_tracing(scheduler_outputs)
+
         return request_outputs
 
     async def stop_remote_worker_execution_loop_async(self) -> None:
diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py
index 39bb1f9c274fa..ff9061dcc1dd5 100644
--- a/vllm/engine/llm_engine.py
+++ b/vllm/engine/llm_engine.py
@@ -363,6 +363,7 @@ def __init__(
             ),
         ))
 
+
     def _initialize_kv_caches(self) -> None:
         """Initialize the KV cache in the worker(s).