diff --git a/Dockerfile b/Dockerfile
index 0944050f7dfca..6226569e9d3b4 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -163,7 +163,7 @@ RUN PYTHON_VERSION_STR=$(echo ${PYTHON_VERSION} | sed 's/\.//g') && \
 RUN echo 'tzdata tzdata/Areas select America' | debconf-set-selections \
     && echo 'tzdata tzdata/Zones/America select Los_Angeles' | debconf-set-selections \
     && apt-get update -y \
-    && apt-get install -y ccache software-properties-common git curl sudo vim python3-pip \
+    && apt-get install -y ccache software-properties-common git curl wget sudo vim python3-pip \
     && apt-get install -y ffmpeg libsm6 libxext6 libgl1 \
     && add-apt-repository ppa:deadsnakes/ppa \
     && apt-get update -y \
@@ -240,9 +240,9 @@ FROM vllm-base AS vllm-openai
 # install additional dependencies for openai api server
 RUN --mount=type=cache,target=/root/.cache/pip \
     if [ "$TARGETPLATFORM" = "linux/arm64" ]; then \
-        pip install accelerate hf_transfer 'modelscope!=1.15.0' 'bitsandbytes>=0.42.0' 'timm==0.9.10'; \
+        pip install accelerate hf_transfer 'modelscope!=1.15.0' 'bitsandbytes>=0.42.0' 'timm==0.9.10' boto3 runai-model-streamer runai-model-streamer[s3]; \
     else \
-        pip install accelerate hf_transfer 'modelscope!=1.15.0' 'bitsandbytes>=0.45.0' 'timm==0.9.10'; \
+        pip install accelerate hf_transfer 'modelscope!=1.15.0' 'bitsandbytes>=0.45.0' 'timm==0.9.10' boto3 runai-model-streamer runai-model-streamer[s3]; \
     fi

 ENV VLLM_USAGE_SOURCE production-docker-image
diff --git a/docs/source/getting_started/debugging.rst b/docs/source/getting_started/debugging.rst
index 7f36d65a227f0..b123960533816 100644
--- a/docs/source/getting_started/debugging.rst
+++ b/docs/source/getting_started/debugging.rst
@@ -200,3 +200,4 @@ try this instead:
 Known Issues
 ----------------------------------------
 - In ``v0.5.2``, ``v0.5.3``, and ``v0.5.3.post1``, there is a bug caused by `zmq `_ , which can occasionally cause vLLM to hang depending on the machine configuration. The solution is to upgrade to the latest version of ``vllm`` to include the `fix `_.
+- To circumvent an NCCL `bug `__ , all vLLM processes set the environment variable ``NCCL_CUMEM_ENABLE=0`` to disable NCCL's ``cuMem`` allocator. This does not affect performance; it only brings memory benefits. When external processes want to set up an NCCL connection with vLLM's processes, they should also set this environment variable; otherwise, the inconsistent environment setup will cause NCCL to hang or crash, as observed in `the RLHF integration `__ and the `discussion `__ .
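As a sketch of the external-process side of the note above (the rendezvous address, rank, and world size are placeholders, not values from this patch), the variable has to be exported before NCCL is initialized:

.. code-block:: python

    import os

    # Match vLLM's setting before any NCCL communicator is created.
    os.environ["NCCL_CUMEM_ENABLE"] = "0"

    import torch.distributed as dist

    # Placeholder rendezvous used to join the same process group as vLLM's workers.
    dist.init_process_group(backend="nccl",
                            init_method="tcp://127.0.0.1:29500",
                            rank=0,
                            world_size=2)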
diff --git a/docs/source/index.rst b/docs/source/index.rst
index fd741ea5e9766..d812885aafea9 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -88,6 +88,7 @@ Documentation
    serving/metrics
    serving/integrations
    serving/tensorizer
+   serving/runai_model_streamer

 .. toctree::
    :maxdepth: 1
diff --git a/docs/source/serving/runai_model_streamer.rst b/docs/source/serving/runai_model_streamer.rst
new file mode 100644
index 0000000000000..459eb8677fb95
--- /dev/null
+++ b/docs/source/serving/runai_model_streamer.rst
@@ -0,0 +1,53 @@
+.. _runai_model_streamer:
+
+Loading Models with Run:ai Model Streamer
+=========================================
+Run:ai Model Streamer is a library for reading tensors concurrently and streaming them to GPU memory.
+Further reading can be found in the `Run:ai Model Streamer Documentation `_.
+
+vLLM supports loading weights in Safetensors format using the Run:ai Model Streamer.
+You first need to install the vLLM Run:ai optional dependency:
+
+.. code-block:: console
+
+   $ pip3 install vllm[runai]
+
+To run vLLM as an OpenAI-compatible server, add the ``--load-format runai_streamer`` flag:
+
+.. code-block:: console
+
+   $ vllm serve /home/meta-llama/Llama-3.2-3B-Instruct --load-format runai_streamer
+
+To load a model from an AWS S3 object store, run:
+
+.. code-block:: console
+
+   $ vllm serve s3://core-llm/Llama-3-8b --load-format runai_streamer
+
+
+To load a model from an S3-compatible object store, run:
+
+.. code-block:: console
+
+   $ RUNAI_STREAMER_S3_USE_VIRTUAL_ADDRESSING=0 AWS_EC2_METADATA_DISABLED=true AWS_ENDPOINT_URL=https://storage.googleapis.com vllm serve s3://core-llm/Llama-3-8b --load-format runai_streamer
+
+Tunable parameters
+------------------
+You can tune parameters using ``--model-loader-extra-config``.
+
+You can tune ``concurrency``, which controls the level of concurrency and the number of OS threads reading tensors from the file into the CPU buffer.
+For reading from S3, it is the number of client instances the host opens to the S3 server.
+
+.. code-block:: console
+
+   $ vllm serve /home/meta-llama/Llama-3.2-3B-Instruct --load-format runai_streamer --model-loader-extra-config '{"concurrency":16}'
+
+You can also control and limit the size of the CPU memory buffer into which tensors are read from the file.
+You can read more about CPU buffer memory limiting `here `_.
+
+.. code-block:: console
+
+   $ vllm serve /home/meta-llama/Llama-3.2-3B-Instruct --load-format runai_streamer --model-loader-extra-config '{"memory_limit":5368709120}'
+
+.. note::
+   For further instructions about tunable parameters and additional parameters configurable through environment variables, read the `Environment Variables Documentation `_.
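The same loader can also be used from the Python API; a minimal sketch, reusing the S3 path and tuning values from the CLI examples above (they are illustrative, not defaults):

.. code-block:: python

    from vllm import LLM, SamplingParams

    # load_format="runai_streamer" selects the Run:ai Model Streamer loader;
    # model_loader_extra_config mirrors --model-loader-extra-config on the CLI.
    llm = LLM(model="s3://core-llm/Llama-3-8b",
              load_format="runai_streamer",
              model_loader_extra_config={"concurrency": 16})

    outputs = llm.generate(["The capital of France is"],
                           SamplingParams(temperature=0.0, max_tokens=16))
    print(outputs[0].outputs[0].text)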
diff --git a/requirements-cuda.txt b/requirements-cuda.txt
index 058ab7c1ee9df..8002fbd8ee5b9 100644
--- a/requirements-cuda.txt
+++ b/requirements-cuda.txt
@@ -2,7 +2,7 @@
 -r requirements-common.txt

 # Dependencies for NVIDIA GPUs
-ray >= 2.9
+ray[default] >= 2.9
 nvidia-ml-py >= 12.560.30 # for pynvml package
 torch == 2.5.1
 # These must be updated alongside torch
diff --git a/setup.py b/setup.py
index fcfaa207c176a..73407b64edf22 100644
--- a/setup.py
+++ b/setup.py
@@ -466,7 +466,7 @@ def get_vllm_version() -> str:
             version += f"{sep}empty"
     elif _is_cuda():
         if envs.VLLM_USE_PRECOMPILED:
-            version += ".precompiled"
+            version += f"{sep}precompiled"
         else:
             cuda_version = str(get_nvcc_cuda_version())
             if cuda_version != MAIN_CUDA_VERSION:
@@ -630,6 +630,7 @@ def _read_requirements(filename: str) -> List[str]:
     ext_modules=ext_modules,
     extras_require={
         "tensorizer": ["tensorizer>=2.9.0"],
+        "runai": ["runai-model-streamer", "runai-model-streamer-s3", "boto3"],
        "audio": ["librosa", "soundfile"],  # Required for audio processing
        "video": ["decord"]  # Required for video processing
     },
diff --git a/tests/entrypoints/openai/test_audio.py b/tests/entrypoints/openai/test_audio.py
index 0a29d77e73abc..1116c0da1a6f0 100644
--- a/tests/entrypoints/openai/test_audio.py
+++ b/tests/entrypoints/openai/test_audio.py
@@ -74,6 +74,7 @@ async def test_single_chat_session_audio(client: openai.AsyncOpenAI,
         messages=messages,
         max_completion_tokens=10,
         logprobs=True,
+        temperature=0.0,
         top_logprobs=5)

     assert len(chat_completion.choices) == 1
@@ -130,6 +131,7 @@ async def test_single_chat_session_audio_base64encoded(
         messages=messages,
         max_completion_tokens=10,
         logprobs=True,
+        temperature=0.0,
         top_logprobs=5)

     assert len(chat_completion.choices) == 1
@@ -150,6 +152,7 @@ async def test_single_chat_session_audio_base64encoded(
         model=model_name,
         messages=messages,
         max_completion_tokens=10,
+        temperature=0.0,
     )
     message = chat_completion.choices[0].message
     assert message.content is not None and len(message.content) >= 0
diff --git a/tests/entrypoints/openai/test_video.py b/tests/entrypoints/openai/test_video.py
index 294b250362699..e73449e406739 100644
--- a/tests/entrypoints/openai/test_video.py
+++ b/tests/entrypoints/openai/test_video.py
@@ -82,6 +82,7 @@ async def test_single_chat_session_video(client: openai.AsyncOpenAI,
         messages=messages,
         max_completion_tokens=10,
         logprobs=True,
+        temperature=0.0,
         top_logprobs=5)

     assert len(chat_completion.choices) == 1
@@ -174,6 +175,7 @@ async def test_single_chat_session_video_base64encoded(
         messages=messages,
         max_completion_tokens=10,
         logprobs=True,
+        temperature=0.0,
         top_logprobs=5)

     assert len(chat_completion.choices) == 1
@@ -194,6 +196,7 @@ async def test_single_chat_session_video_base64encoded(
         model=model_name,
         messages=messages,
         max_completion_tokens=10,
+        temperature=0.0,
     )
     message = chat_completion.choices[0].message
     assert message.content is not None and len(message.content) >= 0
diff --git a/tests/entrypoints/openai/test_vision.py b/tests/entrypoints/openai/test_vision.py
index a0b6edd566561..5f070ba3b12e9 100644
--- a/tests/entrypoints/openai/test_vision.py
+++ b/tests/entrypoints/openai/test_vision.py
@@ -83,6 +83,7 @@ async def test_single_chat_session_image(client: openai.AsyncOpenAI,
         messages=messages,
         max_completion_tokens=10,
         logprobs=True,
+        temperature=0.0,
         top_logprobs=5)

     assert len(chat_completion.choices) == 1
@@ -175,6 +176,7 @@ async def test_single_chat_session_image_base64encoded(
         messages=messages,
         max_completion_tokens=10,
         logprobs=True,
+        temperature=0.0,
         top_logprobs=5)

     assert len(chat_completion.choices) == 1
@@ -195,6 +197,7 @@ async def test_single_chat_session_image_base64encoded(
         model=model_name,
         messages=messages,
         max_completion_tokens=10,
+        temperature=0.0,
     )
     message = chat_completion.choices[0].message
     assert message.content is not None and len(message.content) >= 0
diff --git a/tests/runai_model_streamer/__init__.py b/tests/runai_model_streamer/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/tests/runai_model_streamer/test_runai_model_streamer_loader.py b/tests/runai_model_streamer/test_runai_model_streamer_loader.py
new file mode 100644
index 0000000000000..c5722fbae5c8a
--- /dev/null
+++ b/tests/runai_model_streamer/test_runai_model_streamer_loader.py
@@ -0,0 +1,31 @@
+from vllm import SamplingParams
+from vllm.config import LoadConfig, LoadFormat
+from vllm.model_executor.model_loader.loader import (RunaiModelStreamerLoader,
+                                                     get_model_loader)
+
+test_model = "openai-community/gpt2"
+
+prompts = [
+    "Hello, my name is",
+    "The president of the United States is",
+    "The capital of France is",
+    "The future of AI is",
+]
+# Create a sampling params object.
+sampling_params = SamplingParams(temperature=0.8, top_p=0.95, seed=0)
+
+
+def get_runai_model_loader():
+    load_config = LoadConfig(load_format=LoadFormat.RUNAI_STREAMER)
+    return get_model_loader(load_config)
+
+
+def test_get_model_loader_with_runai_flag():
+    model_loader = get_runai_model_loader()
+    assert isinstance(model_loader, RunaiModelStreamerLoader)
+
+
+def test_runai_model_loader_download_files(vllm_runner):
+    with vllm_runner(test_model, load_format=LoadFormat.RUNAI_STREAMER) as llm:
+        deserialized_outputs = llm.generate(prompts, sampling_params)
+        assert deserialized_outputs
diff --git a/tests/runai_model_streamer/test_weight_utils.py b/tests/runai_model_streamer/test_weight_utils.py
new file mode 100644
index 0000000000000..5c89bd78ad81d
--- /dev/null
+++ b/tests/runai_model_streamer/test_weight_utils.py
@@ -0,0 +1,39 @@
+import glob
+import tempfile
+
+import huggingface_hub.constants
+import torch
+
+from vllm.model_executor.model_loader.weight_utils import (
+    download_weights_from_hf, runai_safetensors_weights_iterator,
+    safetensors_weights_iterator)
+
+
+def test_runai_model_loader():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        huggingface_hub.constants.HF_HUB_OFFLINE = False
+        download_weights_from_hf("openai-community/gpt2",
+                                 allow_patterns=["*.safetensors"],
+                                 cache_dir=tmpdir)
+        safetensors = glob.glob(f"{tmpdir}/**/*.safetensors", recursive=True)
+        assert len(safetensors) > 0
+
+        runai_model_streamer_tensors = {}
+        hf_safetensors_tensors = {}
+
+        for name, tensor in runai_safetensors_weights_iterator(safetensors):
+            runai_model_streamer_tensors[name] = tensor
+
+        for name, tensor in safetensors_weights_iterator(safetensors):
+            hf_safetensors_tensors[name] = tensor
+
+        assert len(runai_model_streamer_tensors) == len(hf_safetensors_tensors)
+
+        for name, runai_tensor in runai_model_streamer_tensors.items():
+            assert runai_tensor.dtype == hf_safetensors_tensors[name].dtype
+            assert runai_tensor.shape == hf_safetensors_tensors[name].shape
+            assert torch.all(runai_tensor.eq(hf_safetensors_tensors[name]))
+
+
+if __name__ == "__main__":
+    test_runai_model_loader()
" + "You can install it with: pip install vllm[runai]" + ) from err + + if is_s3(model): + self.s3_model = S3Model() + self.s3_model.pull_files(model, allow_pattern=["*config.json"]) + self.model_weights = self.model + self.model = self.s3_model.dir + + if is_s3(tokenizer): + self.s3_tokenizer = S3Model() + self.s3_tokenizer.pull_files( + model, ignore_pattern=["*.pt", "*.safetensors", "*.bin"]) + self.tokenizer = self.s3_tokenizer.dir + def _init_multimodal_config( self, limit_mm_per_prompt: Optional[Mapping[str, int]] ) -> Optional["MultiModalConfig"]: @@ -1099,6 +1135,7 @@ class LoadFormat(str, enum.Enum): GGUF = "gguf" BITSANDBYTES = "bitsandbytes" MISTRAL = "mistral" + RUNAI_STREAMER = "runai_streamer" @dataclass diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py index 912a8b2f54adb..7aa45b7958e26 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -316,6 +316,8 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: '* "tensorizer" will load the weights using tensorizer from ' 'CoreWeave. See the Tensorize vLLM Model script in the Examples ' 'section for more information.\n' + '* "runai_streamer" will load the Safetensors weights using Run:ai' + 'Model Streamer \n' '* "bitsandbytes" will load the weights using bitsandbytes ' 'quantization.\n') parser.add_argument( diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py index f50e20cf70323..66a5089074ff5 100644 --- a/vllm/engine/async_llm_engine.py +++ b/vllm/engine/async_llm_engine.py @@ -1256,3 +1256,10 @@ async def stop_profile(self) -> None: self.engine.model_executor.stop_profile() else: self.engine.model_executor._run_workers("stop_profile") + + +# TODO(v1): Remove this class proxy when V1 goes default. +if envs.VLLM_USE_V1: + from vllm.v1.engine.async_llm import AsyncLLM + + AsyncLLMEngine = AsyncLLM # type: ignore diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 00e2d1a56f160..2e5b769a825ce 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -27,6 +27,7 @@ import vllm.envs as envs from vllm.config import ModelConfig from vllm.engine.arg_utils import AsyncEngineArgs +from vllm.engine.async_llm_engine import AsyncLLMEngine # type: ignore from vllm.engine.multiprocessing.client import MQLLMEngineClient from vllm.engine.multiprocessing.engine import run_mp_engine from vllm.engine.protocol import EngineClient @@ -66,11 +67,6 @@ is_valid_ipv6_address) from vllm.version import __version__ as VLLM_VERSION -if envs.VLLM_USE_V1: - from vllm.v1.engine.async_llm import AsyncLLMEngine # type: ignore -else: - from vllm.engine.async_llm_engine import AsyncLLMEngine # type: ignore - TIMEOUT_KEEP_ALIVE = 5 # seconds prometheus_multiproc_dir: tempfile.TemporaryDirectory diff --git a/vllm/entrypoints/openai/protocol.py b/vllm/entrypoints/openai/protocol.py index 1314de714215e..1d8b0d19f9516 100644 --- a/vllm/entrypoints/openai/protocol.py +++ b/vllm/entrypoints/openai/protocol.py @@ -46,7 +46,15 @@ class OpenAIBaseModel(BaseModel): @classmethod def __log_extra_fields__(cls, data): if isinstance(data, dict): - extra_fields = data.keys() - cls.model_fields.keys() + # Get all class field names and their potential aliases + field_names = set() + for field_name, field in cls.model_fields.items(): + field_names.add(field_name) + if hasattr(field, 'alias') and field.alias: + field_names.add(field.alias) + + # Compare against both field names and aliases + extra_fields = data.keys() 
diff --git a/vllm/model_executor/layers/quantization/compressed_tensors/utils.py b/vllm/model_executor/layers/quantization/compressed_tensors/utils.py
index a74eaef5efdee..dfae4db71e546 100644
--- a/vllm/model_executor/layers/quantization/compressed_tensors/utils.py
+++ b/vllm/model_executor/layers/quantization/compressed_tensors/utils.py
@@ -30,7 +30,7 @@ def should_ignore_layer(layer_name: Optional[str],
     # in the safetensors checkpoint. So, we convert the name
     # from the fused version to unfused + check to make sure that
     # each shard of the fused layer has the same scheme.
-    if proj_name in FUSED_LAYER_NAME_MAPPING:
+    if proj_name in FUSED_LAYER_NAME_MAPPING and layer_name not in ignore:
         shard_proj_names = FUSED_LAYER_NAME_MAPPING[proj_name]

         # Convert fused_name --> [shard_names]
diff --git a/vllm/model_executor/model_loader/loader.py b/vllm/model_executor/model_loader/loader.py
index fdc4c6305bd5e..24e554e6060ab 100644
--- a/vllm/model_executor/model_loader/loader.py
+++ b/vllm/model_executor/model_loader/loader.py
@@ -45,9 +45,10 @@
     filter_duplicate_safetensors_files, filter_files_not_needed_for_inference,
     get_gguf_extra_tensor_names, gguf_quant_weights_iterator,
     initialize_dummy_weights, np_cache_weights_iterator, pt_weights_iterator,
-    safetensors_weights_iterator)
+    runai_safetensors_weights_iterator, safetensors_weights_iterator)
 from vllm.model_executor.utils import set_weight_attrs
 from vllm.platforms import current_platform
+from vllm.transformers_utils.utils import is_s3
 from vllm.utils import is_pin_memory_available


@@ -1234,6 +1235,118 @@ def load_model(self, vllm_config: VllmConfig) -> nn.Module:
         return model


+class RunaiModelStreamerLoader(BaseModelLoader):
+    """
+    Model loader that can load safetensors
+    files from local FS or S3 bucket.
+    """
+
+    def __init__(self, load_config: LoadConfig):
+        super().__init__(load_config)
+        if load_config.model_loader_extra_config:
+            extra_config = load_config.model_loader_extra_config
+
+            if ("concurrency" in extra_config
+                    and isinstance(extra_config.get("concurrency"), int)):
+                os.environ["RUNAI_STREAMER_CONCURRENCY"] = str(
+                    extra_config.get("concurrency"))
+
+            if ("memory_limit" in extra_config
+                    and isinstance(extra_config.get("memory_limit"), int)):
+                os.environ["RUNAI_STREAMER_MEMORY_LIMIT"] = str(
+                    extra_config.get("memory_limit"))
+
+            runai_streamer_s3_endpoint = os.getenv(
+                'RUNAI_STREAMER_S3_ENDPOINT')
+            aws_endpoint_url = os.getenv('AWS_ENDPOINT_URL')
+            if (runai_streamer_s3_endpoint is None
+                    and aws_endpoint_url is not None):
+                os.environ["RUNAI_STREAMER_S3_ENDPOINT"] = aws_endpoint_url
+
" + "You can install it with: pip install vllm[runai]" + ) from err + + is_local = os.path.isdir(model_name_or_path) + safetensors_pattern = "*.safetensors" + index_file = SAFE_WEIGHTS_INDEX_NAME + + hf_folder = (model_name_or_path if + (is_local or is_s3_path) else download_weights_from_hf( + model_name_or_path, + self.load_config.download_dir, + [safetensors_pattern], + revision, + ignore_patterns=self.load_config.ignore_patterns, + )) + + if is_s3_path: + hf_weights_files = s3_glob(path=hf_folder, + allow_pattern=[safetensors_pattern]) + else: + hf_weights_files = glob.glob( + os.path.join(hf_folder, safetensors_pattern)) + + if not is_local and not is_s3_path: + download_safetensors_index_file_from_hf( + model_name_or_path, index_file, self.load_config.download_dir, + revision) + + if not hf_weights_files: + raise RuntimeError( + f"Cannot find any safetensors model weights with " + f"`{model_name_or_path}`") + + return hf_weights_files + + def _get_weights_iterator( + self, model_or_path: str, + revision: str) -> Generator[Tuple[str, torch.Tensor], None, None]: + """Get an iterator for the model weights based on the load format.""" + hf_weights_files = self._prepare_weights(model_or_path, revision) + return runai_safetensors_weights_iterator(hf_weights_files) + + def download_model(self, model_config: ModelConfig) -> None: + """Download model if necessary""" + self._prepare_weights(model_config.model, model_config.revision) + + def load_model(self, vllm_config: VllmConfig) -> nn.Module: + """Perform streaming of the model to destination""" + device_config = vllm_config.device_config + model_config = vllm_config.model_config + + target_device = torch.device(device_config.device) + with set_default_torch_dtype(model_config.dtype): + with target_device: + model = _initialize_model(vllm_config=vllm_config) + + model_weights = model_config.model + if hasattr(model_config, "model_weights"): + model_weights = model_config.model_weights + model.load_weights( + self._get_weights_iterator(model_weights, + model_config.revision)) + + for _, module in model.named_modules(): + quant_method = getattr(module, "quant_method", None) + if quant_method is not None: + with device_loading_context(module, target_device): + quant_method.process_weights_after_loading(module) + return model.eval() + + def get_model_loader(load_config: LoadConfig) -> BaseModelLoader: """Get a model loader based on the load format.""" @@ -1255,4 +1368,7 @@ def get_model_loader(load_config: LoadConfig) -> BaseModelLoader: if load_config.load_format == LoadFormat.GGUF: return GGUFModelLoader(load_config) + if load_config.load_format == LoadFormat.RUNAI_STREAMER: + return RunaiModelStreamerLoader(load_config) + return DefaultModelLoader(load_config) diff --git a/vllm/model_executor/model_loader/weight_utils.py b/vllm/model_executor/model_loader/weight_utils.py index 9488d54edf365..f2a9e7e2687cb 100644 --- a/vllm/model_executor/model_loader/weight_utils.py +++ b/vllm/model_executor/model_loader/weight_utils.py @@ -410,6 +410,30 @@ def safetensors_weights_iterator( yield name, param +def runai_safetensors_weights_iterator( + hf_weights_files: List[str] +) -> Generator[Tuple[str, torch.Tensor], None, None]: + """Iterate over the weights in the model safetensor files.""" + try: + from runai_model_streamer import SafetensorsStreamer + except ImportError as err: + raise ImportError( + "Please install Run:ai optional dependency." 
+ "You can install it with: pip install vllm[runai]") from err + + enable_tqdm = not torch.distributed.is_initialized( + ) or torch.distributed.get_rank() == 0 + with SafetensorsStreamer() as streamer: + for st_file in tqdm( + hf_weights_files, + desc="Loading safetensors using Runai Model Streamer", + disable=not enable_tqdm, + bar_format=_BAR_FORMAT, + ): + streamer.stream_file(st_file) + yield from streamer.get_tensors() + + def pt_weights_iterator( hf_weights_files: List[str] ) -> Generator[Tuple[str, torch.Tensor], None, None]: diff --git a/vllm/model_executor/models/pixtral.py b/vllm/model_executor/models/pixtral.py index 6676dd16e005f..f3d66c2313198 100644 --- a/vllm/model_executor/models/pixtral.py +++ b/vllm/model_executor/models/pixtral.py @@ -45,8 +45,12 @@ except ImportError: USE_XFORMERS_OPS = False -PIXTRAL_IMAGE_BREAK_ID = 12 -PIXTRAL_IMAGE_END_ID = 13 +# These token ids cannot be retrieved from model config +# so we hardcode them here. +PIXTRAL_12B_IMAGE_BREAK_ID = 12 +PIXTRAL_12B_IMAGE_END_ID = 13 +PIXTRAL_LARGE_IMAGE_BREAK_ID = 14 +PIXTRAL_LARGE_IMAGE_END_ID = 15 def get_max_pixtral_image_tokens(ctx: InputContext): @@ -118,8 +122,7 @@ def input_mapper_for_pixtral(ctx: InputContext, for image_data in data_list: image = ImageChunk(image=image_data) encoding = tokenizer.instruct.mm_encoder(image) - image = torch.from_numpy(encoding.image).to(device="cuda", - dtype=torch.float16) + image = torch.from_numpy(encoding.image).to(dtype=torch.float16) images.append(image) image_tokens_list.append(encoding.tokens) @@ -237,8 +240,9 @@ def get_multimodal_embeddings(self, **kwargs) -> Optional[NestedTensors]: # NOTE: Image embeddings are split into separate tensors for each image # by the indices of `[IMG_END]` token. - split_indices = torch.where( - image_tokens == PIXTRAL_IMAGE_END_ID)[0] + 1 + image_end_condition = (image_tokens == PIXTRAL_12B_IMAGE_END_ID) | ( + image_tokens == PIXTRAL_LARGE_IMAGE_END_ID) + split_indices = torch.where(image_end_condition)[0] + 1 if len(split_indices) <= 1: # Do not split, return as tensor of shape [1, fs, hs] return image_embeds.unsqueeze(0) @@ -260,8 +264,11 @@ def get_input_embeddings( if multimodal_embeddings is not None: inputs_embeds = merge_multimodal_embeddings( input_ids, inputs_embeds, multimodal_embeddings, [ - self.vision_args.image_token_id, PIXTRAL_IMAGE_END_ID, - PIXTRAL_IMAGE_BREAK_ID + self.vision_args.image_token_id, + PIXTRAL_12B_IMAGE_END_ID, + PIXTRAL_12B_IMAGE_BREAK_ID, + PIXTRAL_LARGE_IMAGE_BREAK_ID, + PIXTRAL_LARGE_IMAGE_END_ID, ]) return inputs_embeds diff --git a/vllm/transformers_utils/s3_utils.py b/vllm/transformers_utils/s3_utils.py new file mode 100644 index 0000000000000..6f63dab74d696 --- /dev/null +++ b/vllm/transformers_utils/s3_utils.py @@ -0,0 +1,146 @@ +import fnmatch +import os +import shutil +import signal +import tempfile +from pathlib import Path +from typing import Optional + +import boto3 + + +def _filter_allow(paths: list[str], patterns: list[str]) -> list[str]: + return [ + path for path in paths if any( + fnmatch.fnmatch(path, pattern) for pattern in patterns) + ] + + +def _filter_ignore(paths: list[str], patterns: list[str]) -> list[str]: + return [ + path for path in paths + if not any(fnmatch.fnmatch(path, pattern) for pattern in patterns) + ] + + +def glob(s3=None, + path: str = "", + allow_pattern: Optional[list[str]] = None) -> list[str]: + """ + List full file names from S3 path and filter by allow pattern. + + Args: + s3: S3 client to use. + path: The S3 path to list from. 
+def glob(s3=None,
+         path: str = "",
+         allow_pattern: Optional[list[str]] = None) -> list[str]:
+    """
+    List full file names from S3 path and filter by allow pattern.
+
+    Args:
+        s3: S3 client to use.
+        path: The S3 path to list from.
+        allow_pattern: A list of patterns of which files to pull.
+
+    Returns:
+        list[str]: List of full S3 paths allowed by the pattern.
+    """
+    if s3 is None:
+        s3 = boto3.client("s3")
+    bucket_name, _, paths = list_files(s3,
+                                       path=path,
+                                       allow_pattern=allow_pattern)
+    return [f"s3://{bucket_name}/{path}" for path in paths]
+
+
+def list_files(
+        s3,
+        path: str,
+        allow_pattern: Optional[list[str]] = None,
+        ignore_pattern: Optional[list[str]] = None
+) -> tuple[str, str, list[str]]:
+    """
+    List files from S3 path and filter by pattern.
+
+    Args:
+        s3: S3 client to use.
+        path: The S3 path to list from.
+        allow_pattern: A list of patterns of which files to pull.
+        ignore_pattern: A list of patterns of which files not to pull.
+
+    Returns:
+        tuple[str, str, list[str]]: A tuple where:
+            - The first element is the bucket name.
+            - The second element is a string representing the bucket
+              and the prefix as a directory-like path.
+            - The third element is a list of files allowed or
+              disallowed by pattern.
+    """
+    parts = path.removeprefix('s3://').split('/')
+    prefix = '/'.join(parts[1:])
+    bucket_name = parts[0]
+
+    objects = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
+    paths = [obj['Key'] for obj in objects.get('Contents', [])]
+
+    paths = _filter_ignore(paths, ["*/"])
+    if allow_pattern is not None:
+        paths = _filter_allow(paths, allow_pattern)
+
+    if ignore_pattern is not None:
+        paths = _filter_ignore(paths, ignore_pattern)
+
+    return bucket_name, prefix, paths
+
+
+class S3Model:
+    """
+    A class representing an S3 model mirrored into a temporary directory.
+
+    Attributes:
+        s3: S3 client.
+        dir: The temporary directory that was created.
+
+    Methods:
+        pull_files(): Pull model from S3 to the temporary directory.
+    """
+
+    def __init__(self) -> None:
+        self.s3 = boto3.client('s3')
+        for sig in (signal.SIGINT, signal.SIGTERM):
+            existing_handler = signal.getsignal(sig)
+            signal.signal(sig, self._close_by_signal(existing_handler))
+        self.dir = tempfile.mkdtemp()
+
+    def __del__(self):
+        self._close()
+
+    def _close(self) -> None:
+        if os.path.exists(self.dir):
+            shutil.rmtree(self.dir)
+
+    def _close_by_signal(self, existing_handler=None):
+
+        def new_handler(signum, frame):
+            self._close()
+            if existing_handler:
+                existing_handler(signum, frame)
+
+        return new_handler
+
+    def pull_files(self,
+                   s3_model_path: str = "",
+                   allow_pattern: Optional[list[str]] = None,
+                   ignore_pattern: Optional[list[str]] = None) -> None:
+        """
+        Pull files from S3 storage into the temporary directory.
+
+        Args:
+            s3_model_path: The S3 path of the model.
+            allow_pattern: A list of patterns of which files to pull.
+            ignore_pattern: A list of patterns of which files not to pull.
+
+        """
+        bucket_name, base_dir, files = list_files(self.s3, s3_model_path,
+                                                  allow_pattern,
+                                                  ignore_pattern)
+        if len(files) == 0:
+            return
+
+        for file in files:
+            destination_file = self.dir + file.removeprefix(base_dir)
+            local_dir = Path(destination_file).parent
+            os.makedirs(local_dir, exist_ok=True)
+            self.s3.download_file(bucket_name, file, destination_file)
diff --git a/vllm/transformers_utils/utils.py b/vllm/transformers_utils/utils.py
index 7a9041b04fbb9..10a09fb4f566c 100644
--- a/vllm/transformers_utils/utils.py
+++ b/vllm/transformers_utils/utils.py
@@ -3,6 +3,10 @@
 from typing import Union


+def is_s3(model_or_path: str) -> bool:
+    return model_or_path.lower().startswith('s3://')
+
+
 def check_gguf_file(model: Union[str, PathLike]) -> bool:
     """Check if the file is a GGUF model."""
     model = Path(model)
diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py
index 41fb4b25d45bb..cfdbea8004c35 100644
--- a/vllm/v1/engine/async_llm.py
+++ b/vllm/v1/engine/async_llm.py
@@ -98,7 +98,7 @@ def from_engine_args(
         start_engine_loop: bool = True,
         usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
         stat_loggers: Optional[Dict[str, StatLoggerBase]] = None,
-    ) -> "AsyncLLMEngine":
+    ) -> "AsyncLLM":
         """Create an AsyncLLM from the EngineArgs."""

         # Create the engine configs.
@@ -386,7 +386,3 @@ def errored(self) -> bool:
     @property
     def dead_error(self) -> BaseException:
         return Exception()  # TODO: implement
-
-
-# Retain V0 name for backwards compatibility.
-AsyncLLMEngine = AsyncLLM
diff --git a/vllm/v1/engine/mm_input_mapper.py b/vllm/v1/engine/mm_input_mapper.py
index 218724bff6bba..8bfc739b3dbbc 100644
--- a/vllm/v1/engine/mm_input_mapper.py
+++ b/vllm/v1/engine/mm_input_mapper.py
@@ -180,6 +180,10 @@ def hash_prompt_mm_data(self, prompt: PromptType) -> Optional[List[str]]:
             return None

         mm_data = prompt["multi_modal_data"]
+        if not mm_data:
+            # mm_data can be None or an empty dict.
+            return None
+
         image_inputs = mm_data["image"]

         return self.hash_images(image_inputs)
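To tie the new S3 helpers together, a minimal sketch of how they compose (the bucket, prefix, and patterns are illustrative; boto3 credentials are assumed to be configured):

.. code-block:: python

    from vllm.transformers_utils.s3_utils import S3Model, glob as s3_glob
    from vllm.transformers_utils.utils import is_s3

    model_ref = "s3://core-llm/Llama-3-8b"  # illustrative bucket/prefix

    if is_s3(model_ref):
        # List the safetensors shards without downloading them.
        print(s3_glob(path=model_ref, allow_pattern=["*.safetensors"]))

        # Mirror only the small config/tokenizer files into a temporary
        # directory, as ModelConfig.maybe_pull_model_tokenizer_for_s3 does.
        local = S3Model()
        local.pull_files(model_ref,
                         ignore_pattern=["*.pt", "*.safetensors", "*.bin"])
        print("local copy in", local.dir)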