diff --git a/benchmarks/benchmark_throughput.py b/benchmarks/benchmark_throughput.py index 3a444489ad26d..e64241c1a6e3c 100644 --- a/benchmarks/benchmark_throughput.py +++ b/benchmarks/benchmark_throughput.py @@ -171,7 +171,7 @@ def run_vllm( ignore_eos=True, )) end = time.perf_counter() - + # print(met.metrics_report()) # print(met.short_metrics_report()) diff --git a/tests/entrypoints/openai/test_accuracy.py b/tests/entrypoints/openai/test_accuracy.py index 99e3eaca2f2de..d9ded25ee9163 100644 --- a/tests/entrypoints/openai/test_accuracy.py +++ b/tests/entrypoints/openai/test_accuracy.py @@ -67,8 +67,8 @@ def run_test(more_args): ), f"Expected: {EXPECTED_VALUE} | Measured: {measured_value}" -@pytest.mark.skipif(not current_platform.is_cuda() and - not current_platform.is_tpu(), +@pytest.mark.skipif(not current_platform.is_cuda() + and not current_platform.is_tpu(), reason="V1 currently only supported on CUDA") def test_lm_eval_accuracy_v1_engine(monkeypatch): """Run with the V1 Engine.""" diff --git a/vllm/attention/selector.py b/vllm/attention/selector.py index 32100f9fd5f16..9919c31ef8ab2 100644 --- a/vllm/attention/selector.py +++ b/vllm/attention/selector.py @@ -237,8 +237,8 @@ def which_attn_to_use(head_size: int, return _Backend.IPEX if current_platform.is_tpu(): - if (selected_backend != _Backend.PALLAS and - selected_backend != _Backend.PALLAS_VLLM_V1): + if (selected_backend != _Backend.PALLAS + and selected_backend != _Backend.PALLAS_VLLM_V1): logger.info("Cannot use %s backend on TPU.", selected_backend) if use_v1: return _Backend.PALLAS_VLLM_V1 diff --git a/vllm/config.py b/vllm/config.py index 0d5ed5dc51e48..6760bcbc24c05 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -1225,7 +1225,7 @@ def __init__(self, device: str = "auto") -> None: # Some device types require processing inputs on CPU if self.device_type in ["neuron", "openvino"]: self.device = torch.device("cpu") - # Device initialization should happen after initializing the + # Device initialization should happen after initializing the # distributed runtime. elif self.device_type in ["tpu"]: self.device = None diff --git a/vllm/v1/core/scheduler.py b/vllm/v1/core/scheduler.py index 215afcbe0c9f7..98eeb07ba5cfb 100644 --- a/vllm/v1/core/scheduler.py +++ b/vllm/v1/core/scheduler.py @@ -151,12 +151,12 @@ def schedule(self) -> "SchedulerOutput": num_computed_tokens -= 1 num_new_tokens = 1 computed_blocks.pop() - + # If chunked prefill is not enabled, breakout of the loop. 
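# ---------------------------------------------------------------------------
# NOTE (illustrative sketch, not from this patch): a restatement of the
# scheduling decision made right here; `plan_new_tokens` is a hypothetical
# standalone helper, not a Scheduler method. When the whole prompt is served
# from the prefix cache, one token is still recomputed so the step produces
# logits; without chunked prefill, a prompt that exceeds the remaining token
# budget is deferred rather than split.
def plan_new_tokens(num_prompt_tokens: int, num_computed_tokens: int,
                    token_budget: int, chunked_prefill_enabled: bool) -> int:
    """Return how many prompt tokens to schedule now (0 = defer the request)."""
    num_new_tokens = num_prompt_tokens - num_computed_tokens
    if num_new_tokens == 0:
        num_new_tokens = 1  # full prefix-cache hit: recompute the last token
    if not chunked_prefill_enabled and num_new_tokens > token_budget:
        return 0            # cannot split the prefill; try again next step
    return min(num_new_tokens, token_budget)
# ---------------------------------------------------------------------------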
- if (not self.scheduler_config.chunked_prefill_enabled and - num_new_tokens > token_budget): + if (not self.scheduler_config.chunked_prefill_enabled + and num_new_tokens > token_budget): break - + num_new_tokens = min(num_new_tokens, token_budget) assert num_new_tokens > 0 new_blocks = self.kv_cache_manager.allocate_slots( diff --git a/vllm/v1/executor/tpu_executor.py b/vllm/v1/executor/tpu_executor.py index 910fd8c2f5583..5e6e63086946d 100644 --- a/vllm/v1/executor/tpu_executor.py +++ b/vllm/v1/executor/tpu_executor.py @@ -10,6 +10,7 @@ # import torch_xla.debug.profiler as xp + class TPUExecutor: def __init__(self, vllm_config: VllmConfig) -> None: @@ -32,11 +33,10 @@ def __init__(self, vllm_config: VllmConfig) -> None: # self.server = xp.start_server(9012) def _create_worker( - self, - local_rank: int = 0, - rank: int = 0, - distributed_init_method: Optional[str] = None - ) -> TPUWorker: + self, + local_rank: int = 0, + rank: int = 0, + distributed_init_method: Optional[str] = None) -> TPUWorker: """Return worker init args for a given rank.""" if distributed_init_method is None: diff --git a/vllm/v1/worker/tpu_model_runner.py b/vllm/v1/worker/tpu_model_runner.py index ef5dfcb0954c1..a57387c2797bd 100644 --- a/vllm/v1/worker/tpu_model_runner.py +++ b/vllm/v1/worker/tpu_model_runner.py @@ -41,18 +41,17 @@ class PrefillData: attn_metadata: List def zipped(self): - return zip(self.request_ids, - self.prompt_lens, - self.token_ids, - self.position_ids, - self.attn_metadata) + return zip(self.request_ids, self.prompt_lens, self.token_ids, + self.position_ids, self.attn_metadata) + + @dataclass class DecodeData: num_decodes: int token_ids: torch.Tensor position_ids: torch.Tensor attn_metadata: PallasAttentionMetadata - + class TPUModelRunner: @@ -115,8 +114,7 @@ def __init__( self.prefill_positions = torch.tensor( range(self.max_model_len), device="cpu", - ).to(torch.int32).reshape(1,-1) - + ).to(torch.int32).reshape(1, -1) def _update_states(self, scheduler_output: "SchedulerOutput") -> None: # Remove stopped requests from the cached states. @@ -202,7 +200,6 @@ def _update_states(self, scheduler_output: "SchedulerOutput") -> None: req_state = self.requests[req_id] self.input_batch.add_request(req_state, None) - def _prepare_inputs(self, scheduler_output: "SchedulerOutput"): total_num_scheduled_tokens = scheduler_output.total_num_scheduled_tokens assert total_num_scheduled_tokens > 0 @@ -210,7 +207,7 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"): num_reqs = self.input_batch.num_reqs num_decodes = self.input_batch.num_decodes num_prefills = self.input_batch.num_prefills - + assert num_decodes + num_prefills > 0 # Get the number of scheduled tokens for each request. @@ -219,7 +216,7 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"): for idx, req_id in enumerate(self.input_batch.req_ids[:num_reqs]): num_tokens = scheduler_output.num_scheduled_tokens[req_id] num_scheduled_tokens.append(num_tokens) - + # Assert Decodes Are Decodes. if idx < num_decodes: assert num_tokens == 1 @@ -227,7 +224,7 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"): ######################### PREFILLS ######################### # Prefills run separately, each with shape [1, padded_prompt_len], # due to lack of variable length flashattention. - # + # # Due to static shapes, prefills are padded to the nearest power # of two, such that we can avoid recompilation. @@ -251,28 +248,27 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"): # TOKEN_IDS. 
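# ---------------------------------------------------------------------------
# NOTE (illustrative sketch, not from this patch): the padding rule the
# comments above describe (see also _get_padded_prefill_len further down) --
# prompt lengths are rounded up to a power of two with a floor of 16, so the
# Pallas FlashAttention kernel gets a multiple of 16 and XLA only ever
# compiles a small, fixed set of prefill shapes.
def padded_prefill_len(prompt_len: int) -> int:
    if prompt_len <= 16:
        return 16
    return 1 << (prompt_len - 1).bit_length()  # next power of two

assert [padded_prefill_len(n) for n in (1, 16, 17, 100, 1000)] == \
    [16, 16, 32, 128, 1024]
# ---------------------------------------------------------------------------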
prefill_token_ids.append( torch.from_numpy( - self.input_batch.token_ids_cpu[idx:idx+1, :padded_prompt_len] - ).to(self.device) - ) + self.input_batch.token_ids_cpu[idx:idx + + 1, :padded_prompt_len]).to( + self.device)) # POSITIONS. positions = self.prefill_positions[:, :padded_prompt_len] - prefill_position_ids.append( - positions.to(self.device) - ) + prefill_position_ids.append(positions.to(self.device)) # SLOT_MAPPING. # The "slot" is the "physical index" of a token in the KV cache. # We look up the block_idx in the block table (logical <> physical map) # to compute this. - block_numbers = self.input_batch.block_table_cpu_tensor[idx, positions // self.block_size].reshape(1,-1) + block_numbers = self.input_batch.block_table_cpu_tensor[ + idx, positions // self.block_size].reshape(1, -1) block_offsets = positions % self.block_size slot_mapping = block_numbers * self.block_size + block_offsets # Set an out of range value for the padding tokens so that they # are ignored when inserting into the KV cache. slot_mapping[:, prompt_len:] = _PAD_SLOT_ID slot_mapping = slot_mapping.long() - + # ATTN_METADATA. prefill_attn_metadata.append( PallasAttentionMetadata( @@ -280,8 +276,7 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"): slot_mapping=slot_mapping.to(self.device), block_tables=None, context_lens=None, - ) - ) + )) prefill_data = PrefillData( request_ids=prefill_request_ids, @@ -293,23 +288,22 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"): if num_decodes == 0: return prefill_data, None - + ######################### DECODES ######################### # Decodes run as one single padded batch with shape [batch, 1] # - # We need to set _PAD_SLOT_ID for the padding tokens in the + # We need to set _PAD_SLOT_ID for the padding tokens in the # slot_mapping, such that the attention KV cache insertion - # logic knows to ignore those indicies. Otherwise, the + # logic knows to ignore those indicies. Otherwise, the # padding data can be dummy since we have a causal mask. # PAD FOR STATIC SHAPES. padded_batch_size = _get_padded_batch_size(num_decodes) - + # POSITIONS. [batch, 1] # We slice at the end, since we use the positions for gathering. positions = torch.from_numpy( - self.input_batch.num_computed_tokens_cpu.reshape(-1,1) - ) + self.input_batch.num_computed_tokens_cpu.reshape(-1, 1)) index = positions.to(torch.int64) positions = positions[:padded_batch_size] @@ -327,8 +321,7 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"): block_number = torch.gather( input=self.input_batch.block_table_cpu_tensor, dim=1, - index=(index // self.block_size) - ) + index=(index // self.block_size)) block_offsets = index % self.block_size slot_mapping = block_number * self.block_size + block_offsets # Set an out of range value for the padding tokens so that they @@ -337,23 +330,22 @@ def _prepare_inputs(self, scheduler_output: "SchedulerOutput"): slot_mapping = slot_mapping[:padded_batch_size] # BLOCK_TABLE [batch, max_num_blocks_per_req] - block_table = self.input_batch.block_table_cpu_tensor[:padded_batch_size] - + block_table = self.input_batch.block_table_cpu_tensor[: + padded_batch_size] + # CONTEXT_LENS [batch_size] - context_lens = (positions.reshape(-1) + 1) - + context_lens = (positions.reshape(-1) + 1) + # CPU<>TPU sync happens here. 
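# ---------------------------------------------------------------------------
# NOTE (illustrative sketch, not from this patch): the slot-mapping arithmetic
# used in both the prefill and decode paths above, shown for a single request.
# A token's "slot" (its physical index in the KV cache) is found by mapping
# its position through the block table, and padded positions get an
# out-of-range sentinel so the KV-cache insertion ignores them. `pad_slot_id`
# stands in for the module-level _PAD_SLOT_ID constant.
import torch


def slot_mapping_for_request(block_table: torch.Tensor,
                             positions: torch.Tensor, block_size: int,
                             seq_len: int, pad_slot_id: int) -> torch.Tensor:
    block_numbers = block_table[positions // block_size]  # logical -> physical
    block_offsets = positions % block_size
    slots = block_numbers * block_size + block_offsets
    slots[seq_len:] = pad_slot_id  # ignore the padded tail of the request
    return slots.long()

# e.g. with block_size=16, the token at position 35 lives in logical block 2,
# offset 3, so its slot is block_table[2] * 16 + 3.
# ---------------------------------------------------------------------------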
- decode_data = DecodeData( - num_decodes=num_decodes, - token_ids=token_ids.to(self.device), - position_ids=positions.to(self.device), - attn_metadata=PallasAttentionMetadata( - is_prompt=False, - slot_mapping=slot_mapping.to(self.device), - block_tables=block_table.to(self.device), - context_lens=context_lens.to(self.device), - ) - ) + decode_data = DecodeData(num_decodes=num_decodes, + token_ids=token_ids.to(self.device), + position_ids=positions.to(self.device), + attn_metadata=PallasAttentionMetadata( + is_prompt=False, + slot_mapping=slot_mapping.to(self.device), + block_tables=block_table.to(self.device), + context_lens=context_lens.to(self.device), + )) return prefill_data, decode_data @@ -372,7 +364,6 @@ def _prepare_sampling( sampling_metadata = self.input_batch.make_sampling_metadata(skip_copy) return sampling_metadata - def execute_model( self, scheduler_output: "SchedulerOutput", @@ -387,23 +378,22 @@ def execute_model( if decode_data: num_decodes = decode_data.num_decodes - selected_token_ids = self.model( - decode_data.token_ids, - decode_data.position_ids, - decode_data.attn_metadata, - self.kv_caches, - is_prompt=False - ) - + selected_token_ids = self.model(decode_data.token_ids, + decode_data.position_ids, + decode_data.attn_metadata, + self.kv_caches, + is_prompt=False) + # NOTE: TPU<>CPU sync happens here. # It is important to call .cpu() first to avoid compilation on hotpath. token_ids = selected_token_ids.cpu()[:num_decodes] sampled_token_ids_list = token_ids.tolist() sampled_token_ids[:num_decodes] = token_ids - for i, req_id in enumerate(self.input_batch.req_ids[:decode_data.num_decodes]): + for i, req_id in enumerate( + self.input_batch.req_ids[:decode_data.num_decodes]): req_state = self.requests[req_id] - + # NO CHUNKED PREFILL assert scheduler_output.num_scheduled_tokens[req_id] == 1 seq_len = (req_state.num_computed_tokens + @@ -415,18 +405,15 @@ def execute_model( req_state.output_token_ids.append(token_id) ########## PREFILLS ########## - for idx, (req_id, prompt_len, - token_ids, position_ids, + for idx, (req_id, prompt_len, token_ids, position_ids, attn_metadata) in enumerate(prefill_data.zipped()): # [padded_prompt_len] - selected_token_ids = self.model( - token_ids, - position_ids, - attn_metadata, - self.kv_caches, - is_prompt=True - ) + selected_token_ids = self.model(token_ids, + position_ids, + attn_metadata, + self.kv_caches, + is_prompt=True) # NOTE: TPU<>CPU sync happens here. # It is important to call .cpu() first to avoid compilation on hotpath. token_id = selected_token_ids.cpu()[prompt_len - 1].item() @@ -437,7 +424,7 @@ def execute_model( if req_state.num_computed_tokens > 0: breakpoint() assert req_state.num_computed_tokens == 0 - seq_len = (req_state.num_computed_tokens + + seq_len = (req_state.num_computed_tokens + scheduler_output.num_scheduled_tokens[req_id]) # TODO: chunked prefill. 
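# ---------------------------------------------------------------------------
# NOTE (illustrative sketch, not from this patch): the decode and prefill
# branches above both follow the same host-side pattern when reading back
# sampled tokens -- one .cpu() call to sync the whole tensor, then slicing and
# .tolist()/.item() on the host copy, so the hot path avoids extra device
# syncs and recompilation. `gather_sampled_tokens` is a hypothetical helper
# showing that pattern in isolation.
from typing import List

import torch


def gather_sampled_tokens(selected_token_ids: torch.Tensor,
                          num_decodes: int) -> List[int]:
    host_ids = selected_token_ids.cpu()      # single device -> host transfer
    return host_ids[:num_decodes].flatten().tolist()  # cheap host-side ops
# ---------------------------------------------------------------------------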
@@ -481,30 +468,19 @@ def load_model(self) -> None: xm.wait_device_ops() self.model = ModelWrapper(model) - def _dummy_run( - self, - batch_size: int, - seq_len: int, - kv_caches: List[torch.Tensor], - is_prompt: bool - ) -> None: + def _dummy_run(self, batch_size: int, seq_len: int, + kv_caches: List[torch.Tensor], is_prompt: bool) -> None: """Dummy warmup run for memory usage and graph compilation.""" - input_ids = torch.zeros( - (batch_size, seq_len), - dtype=torch.int32, - device=self.device - ) - position_ids = torch.zeros( - (batch_size, seq_len), - dtype=torch.int32, - device=self.device - ) - slot_mapping = torch.zeros( - (batch_size, seq_len), - dtype=torch.int64, - device=self.device - ) + input_ids = torch.zeros((batch_size, seq_len), + dtype=torch.int32, + device=self.device) + position_ids = torch.zeros((batch_size, seq_len), + dtype=torch.int32, + device=self.device) + slot_mapping = torch.zeros((batch_size, seq_len), + dtype=torch.int64, + device=self.device) block_tables = None if is_prompt else torch.zeros( (batch_size, self.max_num_blocks_per_req), dtype=torch.int32, @@ -521,7 +497,7 @@ def _dummy_run( block_tables=block_tables, context_lens=context_lens, ) - + # NOTE: There are two stages of compilation: torch.compile and # XLA compilation. Using `mark_dynamic` can reduce the torch.compile # overhead by reusing the FX graph for different shapes. @@ -560,31 +536,31 @@ def profile_run(self) -> None: dummy_kv_caches = [( torch.tensor([], dtype=torch.float32, device=self.device), torch.tensor([], dtype=torch.float32, device=self.device), - ) for _ in range(self.num_attn_layers) - ] + ) for _ in range(self.num_attn_layers)] # Round to multiple of 16. seq_len = (self.max_num_tokens + 15) // 16 * 16 # Run empty forward. - self._dummy_run( - batch_size=1, - seq_len=seq_len, - kv_caches=dummy_kv_caches, - is_prompt=True) - + self._dummy_run(batch_size=1, + seq_len=seq_len, + kv_caches=dummy_kv_caches, + is_prompt=True) def capture_model(self) -> None: """Compile the model.""" - + logger.info("Compiling the model with different input shapes.") - + # Prefill shapes. 
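# ---------------------------------------------------------------------------
# NOTE (illustrative sketch, not from this patch): capture_model warms up one
# XLA graph per input shape the runner can produce at runtime. Because prefill
# prompts are padded to powers of two (>= 16), the prefill shapes to
# precompile can be enumerated up front, roughly as below; the doubling step
# is an assumption consistent with that padding, since the increment itself
# is outside this hunk.
def prefill_warmup_shapes(max_model_len: int) -> list:
    shapes, seq_len = [], 16
    while True:
        shapes.append((1, seq_len))   # each prefill runs with batch_size 1
        if seq_len >= max_model_len:
            return shapes
        seq_len *= 2

# prefill_warmup_shapes(2048) -> [(1, 16), (1, 32), ..., (1, 2048)]
# ---------------------------------------------------------------------------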
start = time.perf_counter() for batch_size in [1]: seq_len = 16 while True: - self._dummy_run(batch_size, seq_len, self.kv_caches, is_prompt=True) + self._dummy_run(batch_size, + seq_len, + self.kv_caches, + is_prompt=True) xm.wait_device_ops() logger.info("batch_size: %d, seq_len: %d", batch_size, seq_len) if seq_len >= self.model_config.max_model_len: @@ -602,7 +578,10 @@ def capture_model(self) -> None: seq_len = 1 batch_size = 8 # Must be in sync with _get_padded_batch_size() while True: - self._dummy_run(batch_size, seq_len, self.kv_caches, is_prompt=False) + self._dummy_run(batch_size, + seq_len, + self.kv_caches, + is_prompt=False) xm.wait_device_ops() logger.info("batch_size: %d, seq_len: %d", batch_size, seq_len) @@ -613,7 +592,6 @@ def capture_model(self) -> None: end = time.time() logger.info("Compilation for decode done in %.2f s.", end - start) - def initialize_kv_cache(self, num_blocks: int) -> None: assert len(self.kv_caches) == 0 kv_cache_shape = PallasAttentionBackend.get_kv_cache_shape( @@ -621,8 +599,8 @@ def initialize_kv_cache(self, num_blocks: int) -> None: for _ in range(self.num_attn_layers): self.kv_caches.append(( torch.zeros(kv_cache_shape, - dtype=self.kv_cache_dtype, - device=self.device), + dtype=self.kv_cache_dtype, + device=self.device), torch.zeros(kv_cache_shape, dtype=self.kv_cache_dtype, device=self.device), @@ -908,6 +886,7 @@ def no_logprob(self) -> bool: def no_prompt_logprob(self) -> bool: return len(self.prompt_logprob_reqs) == 0 + class ModelWrapper(TorchCompileWrapperWithCustomDispatcher): def __init__(self, model: nn.Module): @@ -996,6 +975,7 @@ def _get_padded_batch_size(batch_size: int) -> int: else: return ((batch_size + 15) // 16) * 16 + def _get_padded_prefill_len(x: int) -> int: # NOTE(woosuk): The pallas FlashAttention kernel requires the sequence # length to be a multiple of 16. We pad the prompt length to the nearest diff --git a/vllm/v1/worker/tpu_worker.py b/vllm/v1/worker/tpu_worker.py index b016922460251..ecdc88745fa59 100644 --- a/vllm/v1/worker/tpu_worker.py +++ b/vllm/v1/worker/tpu_worker.py @@ -22,14 +22,11 @@ logger = init_logger(__name__) + class TPUWorker: - def __init__( - self, - vllm_config: VllmConfig, - local_rank: int, - rank: int, - distributed_init_method: str - ): + + def __init__(self, vllm_config: VllmConfig, local_rank: int, rank: int, + distributed_init_method: str): self.vllm_config = vllm_config self.model_config = vllm_config.model_config self.cache_config = vllm_config.cache_config @@ -41,7 +38,7 @@ def __init__( self.speculative_config = vllm_config.speculative_config self.prompt_adapter_config = vllm_config.prompt_adapter_config self.observability_config = vllm_config.observability_config - + self.local_rank = local_rank self.rank = rank self.distributed_init_method = distributed_init_method @@ -91,11 +88,10 @@ def initialize(self): per_rank_path = os.path.join(envs.VLLM_XLA_CACHE_PATH, f"tp{world_size}_rank{rank}") xr.initialize_cache(per_rank_path, readonly=False) - + def load_model(self): self.model_runner.load_model() - def determine_num_available_blocks(self) -> Tuple[int, int]: """Profiles the peak memory usage of the model to determine how many KV blocks may be allocated without OOMs. @@ -108,33 +104,30 @@ def determine_num_available_blocks(self) -> Tuple[int, int]: You may limit the usage of GPU memory by adjusting the `gpu_memory_utilization` parameter. """ - - return 3144, 0 - - # self.model_runner.profile_run() - - # # Synchronize before measuring the memory usage. 
- # xm.wait_device_ops() - - # # Get the maximum amount of memory used by the model weights and - # # intermediate activations. - # m = xm.get_memory_info(self.device) - # total_tpu_memory = m["bytes_limit"] - # peak_memory = m["peak_bytes_used"] # Weights + intermediate activations. - # logger.debug("Peak Used: %sGB", - # peak_memory // 1024 // 1024 // 1024) - # logger.debug("Total Memory: %sGB", - # total_tpu_memory // 1024 // 1024 // 1024) - - # cache_block_size = _get_cache_block_size(self.cache_config, - # self.model_config, - # self.parallel_config) - # num_tpu_blocks = int( - # (total_tpu_memory * self.cache_config.gpu_memory_utilization - - # peak_memory) // cache_block_size) - # num_tpu_blocks = (max(num_tpu_blocks, 0) // 8) * 8 - return num_tpu_blocks, 0 + self.model_runner.profile_run() + + # Synchronize before measuring the memory usage. + xm.wait_device_ops() + + # Get the maximum amount of memory used by the model weights and + # intermediate activations. + m = xm.get_memory_info(self.device) + total_tpu_memory = m["bytes_limit"] + peak_memory = m["peak_bytes_used"] # Weights + intermediate activations. + logger.debug("Peak Used: %sGB", + peak_memory // 1024 // 1024 // 1024) + logger.debug("Total Memory: %sGB", + total_tpu_memory // 1024 // 1024 // 1024) + + cache_block_size = _get_cache_block_size(self.cache_config, + self.model_config, + self.parallel_config) + num_tpu_blocks = int( + (total_tpu_memory * self.cache_config.gpu_memory_utilization - + peak_memory) // cache_block_size) + num_tpu_blocks = (max(num_tpu_blocks, 0) // 8) * 8 + return num_tpu_blocks, 0 def initialize_cache(self, num_tpu_blocks: int) -> None: """Allocate TPU and CPU KV cache with the specified number of blocks.""" @@ -143,7 +136,7 @@ def initialize_cache(self, num_tpu_blocks: int) -> None: raise ValueError("No available memory for the cache blocks. " "Try increasing `gpu_memory_utilization` when " "initializing the engine.") - + max_seq_len = self.cache_config.block_size * num_tpu_blocks max_model_len = self.model_config.max_model_len if max_model_len > max_seq_len: @@ -161,11 +154,11 @@ def initialize_cache(self, num_tpu_blocks: int) -> None: xm.mark_step() xm.wait_device_ops() m = xm.get_memory_info(self.device) - peak_memory = m["peak_bytes_used"] # Weights + intermediate activations. - logger.debug("Peak GB Used Post KV Cache: %sGB", + peak_memory = m[ + "peak_bytes_used"] # Weights + intermediate activations. + logger.debug("Peak GB Used Post KV Cache: %sGB", peak_memory // 1024 // 1024 // 1024) - def compile_or_warm_up_model(self) -> None: if not self.model_config.enforce_eager: self.model_runner.capture_model() @@ -174,7 +167,6 @@ def compile_or_warm_up_model(self) -> None: # the model initialization and profiling. set_random_seed(self.model_config.seed) - def execute_model( self, scheduler_output: "SchedulerOutput", @@ -203,4 +195,4 @@ def _get_cache_block_size( else: dtype = STR_DTYPE_TO_TORCH_DTYPE[cache_config.cache_dtype] dtype_size = get_dtype_size(dtype) - return dtype_size * total \ No newline at end of file + return dtype_size * total diff --git a/vllm/worker/tpu_model_runner.py b/vllm/worker/tpu_model_runner.py index 4e0f52a6bac39..a721186137328 100644 --- a/vllm/worker/tpu_model_runner.py +++ b/vllm/worker/tpu_model_runner.py @@ -417,7 +417,6 @@ def _prepare_decode( block_tables=block_tables, context_lens=context_lens, ) - return input_tokens, input_positions, attn_metadata, input_lens def _prepare_sample(
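# ---------------------------------------------------------------------------
# NOTE (illustrative sketch, not from this patch): the block-count arithmetic
# that determine_num_available_blocks now performs after the profiling run,
# pulled out into a hypothetical helper. Whatever part of the memory budget
# (total * gpu_memory_utilization) the weights and peak activations did not
# consume is carved into KV-cache blocks, rounded down to a multiple of 8.
# `cache_block_size` is the per-block byte cost from _get_cache_block_size
# (roughly 2 * block_size * num_kv_heads * head_size * num_layers * dtype_size).
def num_kv_cache_blocks(total_tpu_memory: int, peak_memory: int,
                        gpu_memory_utilization: float,
                        cache_block_size: int) -> int:
    usable = total_tpu_memory * gpu_memory_utilization - peak_memory
    num_blocks = int(usable // cache_block_size)
    return (max(num_blocks, 0) // 8) * 8

# e.g. 16 GiB of HBM at 90% utilization with a 6 GiB profiled peak and
# 2 MiB blocks: num_kv_cache_blocks(16 << 30, 6 << 30, 0.9, 2 << 20) -> 4296
# ---------------------------------------------------------------------------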