diff --git a/vllm/engine/multiprocessing/client.py b/vllm/engine/multiprocessing/client.py
index d21136c03d7d2..ce59102c39085 100644
--- a/vllm/engine/multiprocessing/client.py
+++ b/vllm/engine/multiprocessing/client.py
@@ -1,11 +1,9 @@
 import asyncio
-import copy
 import pickle
 from contextlib import contextmanager, suppress
 from typing import (Any, AsyncGenerator, Dict, Iterator, List, Mapping,
                     Optional, Union, cast, overload)
 
-import cloudpickle
 import psutil
 import zmq
 import zmq.asyncio
@@ -19,8 +17,6 @@
 from vllm.engine.arg_utils import AsyncEngineArgs
 # yapf conflicts with isort for this block
 # yapf: disable
-from vllm.engine.async_llm_engine import (
-    build_guided_decoding_logits_processor_async)
 from vllm.engine.multiprocessing import (ENGINE_DEAD_ERROR, IPC_DATA_EXT,
                                          IPC_HEALTH_EXT, IPC_INPUT_EXT,
                                          IPC_OUTPUT_EXT, RPC_REQUEST_T,
@@ -577,38 +573,12 @@ async def _process_request(
         if self._errored_with is not None:
             raise ENGINE_DEAD_ERROR(self._errored_with)
 
-        # Constructing guided decoding logits processors is expensive, so we do
-        # it here to avoid contending with cpu resources and the GIL on the
-        # backend process.
-        if isinstance(params, SamplingParams) and \
-            params.guided_decoding is not None:
-            params = await \
-                build_guided_decoding_logits_processor_async(
-                    sampling_params=params,
-                    tokenizer=await self.get_tokenizer(lora_request),
-                    default_guided_backend=(self.decoding_config.guided_decoding_backend
-                        if self.decoding_config
-                        else DecodingConfig.guided_decoding_backend),
-                    model_config=self.model_config
-                )
-
         # 1) Create output queue for this requests.
         queue: asyncio.Queue[Union[RequestOutput,
                                    BaseException]] = asyncio.Queue()
         self.output_queues[request_id] = queue
 
         try:
-            # 2) Detach logits processors so that they can be pickled
-            # separately (may require cloudpickle which is slower)
-            if isinstance(params, SamplingParams) and params.logits_processors:
-                # Defensive shallow copy
-                params = copy.copy(params)
-                logits_processors = params.logits_processors
-                params.logits_processors = None
-                lp_bytes = cloudpickle.dumps(logits_processors)
-            else:
-                lp_bytes = None
-
             request_bytes = pickle.dumps(
                 RPCProcessRequest(
                     prompt=prompt,
@@ -620,12 +590,11 @@
                     priority=priority,
                 ))
 
-            # 3) Send the RPCGenerateRequest to the MQLLMEngine.
-            parts = (request_bytes,
-                     lp_bytes) if lp_bytes else (request_bytes, )
-            await self.input_socket.send_multipart(parts, copy=False)
+            # 2) Send the RPCGenerateRequest to the MQLLMEngine.
+            await self.input_socket.send_multipart((request_bytes, ),
+                                                   copy=False)
 
-            # 4) Stream the RequestOutputs from the output queue. Note
+            # 3) Stream the RequestOutputs from the output queue. Note
             # that the output_loop pushes RequestOutput objects to this
             # queue after pulling them from the zmq socket.
             finished = False
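
For context, the client-side mechanism removed above worked as follows: logits processors are arbitrary Python callables that the stdlib pickle often cannot serialize, so the client detached them from the SamplingParams, serialized them with cloudpickle (slower, but closure-capable), and sent them as an optional second ZMQ frame alongside the plain-pickled request. The sketch below reproduces that framing pattern end to end. It is a minimal illustration under stated assumptions, not vLLM's actual code: FakeParams, FakeRequest, and the inproc PAIR socket pair are hypothetical stand-ins.

import copy
import pickle

import cloudpickle  # serializes closures/lambdas that plain pickle rejects
import zmq


class FakeParams:
    # Hypothetical stand-in for vllm.SamplingParams.
    def __init__(self, logits_processors=None):
        self.logits_processors = logits_processors


class FakeRequest:
    # Hypothetical stand-in for vLLM's RPCProcessRequest.
    def __init__(self, request_id, params):
        self.request_id = request_id
        self.params = params


ctx = zmq.Context()
client = ctx.socket(zmq.PAIR)
client.bind("inproc://mq_engine")
engine = ctx.socket(zmq.PAIR)
engine.connect("inproc://mq_engine")

# Client side (old path): detach the callables before pickling the request,
# then cloudpickle them into an optional second frame.
request = FakeRequest("req-0", FakeParams([lambda ids, logits: logits]))
lp_bytes = None
if request.params.logits_processors:
    request.params = copy.copy(request.params)  # defensive shallow copy
    lp_bytes = cloudpickle.dumps(request.params.logits_processors)
    request.params.logits_processors = None
request_bytes = pickle.dumps(request)
parts = (request_bytes, lp_bytes) if lp_bytes else (request_bytes, )
client.send_multipart(parts, copy=False)

# Engine side (old path): reattach processors from the optional second frame.
frames = engine.recv_multipart(copy=False)
received = pickle.loads(frames[0].buffer)
if len(frames) > 1:
    received.params.logits_processors = cloudpickle.loads(frames[1].buffer)
print(len(received.params.logits_processors), "logits processor(s) reattached")

The shallow copy mirrors the removed code's defensive copy: it keeps the caller's params object intact while the detached variant crosses the socket.
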
diff --git a/vllm/engine/multiprocessing/engine.py b/vllm/engine/multiprocessing/engine.py
index 49a90b321dac4..8caf63ce42fd7 100644
--- a/vllm/engine/multiprocessing/engine.py
+++ b/vllm/engine/multiprocessing/engine.py
@@ -3,10 +3,9 @@
 from contextlib import contextmanager
 from typing import Iterator, List, Optional, Union
 
-import cloudpickle
 import zmq
 
-from vllm import AsyncEngineArgs, SamplingParams
+from vllm import AsyncEngineArgs
 from vllm.engine.llm_engine import LLMEngine
 # yapf conflicts with isort for this block
 # yapf: disable
@@ -221,11 +220,6 @@ def handle_new_input(self):
                 request = pickle.loads(frames[0].buffer)
 
                 if isinstance(request, RPCProcessRequest):
-                    if len(frames) > 1:
-                        # Use cloudpickle for logits processors
-                        assert isinstance(request.params, SamplingParams)
-                        lprocs = cloudpickle.loads(frames[1].buffer)
-                        request.params.logits_processors = lprocs
                     self._handle_process_request(request)
                 elif isinstance(request, RPCAbortRequest):
                     self._handle_abort_request(request)
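
With the cloudpickle side channel gone, the request path in handle_new_input() above reduces to a single plain-pickle frame; frames beyond frames[0] are never produced by the client or consulted by the engine. A minimal sketch of the simplified exchange, again using hypothetical stand-in names rather than vLLM's real types:

import pickle

import zmq


class FakeRequest:
    # Hypothetical stand-in for vLLM's RPCProcessRequest.
    def __init__(self, request_id, prompt):
        self.request_id = request_id
        self.prompt = prompt


ctx = zmq.Context()
client = ctx.socket(zmq.PAIR)
client.bind("inproc://mq_engine")
engine = ctx.socket(zmq.PAIR)
engine.connect("inproc://mq_engine")

# Client: everything rides in one plain-pickle frame.
client.send_multipart((pickle.dumps(FakeRequest("req-0", "hello")), ),
                      copy=False)

# Engine: mirrors the surviving code above -- only frames[0] is read.
frames = engine.recv_multipart(copy=False)
request = pickle.loads(frames[0].buffer)
print(request.request_id, request.prompt)

One consequence visible in the diff itself: the optional second frame for user-supplied logits_processors no longer exists on this IPC path, so such callables can no longer be transported to the engine process this way.
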