diff --git a/benchmarks/benchmark/tools/profile-generator/container/benchmark_serving.py b/benchmarks/benchmark/tools/profile-generator/container/benchmark_serving.py index d46a6f4f9..3bad2c9f9 100644 --- a/benchmarks/benchmark/tools/profile-generator/container/benchmark_serving.py +++ b/benchmarks/benchmark/tools/profile-generator/container/benchmark_serving.py @@ -5,6 +5,7 @@ It currently supports TGI, vLLM, Triton TensorRT-LLM and Saxml. """ +from abc import ABC, abstractmethod import argparse import asyncio from datetime import datetime @@ -12,7 +13,8 @@ import random import requests import time -from typing import AsyncGenerator, List, Optional, Tuple, Dict +import os +from typing import AsyncGenerator, List, Optional, Tuple, Dict, TypedDict from prometheus_client import start_http_server, Histogram, Gauge import google.auth @@ -21,20 +23,22 @@ import aiohttp import numpy as np +from sympy import symbols +from sympy.parsing.sympy_parser import parse_expr from transformers import AutoTokenizer from transformers import PreTrainedTokenizerBase -from google.protobuf.timestamp_pb2 import Timestamp - MIN_SEQ_LEN = 4 CLIENT_TIMEOUT_SEC = 3 * 60 * 60 NEW_TEXT_KEY = "\nOutput:\n" PROMETHEUS_PORT = 9090 +NS_IN_SEC = 1_000_000_000 # Prometheus Metrics prompt_length_metric = Histogram("LatencyProfileGenerator:prompt_length", "Input prompt length", buckets=[2**i for i in range(1, 16)]) response_length_metric = Histogram("LatencyProfileGenerator:response_length", "Response length", buckets=[2**i for i in range(1, 16)]) tpot_metric = Histogram('LatencyProfileGenerator:time_per_output_token', 'Time per output token per request') +ttft_metric = Histogram('LatencyProfileGenerator:time_to_first_token', 'Time to first token per request') active_requests_metric = Gauge('LatencyProfileGenerator:active_requests', 'How many requests actively being processed') # Add trace config for monitoring in flight requests @@ -52,113 +56,180 @@ async def on_request_end(session, trace_config_ctx, params): gcs_client = None gcs_bucket = None -def sample_requests( - dataset_path: str, - num_requests: int, - max_input_len: int, - max_output_len: int, - tokenizer: PreTrainedTokenizerBase, - use_dummy_text: bool, -) -> List[Tuple[str, int, int]]: - """Samples requests from the dataset or creates dummy requests.""" - if use_dummy_text: - dummy_prompt_token_ids = [0] * max_input_len - dummy_prompt = tokenizer.decode(dummy_prompt_token_ids) - dummy_requests = [( - dummy_prompt, - max_input_len, - max_output_len, - )] * num_requests - return dummy_requests - - # Load the dataset. - with open(dataset_path) as f: - dataset = json.load(f) - # Filter out the conversations with less than 2 turns. - dataset = [data for data in dataset if len(data["conversations"]) >= 2] - # Only keep the first two turns of each conversation. - dataset = [ - (data["conversations"][0]["value"], data["conversations"][1]["value"]) - for data in dataset - ] - - # Tokenize the prompts and completions. - prompts = [prompt for prompt, _ in dataset] - prompt_token_ids = tokenizer(prompts).input_ids - completions = [completion for _, completion in dataset] - completion_token_ids = tokenizer(completions).input_ids - tokenized_dataset = [] - for i in range(len(dataset)): - output_len = len(completion_token_ids[i]) - tokenized_dataset.append((prompts[i], prompt_token_ids[i], output_len)) - - # Filter out too long sequences. 
- filtered_dataset: List[Tuple[str, int, int]] = [] - for prompt, prompt_token_ids, output_len in tokenized_dataset: - prompt_len = len(prompt_token_ids) - if prompt_len < MIN_SEQ_LEN or output_len < MIN_SEQ_LEN: - # Prune too short sequences. - # This is because TGI causes errors when the input or output length - # is too short. - continue - if prompt_len > max_input_len or output_len > max_output_len: - # Prune too long sequences. - continue - filtered_dataset.append((prompt, prompt_len, output_len)) +class ErrorsReport(): + ClientConnectorErrors: int + TimeoutErrors: int + ContentTypeErrors: int + ClientOSErrors: int + ServerDisconnectedErrors: int + unknown_errors: int - # Sample the requests. - sampled_requests = random.sample(filtered_dataset, num_requests) - return sampled_requests + def __init__(self): + self.ClientConnectorErrors = 0 + self.TimeoutErrors = 0 + self.ContentTypeErrors = 0 + self.ClientOSErrors = 0 + self.ServerDisconnectedErrors = 0 + self.unknown_errors = 0 + + def to_dict(self) -> dict: + return {k: v for k, v in self.__dict__.items() if isinstance(v, int)} -async def get_request( - input_requests: List[Tuple[str, int, int]], - request_rate: float, -) -> AsyncGenerator[Tuple[str, int, int], None]: - """Gets request async.""" - input_requests = iter(input_requests) - for request in input_requests: - yield request + def record_error(self, error: Exception): + if isinstance(error, aiohttp.client_exceptions.ClientConnectorError): + self.ClientConnectorErrors += 1 + print(f"ClientConnectorError: {error}") + elif isinstance(error, asyncio.TimeoutError): + self.TimeoutErrors += 1 + print(f"TimeoutError: {error}") + elif isinstance(error, aiohttp.client_exceptions.ContentTypeError): + self.ContentTypeErrors += 1 + print(f"ContentTypeError: {error}") + elif isinstance(error, aiohttp.client_exceptions.ClientOSError): + self.ClientOSErrors += 1 + print(f"ClientOSError: {error}") + elif isinstance(error, aiohttp.client_exceptions.ServerDisconnectedError): + self.ServerDisconnectedErrors += 1 + print(f"ServerDisconnectedError: {error}") + else: + self.unknown_errors += 1 + print(f"Unknown error: {error}") - if request_rate == float("inf"): - # If the request rate is infinity, then we don't need to wait. - continue - # Sample the request interval from the exponential distribution. - interval = np.random.exponential(1.0 / request_rate) - # The next request will be sent after the interval. - await asyncio.sleep(interval) + def append_report(self, report: "ErrorsReport"): + self.ClientConnectorErrors += report.ClientConnectorErrors + self.TimeoutErrors += report.TimeoutErrors + self.ContentTypeErrors += report.ContentTypeErrors + self.ClientOSErrors += report.ClientOSErrors + self.ServerDisconnectedErrors += report.ServerDisconnectedErrors + self.unknown_errors += report.unknown_errors + +class Backend(ABC): + """ + An abstract base class for Backend that defines the interface + for new model server backends. 
+ """ -def init_errors_map() -> Dict[str, int]: - errors = { - "ClientConnectorError": 0, - "TimeoutError": 0, - "ContentTypeError": 0, - "ClientOSError": 0, - "ServerDisconnectedError": 0, - "unknown_error": 0, - } - return errors - -async def send_stream_request( - backend: str, - api_url: str, - prompt: str, - prompt_len: int, - output_len: int, - best_of: int, - use_beam_search: bool, - top_k: int, - tokenizer: PreTrainedTokenizerBase, - sax_model: str, - model: str, -) -> Tuple[Tuple[int, int, float], float, Dict[str, int]]: - """Sends stream request to server""" - request_start_time = time.time() - errors = init_errors_map() + async def send_request( + self, + api_url: str, + prompt: str, + prompt_len: int, + output_len: int, + best_of: int, + use_beam_search: bool, + top_k: int, + tokenizer: PreTrainedTokenizerBase, + sax_model: str, + model: str, + streaming: bool, + ) -> Tuple[Optional[Tuple[int, int, float]], Optional[float], Optional[ErrorsReport]]: + """Sends request to server.""" + request_start_time = time.time() + errors = ErrorsReport() + headers = {"User-Agent": "Benchmark Client"} + pload = self.create_request_payload( + prompt=prompt, + prompt_len=prompt_len, + output_len=output_len, + best_of=best_of, + use_beam_search=use_beam_search, + top_k=top_k, + tokenizer=tokenizer, + sax_model=sax_model, + model=model, + streaming=streaming + ) + + # Set client timeout to be 3 hrs. + timeout = aiohttp.ClientTimeout(total=CLIENT_TIMEOUT_SEC) + start_time = time.perf_counter() + output = "" + ttft = 0.0 + async with aiohttp.ClientSession(timeout=timeout,trust_env=True) as session: + while True: + try: + async with session.post(f"{api_url}/{self.get_endpoint()}", headers=headers, json=pload, ssl=False) as response: + output, ttft = await self.results_from_response(response, streaming, start_time) + # Re-send the request if it failed. + if "error" not in output: + break + except Exception as e: + errors.record_error(e) + return None, None, errors + request_end_time = time.time() + # Naive HF transformers generation and TensorRT-LLM generation stops at EOS + # tokens and the generation may be shorter than the ground-truth output + # sequence length. 
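+    # The realized output length is therefore re-measured below by re-tokenizing the returned text via the backend-specific get_response_length().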
+ output_len = self.get_response_length( + response=output, + request_len=prompt_len, + tokenizer=tokenizer + ) - headers = {"User-Agent": "Benchmark Client"} - if backend == "vllm": - pload = { + # (prompt len, output len, latency, success) + request_latency = (prompt_len, output_len, (request_end_time - request_start_time)) + tpot_metric.observe((request_end_time - request_start_time) / output_len) + prompt_length_metric.observe(prompt_len) + response_length_metric.observe(output_len) + if ttft is not None: + ttft_metric.observe(ttft) + + return request_latency, ttft, None + + @abstractmethod + def create_request_payload(self, + prompt: str, + prompt_len: int, + output_len: int, + best_of: int, + use_beam_search: bool, + top_k: int, + tokenizer: PreTrainedTokenizerBase, + sax_model: str, + model: str, + streaming: bool) -> Dict: + pass + + @abstractmethod + def get_response_length( + self, + request_len: int, + response: Dict, + tokenizer: PreTrainedTokenizerBase) -> int: + pass + + @abstractmethod + def get_server_metrics(self) -> List[str]: + pass + + @abstractmethod + def get_endpoint(self) -> str: + pass + + async def results_from_response(self, response: aiohttp.ClientResponse, streaming: bool, start_time: float) -> Tuple[Dict, Optional[float]]: + if streaming: + raise Exception("This backend does not support parsing streaming responses") + else: + return await response.json() + +class vLLMBackend(Backend): + def get_server_metrics(self) -> List[str]: + return ["vllm:gpu_cache_usage_perc", "vllm:num_requests_waiting"] + def get_endpoint(self) -> str: + return "v1/completions" + def create_request_payload(self, + prompt: str, + prompt_len: int, + output_len: int, + best_of: int, + use_beam_search: bool, + top_k: int, + tokenizer: PreTrainedTokenizerBase, + sax_model: str, + model: str, + streaming: bool): + return { "model": model, "prompt": prompt, "n": 1, @@ -167,115 +238,159 @@ async def send_stream_request( "temperature": 0.0 if use_beam_search else 1.0, "top_p": 1.0, "max_tokens": output_len, - "ignore_eos": True, - "stream": True, + "ignore_eos": False, + "stream": streaming, } - else: - raise ValueError(f"Unknown backend: {backend}") - - ttft = 0.0 - st = time.perf_counter() - output = "" - timeout = aiohttp.ClientTimeout(total=CLIENT_TIMEOUT_SEC) - async with aiohttp.ClientSession(timeout=timeout,trust_env=True) as session: - try: - async with session.post(api_url, headers=headers, json=pload, ssl=False) as response: - async for chunk_bytes in response.content.iter_chunks(): - chunk_bytes = chunk_bytes[0].strip() - if not chunk_bytes: - continue - timestamp = time.perf_counter() - # First token - if ttft == 0.0: - ttft = timestamp - st + def get_response_length( + self, + request_len: int, + response: Dict, + tokenizer: PreTrainedTokenizerBase): + output_token_ids = tokenizer(response["choices"][0]["text"]).input_ids + return len(output_token_ids) + async def results_from_response(self, response: aiohttp.ClientResponse, streaming: bool, start_time: float) -> Tuple[Dict, Optional[float]]: + ttft = 0.0 + + # Make a streaming response look like a non streaming response for detokenizing later + output = { + 'choices': [{ + 'text' : "" + }] + } + if streaming: + async for chunk_bytes in response.content.iter_chunks(): + chunk_bytes = chunk_bytes[0].strip() + if not chunk_bytes: + continue - if chunk_bytes.decode("utf-8")[6:] != "[DONE]": - if backend == "vllm": - output += json.loads(chunk_bytes.decode("utf-8")[6:])["choices"][0]["text"] - except 
aiohttp.client_exceptions.ClientConnectorError as client_err: - errors["ClientConnectorError"] += 1 - print(f"ClientConnectorError: {client_err}") - return None, None, errors - except asyncio.TimeoutError as timeout_err: - errors["TimeoutError"] += 1 - print(f"TimeoutError: {timeout_err}") - return None, None, errors - except aiohttp.client_exceptions.ClientOSError as e: - errors["ClientOSError"] += 1 - print(f"ClientOSError: {e}") - return None, None, errors - except aiohttp.client_exceptions.ContentTypeError as e: - print(f"ContentTypeError: {e}, response: {response}") - errors["ContentTypeError"] += 1 - return None, None, errors - except aiohttp.client_exceptions.ServerDisconnectedError as e: - errors["ServerDisconnectedError"] += 1 - print(f"ServerDisconnectedError: {e}") - return None, None, errors - except Exception as e: - print(f"Unknown error {e}") - errors["unknown_error"] += 1 - return None, None, errors - request_end_time = time.time() - output_token_ids = tokenizer(output).input_ids - output_len = len(output_token_ids) - request_latency = (prompt_len, output_len, (request_end_time - request_start_time)) - return request_latency, ttft, None - -async def send_request( - backend: str, - api_url: str, - prompt: str, - prompt_len: int, - output_len: int, - best_of: int, - use_beam_search: bool, - top_k: int, - tokenizer: PreTrainedTokenizerBase, - sax_model: str, - model: str, -) -> Tuple[Tuple[int, int, float], float, Dict[str, int]]: - """Sends request to server.""" - request_start_time = time.time() - errors = init_errors_map() + timestamp = time.perf_counter() + + # Calculate Time-to-First-Token (TTFT) + if ttft == 0.0: + ttft = timestamp - start_time - headers = {"User-Agent": "Benchmark Client"} - if backend == "vllm": - pload = { - "model": model, + # Process the chunk if it's not the "[DONE]" message + if chunk_bytes.decode("utf-8")[6:] != "[DONE]": + output["choices"][0]["text"] += json.loads(chunk_bytes.decode("utf-8")[6:])["choices"][0]["text"] + return output, ttft + else: + res = await response.json() + return res, None + +class JetstreamBackend(Backend): + def get_server_metrics(self) -> List[str]: + return [ + "jetstream_slots_used_percentage", + "jetstream_prefill_backlog_size", + ] + def get_endpoint(self) -> str: + return "" + def create_request_payload(self, + prompt: str, + prompt_len: int, + output_len: int, + best_of: int, + use_beam_search: bool, + top_k: int, + tokenizer: PreTrainedTokenizerBase, + sax_model: str, + model: str, + streaming: bool): + return { "prompt": prompt, - "n": 1, - "best_of": best_of, - "use_beam_search": use_beam_search, - "temperature": 0.0 if use_beam_search else 1.0, - "top_p": 1.0, "max_tokens": output_len, - "ignore_eos": False, - "stream": False, } - elif backend == "tgi": - assert not use_beam_search - params = { + def get_response_length( + self, + request_len: int, + response: Dict, + tokenizer: PreTrainedTokenizerBase): + output_token_ids = tokenizer(response["response"]).input_ids + return len(output_token_ids) + +class TgiBackend(Backend): + def get_server_metrics(self) -> List[str]: + return [""] + def get_endpoint(self) -> str: + return "" + def create_request_payload(self, + prompt: str, + prompt_len: int, + output_len: int, + best_of: int, + use_beam_search: bool, + top_k: int, + tokenizer: PreTrainedTokenizerBase, + sax_model: str, + model: str, + streaming: bool): + return { + "inputs": prompt, + "parameters": { "best_of": best_of, "max_new_tokens": output_len, "do_sample": True, + }, } - pload = { - "inputs": 
prompt, - "parameters": params, - } - elif backend == "naive_transformers": - # If max_length or top_k is not specified _MAX_LENGTH_DEFAULT = 200 and - # _TOP_K_DEFAULT = 10 in peft/handler.py will be used. - pload = { + def get_response_length( + self, + request_len: int, + response: Dict, + tokenizer: PreTrainedTokenizerBase): + output_token_ids = tokenizer(response["generated_text"]).input_ids + return len(output_token_ids) + +class NaiveTransformersBackend(Backend): + def get_server_metrics(self) -> List[str]: + return [""] + def get_endpoint(self) -> str: + return "" + def create_request_payload(self, + prompt: str, + prompt_len: int, + output_len: int, + best_of: int, + use_beam_search: bool, + top_k: int, + tokenizer: PreTrainedTokenizerBase, + sax_model: str, + model: str, + streaming: bool): + return { "instances": [{ "prompt": prompt, "max_length": output_len, "top_k": top_k, }] } - elif backend == "tensorrt_llm_triton": - pload = { + def get_response_length( + self, + request_len: int, + response: Dict, + tokenizer: PreTrainedTokenizerBase): + complete_pred = response["predictions"][0][0]["generated_text"] + new_text_start_index = complete_pred.find(NEW_TEXT_KEY) + len(NEW_TEXT_KEY) + pred = complete_pred[new_text_start_index:] + output_token_ids = tokenizer(pred).input_ids + return len(output_token_ids) - request_len + +class TensorrtLlmTritonBackend(Backend): + def get_server_metrics(self) -> List[str]: + return [""] + def get_endpoint(self) -> str: + return "" + def create_request_payload(self, + prompt: str, + prompt_len: int, + output_len: int, + best_of: int, + use_beam_search: bool, + top_k: int, + tokenizer: PreTrainedTokenizerBase, + sax_model: str, + model: str, + streaming: bool): + return { "text_input": prompt, "max_tokens": output_len, "beam_width": 1 if not use_beam_search else best_of, @@ -283,10 +398,33 @@ async def send_request( "top_p": 1.0, "bad_words": "", "stop_words": "", - "stream": False, + "stream": streaming, } - elif backend == "sax": - pload = { + def get_response_length( + self, + request_len: int, + response: Dict, + tokenizer: PreTrainedTokenizerBase): + output_token_ids = tokenizer(response["text_output"]).input_ids + return len(output_token_ids) + +class SaxBackend(Backend): + def get_server_metrics(self) -> List[str]: + return [""] + def get_endpoint(self) -> str: + return "" + def create_request_payload(self, + prompt: str, + prompt_len: int, + output_len: int, + best_of: int, + use_beam_search: bool, + top_k: int, + tokenizer: PreTrainedTokenizerBase, + sax_model: str, + model: str, + streaming: bool): + return { "model": sax_model, "prompt": prompt, "n": 1, @@ -296,126 +434,513 @@ async def send_request( "top_p": 1.0, "top_k": 50, "max_tokens": output_len, - "stream": False, + "stream": streaming, } - elif backend == "jetstream": - pload = { - "prompt": prompt, - "max_tokens": output_len, + def get_response_length( + self, + request_len: int, + response: Dict, + tokenizer: PreTrainedTokenizerBase): + output_token_ids = tokenizer(response["choices"][0]["text"]).input_ids + return len(output_token_ids) + +class BenchmarkConfig(TypedDict): + model: str + model_server: str + start_time: float + +class MetricSummary(TypedDict, total=False): + json_field_name: Optional[str] + name: str + description: str + mean: float + median: Optional[float] + sd: Optional[float] + min: Optional[float] + max: Optional[float] + p90: Optional[float] + p99: Optional[float] + +class BenchmarkingStageReport(TypedDict): + """Result for one stage""" + request_rate: 
float + timestamp_start: float + timestamp_end: float + num_prompts_attempted: int + latencies: List + ttfts: List[float] + local_metrics: List[MetricSummary] + server_metrics: Optional[List[MetricSummary]] + errors: ErrorsReport + +class BenchmarkingReport(): + """Results for all stages for a single model""" + args: argparse.Namespace + config: BenchmarkConfig + stages: List[BenchmarkingStageReport] + + def __init__(self, args : argparse.Namespace, model: str, start_time: float): + self.args = args + self.config = BenchmarkConfig( + model = model, + model_server = args.backend, + start_time = start_time + ) + self.stages = [] + + def record_metrics_for_stage( + self, + request_rate: float, + timestamp_start: float, + timestamp_end: float, + num_prompts_attempted : int, + latencies: List, + ttfts: List[float], + errors: ErrorsReport, + backend: Backend, + ): + + def fetch_metrics_from_gmp(backend: Backend, duration: float) -> List[MetricSummary]: + """Gets summaries for metrics queried from GMP, queries vary per model server""" + + # Creates a credentials object from the default service account file + # Assumes that script has appropriate default credentials set up, ref: + # https://googleapis.dev/python/google-auth/latest/user-guide.html#application-default-credentials + credentials, project_id = google.auth.default() + # Prepare an authentication request - helps format the request auth token + auth_req = google.auth.transport.requests.Request() + + # Request refresh tokens + credentials.refresh(auth_req) + url='https://monitoring.googleapis.com/v1/projects/%s/location/global/prometheus/api/v1/metadata' % (project_id) + headers_api = {'Authorization': 'Bearer ' + credentials.token} + request_post = requests.get(url=url, headers=headers_api) + all_metrics_metadata = request_post.json() + if request_post.ok is not True: + print("HTTP Error: %s" % (all_metrics_metadata)) + return [] + if all_metrics_metadata["status"] != "success": + print("Metadata error response: %s" % all_metrics_metadata["error"]) + return [] + + metrics_list : List[MetricSummary] = [] + for metric in backend.get_server_metrics(): + + # Find metric type + metric_type = all_metrics_metadata['data'][metric] + if all_metrics_metadata['data'][metric] is None: + print("No metric found for: %s" % metric) + return [] + metric_type = metric_type[0]['type'] + + metric_results = {} + # Queries scrape all metrics collected from the last $DURATION seconds from the backend's related + # podmonitoring spec assumed to be named "$BACKEND-podmonitoring" + queries = { + "gauge": { + "Mean": "avg_over_time(%s{job='%s-podmonitoring'}[%.0fs])" % (metric, self.args.backend, duration), + "Median": "quantile_over_time(0.5, %s{job='%s-podmonitoring'}[%.0fs])" % (metric, self.args.backend, duration), + "Sd": "stddev_over_time(%s{job='%s-podmonitoring'}[%.0fs])" % (metric, self.args.backend, duration), + "Min": "min_over_time(%s{job='%s-podmonitoring'}[%.0fs])" % (metric, self.args.backend, duration), + "Max": "max_over_time(%s{job='%s-podmonitoring'}[%.0fs])" % (metric, self.args.backend, duration), + "P90": "quantile_over_time(0.9, %s{job='%s-podmonitoring'}[%.0fs])" % (metric, self.args.backend, duration), + "P99": "quantile_over_time(0.99, %s{job='%s-podmonitoring'}[%.0fs])" % (metric, self.args.backend, duration), + }, + "histogram": { + "Mean": "sum(rate(%s_sum{job='%s-podmonitoring'}[%.0fs])) / sum(rate(%s_count{job='%s-podmonitoring'}[%.0fs]))" % (metric, self.args.backend, duration, metric, self.args.backend, duration), + "Median": 
"histogram_quantile(0.5, sum(rate(%s_bucket{job='%s-podmonitoring'}[%.0fs])) by (le))" % (metric, self.args.backend, duration), + "Min": "histogram_quantile(0, sum(rate(%s_bucket{job='%s-podmonitoring'}[%.0fs])) by (le))" % (metric, self.args.backend, duration), + "Max": "histogram_quantile(1, sum(rate(%s_bucket{job='%s-podmonitoring'}[%.0fs])) by (le))" % (metric, self.args.backend, duration), + "P90": "histogram_quantile(0.9, sum(rate(%s_bucket{job='%s-podmonitoring'}[%.0fs])) by (le))" % (metric, self.args.backend, duration), + "P99": "histogram_quantile(0.99, sum(rate(%s_bucket{job='%s-podmonitoring'}[%.0fs])) by (le))" % (metric, self.args.backend, duration), + } + } + + metric_data : MetricSummary = { + "name": metric, + "description": f"Metrics for {metric} from {self.args.backend} backend", + } + for query_name, query in queries[metric_type].items(): + + # Configure respective query + url = f'https://monitoring.googleapis.com/v1/projects/{project_id}/location/global/prometheus/api/v1/query' + headers_api = {'Authorization': f'Bearer {credentials.token}'} + params = {'query': query} + + request_post = requests.get(url=url, headers=headers_api, params=params) + response = request_post.json() + + # handle response + if request_post.ok: + if response["status"] == "success": + metric_results[query_name] = float(response["data"]["result"][0]["value"][1]) + else: + print("Cloud Monitoring PromQL Error: %s" % (response["error"])) + else: + print("HTTP Error: %s" % (response)) + + # Handle response + if request_post.ok and response["status"] == "success": + result_value = float(response["data"]["result"][0]["value"][1]) + if query_name == "Mean": + metric_data["mean"] = result_value + elif query_name == "Median": + metric_data["median"] = result_value + elif query_name == "Sd": + metric_data["sd"] = result_value + elif query_name == "Min": + metric_data["min"] = result_value + elif query_name == "Max": + metric_data["max"] = result_value + elif query_name == "P90": + metric_data["p90"] = result_value + elif query_name == "P99": + metric_data["p99"] = result_value + else: + error_message = response.get("error", "HTTP Error") + print(f"Error fetching {query_name} for {metric}: {error_message}") + + metrics_list.append(metric_data) + return metrics_list + + def metric_sumamry_from_points(name: str, description: str, points : List[float], json_field_name: Optional[str] = None) -> MetricSummary: + mean = np.mean(points) if points else 0 + median = np.median(points) if points else 0 + sd = np.std(points) if points else 0 + min = np.min(points) if points else 0 + max = np.max(points) if points else 0 + p90 = np.percentile(points, 90) if points else 0 + p99 = np.percentile(points, 99) if points else 0 + + return MetricSummary( + json_field_name = json_field_name if json_field_name is not None else name, + name = name, + description = description, + mean = float(mean), + median = float(median), + sd = float(sd), + min = float(min), + max = float(max), + p90 = float(p90), + p99 = float(p99) + ) + + total_time = (timestamp_end - timestamp_start)/ NS_IN_SEC + if self.args.scrape_server_metrics: + server_metrics = fetch_metrics_from_gmp(backend, total_time) + + local_metrics = [ + metric_sumamry_from_points( + name="per_token_latency", + description="seconds/token (includes waiting time on server)", + points=[latency / (prompt_len + output_len) for prompt_len, output_len, latency in latencies]), + metric_sumamry_from_points( + json_field_name="request_latency", + name="latency", + 
description="milliseconds/request (includes waiting time on server)" , + points=[1000 * latency for _, _, latency in latencies]), + metric_sumamry_from_points( + json_field_name="tpot", + name="per_output_token_latency", + description="milliseconds/output_token (includes waiting time on server)", + points=[1000 * latency / output_len for _, output_len, latency in latencies]), + + metric_sumamry_from_points( + name="input_length", + description="length of prompt", + points=[float(prompt_len) for prompt_len, _, _ in latencies]), + metric_sumamry_from_points( + name="output_length", + description="length of response", + points=[float(output_len) for _, output_len, _ in latencies]), + MetricSummary( + name = "throughput", + description = "throughput in requests per second", + mean = (len(latencies) / ((timestamp_end - timestamp_start) / NS_IN_SEC)), + ), + ] + if self.args.stream_request: + local_metrics.append(metric_sumamry_from_points( + json_field_name="ttft", + name="time_to_first_token", + description="Time to First Token (s)", + points=ttfts) + ) + + self.stages.append(BenchmarkingStageReport( + request_rate = request_rate, + timestamp_start = timestamp_start, + timestamp_end = timestamp_end, + num_prompts_attempted = num_prompts_attempted, + latencies = latencies, + ttfts = ttfts, + errors = errors, + local_metrics=local_metrics, + server_metrics = server_metrics + )) + + def to_text_reports(self, write_to_files: bool = False) -> List[str]: + """Each element in the output list is a report for each stage""" + + output : Dict[str, str] = {} + required_stats = ["latency", "throughput", "input_length", "output_length", "per_output_token_latency"] + for stage in self.stages: + if not all(required_stat in [metric['name'] for metric in stage['local_metrics']] for required_stat in required_stats): + raise Exception(f"All of the following stats must be recorded: {required_stats}") + + for stage in self.stages: + stage_output : List[str] = [] + total_time = (stage['timestamp_end'] - stage['timestamp_start']) / NS_IN_SEC + total_output_tokens = np.sum([output_len for _, output_len, _ in stage['latencies']]) + output_tokens_per_second = total_output_tokens / total_time + output_tokens_per_min = 60 * output_tokens_per_second + + total_input_tokens = np.sum([prompt_len for prompt_len, _, _ in stage['latencies']]) + input_tokens_per_min = 60 * total_input_tokens / total_time + + total_tokens = total_input_tokens + total_output_tokens + tokens_per_min = 60 * total_tokens / total_time + stage_output.append(f"====Result for Model: {self.config['model']}====") + stage_output.append(f"Errors: {stage['errors'].to_dict()}") + stage_output.append(f"Total time: {total_time:.2f} s") + stage_output.append(f"Successful/total requests: {len(stage['latencies'])}/{stage['num_prompts_attempted']}") + stage_output.append(f"Requests/min: {60 * stage['num_prompts_attempted'] / total_time:.2f}") + stage_output.append(f"Output_tokens/min: {output_tokens_per_min:.2f}") + stage_output.append(f"Input_tokens/min: {input_tokens_per_min:.2f}") + stage_output.append(f"Tokens/min: {tokens_per_min:.2f}") + + if self.args.machine_cost: + stage_output.append( + f"Cost $/1k tokens: {self.args.machine_cost * 1000 / (60 * output_tokens_per_min)}" + ) + for metric in stage['local_metrics']: + stage_output.append(f"Average {metric['description']}:" f" {metric['mean']:.2f}") + output_filename = f"latency-profile-{datetime.fromtimestamp(stage['timestamp_start'] / NS_IN_SEC).strftime('%Y-%m-%d_%H-%M-%S')}.txt" + output[output_filename] = 
'\n'.join(stage_output) + if write_to_files: + with open(output_filename, 'w') as file: + file.write(output[output_filename]) + if gcs_bucket is not None: + gcs_bucket.blob(f"{args.output_bucket_filepath}/{output_filename}").upload_from_filename(output_filename) + print(f"File {output_filename} uploaded to gs://{args.output_bucket}/{args.output_bucket_filepath}") + return list(output.values()) + + # The output is a a single json summary of all stages + def to_json_report(self, write_to_file: bool = False) -> Dict: + output = { + "config": { + **self.config, + "num_models": len(self.args.models) if self.args.save_aggregated_result else 1, + "start_time": { + "seconds" : self.stages[0]["timestamp_start"] // NS_IN_SEC, + "nanos" : self.stages[0]["timestamp_start"] % NS_IN_SEC, + }, + }, + "summary_stats": { + "stats": [ + { + "request_rate": stage["request_rate"], + **{(metric["json_field_name"] if "json_field_name" in metric else metric["name"]): { + stat: value + for stat, value in metric.items() + if stat not in ["name", "description", "json_field_name"] and value is not None + } + for metric in stage["local_metrics"] + }, + "model_server_metrics": [ + {"name": server_metric["name"], **server_metric} + for server_metric in stage["server_metrics"] + ] if stage["server_metrics"] is not None else [] + } + for stage in self.stages + ] + }, + + # Legacy use case, use config if possible + "dimensions": { + "date": self.args.start_datetime.strftime('%Y%m%d-%H%M%S'), + "backend": self.args.backend, + "model_id": self.config['model'], + "tokenizer_id": self.args.tokenizer, + } if len(self.args.models.split(',')) == 1 else None, + # Legacy use case, use summary_stats if possible + "metrics": { + # Traffic metrics + "num_prompts": self.stages[0]['num_prompts_attempted'], + "request_rate": self.stages[0]['request_rate'], + "benchmark_time": (self.stages[0]['timestamp_end'] - self.stages[0]['timestamp_start']) / NS_IN_SEC, + "throughput_rps": (len(self.stages[0]['latencies']) / ((self.stages[0]['timestamp_end'] - self.stages[0]['timestamp_start']) / NS_IN_SEC)), + "throughput": np.sum([output_len for _, output_len, _ in self.stages[0]['latencies']]) / ((self.stages[0]['timestamp_end'] - self.stages[0]['timestamp_start']) / NS_IN_SEC), + **{ + f"{'avg' if stat == 'mean' else stat}_{metric['name']}": value + for metric in self.stages[0]["local_metrics"] + if "json_field_name" in metric + for stat, value in metric.items() + if stat not in ["name", "description", "json_field_name"] and value is not None + }, + "server_metrics": { + server_metric["name"]: {k.capitalize(): v for k, v in server_metric.items() if k != "name"} + for server_metric in self.stages[0]["server_metrics"] + } if self.stages[0]["server_metrics"] is not None else {} + } if len(self.stages) == 1 else None } - else: - raise ValueError(f"Unknown backend: {backend}") + + if write_to_file: + model_without_slash = self.config['model'].replace("/","-") + file_name = ( + f"{self.args.file_prefix}-{self.args.backend}-{self.args.start_datetime.strftime('%Y%m%d-%H%M%S')}-{model_without_slash}.json" + ) + with open(file_name, "w", encoding="utf-8") as outfile: + json.dump(output, outfile) + if gcs_bucket is not None: + gcs_bucket.blob(f"{self.args.output_bucket_filepath}/{file_name}").upload_from_filename(file_name) + print(f"File {file_name} uploaded to gs://{self.args.output_bucket}/{self.args.output_bucket_filepath}") + return output - # Set client timeout to be 3 hrs. 
- timeout = aiohttp.ClientTimeout(total=CLIENT_TIMEOUT_SEC) - async with aiohttp.ClientSession(timeout=timeout,trust_env=True,trace_configs=[trace_config]) as session: - while True: - try: - async with session.post(api_url, headers=headers, json=pload, ssl=False) as response: - output = await response.json() - - # Re-send the request if it failed. - if "error" not in output: - break - except aiohttp.client_exceptions.ClientConnectorError as client_err: - errors["ClientConnectorError"] += 1 - print(f"ClientConnectorError: {client_err}") - return None, None, errors - except asyncio.TimeoutError as timeout_err: - errors["TimeoutError"] += 1 - print(f"TimeoutError: {timeout_err}") - return None, None, errors - except aiohttp.client_exceptions.ClientOSError as e: - errors["ClientOSError"] += 1 - print(f"ClientOSError: {e}") - return None, None, errors - except aiohttp.client_exceptions.ContentTypeError as e: - print(f"ContentTypeError: {e}, response: {response}") - errors["ContentTypeError"] += 1 - return None, None, errors - except aiohttp.client_exceptions.ServerDisconnectedError as e: - errors["ServerDisconnectedError"] += 1 - print(f"ServerDisconnectedError: {e}") - return None, None, errors - except Exception as e: - print(f"Unknown error {e}") - errors["unknown_error"] += 1 - return None, None, errors - - request_end_time = time.time() - # Naive HF transformers generation and TensorRT-LLM generation stops at EOS - # tokens and the generation may be shorter than the ground-truth output - # sequence length. - if backend == "naive_transformers": - complete_pred = output["predictions"][0][0]["generated_text"] - new_text_start_index = complete_pred.find(NEW_TEXT_KEY) + len(NEW_TEXT_KEY) - pred = complete_pred[new_text_start_index:] - output_token_ids = tokenizer(pred).input_ids - output_len = len(output_token_ids) - prompt_len + +def get_backend(backend: str) -> Backend: + if backend == "vllm": + return vLLMBackend() + elif backend == "tgi": + return TgiBackend() + elif backend == "naive_transformers": + return NaiveTransformersBackend() elif backend == "tensorrt_llm_triton": - output_token_ids = tokenizer(output["text_output"]).input_ids - output_len = len(output_token_ids) + return TensorrtLlmTritonBackend() elif backend == "sax": - output_token_ids = tokenizer(output["choices"][0]["text"]).input_ids - output_len = len(output_token_ids) - elif backend == "tgi": - output_token_ids = tokenizer(output["generated_text"]).input_ids - output_len = len(output_token_ids) - elif backend == "vllm": - output_token_ids = tokenizer(output["choices"][0]["text"]).input_ids - output_len = len(output_token_ids) + return SaxBackend() elif backend == "jetstream": - output_token_ids = tokenizer(output["response"]).input_ids - output_len = len(output_token_ids) + return JetstreamBackend() + else: + raise ValueError("Unsupported backend") + +async def generate_next_request( + input_requests: List[Tuple[str, int, int]], + request_rate_expr: str, + start_time: float, +) -> AsyncGenerator[Tuple[str, int, int], None]: + """Gets request async.""" + request = random.choice(input_requests) + while True: + yield request + + if request_rate_expr == "oo": + # If the request rate is infinity, then we don't need to wait. 
+ continue + + # Evaluate the request rate at this point in time + t = symbols('t') + expr_parsed = parse_expr(request_rate_expr, transformations="all", local_dict={"t": t}) + request_rate_at_t = expr_parsed.subs(t, ((time.time_ns() - start_time) / NS_IN_SEC)) + + # Sample the request interval from the exponential distribution. + interval = np.random.exponential(1.0 / request_rate_at_t) + # The next request will be sent after the interval. + await asyncio.sleep(interval) - # (prompt len, output len, latency, success) - request_latency = (prompt_len, output_len, (request_end_time - request_start_time)) - tpot_metric.observe((request_end_time - request_start_time) / output_len) - prompt_length_metric.observe(prompt_len) - response_length_metric.observe(output_len) +def get_filtered_dataset( + dataset_path: str, + max_input_len: int, + max_output_len: int, + tokenizer: PreTrainedTokenizerBase, + use_dummy_text: bool, +) -> List[Tuple[str, int, int]]: + """Gets a subset of the dataset where all elements adhere to the specified constraints""" + if use_dummy_text: + dummy_prompt_token_ids = [0] * max_input_len + dummy_prompt = tokenizer.decode(dummy_prompt_token_ids) + return [( + dummy_prompt, + max_input_len, + max_output_len, + )] + + # Load the dataset. + with open(dataset_path) as f: + dataset = json.load(f) + # Filter out the conversations with less than 2 turns. + dataset = [data for data in dataset if len(data["conversations"]) >= 2] + # Only keep the first two turns of each conversation. + dataset = [ + (data["conversations"][0]["value"], data["conversations"][1]["value"]) + for data in dataset + ] - return request_latency, None, None + # Tokenize the prompts and completions. + prompts = [prompt for prompt, _ in dataset] + prompt_token_ids = tokenizer(prompts).input_ids + completions = [completion for _, completion in dataset] + completion_token_ids = tokenizer(completions).input_ids + tokenized_dataset = [] + for i in range(len(dataset)): + output_len = len(completion_token_ids[i]) + tokenized_dataset.append((prompts[i], prompt_token_ids[i], output_len)) + + # Filter out too long sequences. + filtered_dataset: List[Tuple[str, int, int]] = [] + for prompt, prompt_token_ids, output_len in tokenized_dataset: + prompt_len = len(prompt_token_ids) + if prompt_len < MIN_SEQ_LEN or output_len < MIN_SEQ_LEN: + # Prune too short sequences. + # This is because TGI causes errors when the input or output length + # is too short. + continue + if prompt_len > max_input_len or output_len > max_output_len: + # Prune too long sequences. 
+ continue + filtered_dataset.append((prompt, prompt_len, output_len)) + + return filtered_dataset async def benchmark( - args: argparse.Namespace, - api_url: str, + args: argparse.Namespace, + backend: Backend, tokenizer: PreTrainedTokenizerBase, model: str, -) -> Tuple[List[Tuple[int, int, float]], List[float], Dict[str, int]]: +) -> BenchmarkingReport: """Runs benchmark with asynchronous requests.""" - input_requests = sample_requests( + input_requests = get_filtered_dataset( args.dataset, - args.num_prompts, args.max_input_length, args.max_output_length, tokenizer, args.use_dummy_text, ) - benchmark_start_time = time.time() - tasks: List[asyncio.Task] = [] - async for request in get_request(input_requests, args.request_rate): - prompt, prompt_len, output_len = request - if args.stream_request: - task = asyncio.create_task( - send_stream_request( - args.backend, - api_url, - prompt, - prompt_len, - output_len, - args.best_of, - args.use_beam_search, - args.top_k, - tokenizer, - args.sax_model, - model, - ) - ) - else: + + all_stages = {} + if args.job is not None: + all_stages = args.job + elif args.num_prompts is not None: + all_stages = { + "stages": [{ + "rate": args.request_rate, + "max_num_prompts": args.num_prompts, + }] + } + benchmark_results = BenchmarkingReport(args, model, time.time_ns()) + for index, stage in enumerate(all_stages["stages"]): + # No need to sleep before running the first stage + if args.job is not None and 'time_between_stages' in args.job and index != 0: + print(f"Sleeping for {args.job['time_between_stages']} sec...") + await asyncio.sleep(args.job["time_between_stages"]) + max_prompts = f" {stage['max_num_prompts']} requests" if 'max_num_prompts' in stage else "" + duration = f" {stage['time']} sec" if 'time' in stage else " " + print(f"Starting benchmarking{max_prompts} at {stage['rate']} requests/sec for{duration}") + + tasks: List[asyncio.Task] = [] + prompts_sent_this_stage: int = 0 + stage_start_timestamp = time.time_ns() + async for request in generate_next_request(input_requests, str(stage["rate"]), stage_start_timestamp): + # Stop conditions + if "max_num_prompts" in stage and prompts_sent_this_stage >= stage["max_num_prompts"]: + break + if "time" in stage and ((time.time_ns() - stage_start_timestamp ) / NS_IN_SEC) > stage["time"]: + break + + prompt, prompt_len, output_len = request task = asyncio.create_task( - send_request( - args.backend, - api_url, + backend.send_request( + f"http://{args.host}:{args.port}", prompt, prompt_len, output_len, @@ -425,291 +950,61 @@ async def benchmark( tokenizer, args.sax_model, model, + args.stream_request, ) ) - tasks.append(task) - results = await asyncio.gather(*tasks) - combined_latencies = [] - combined_ttfts = [] - combined_errors = init_errors_map() - for latency, ttft, errors in results: - if latency: - combined_latencies.append(latency) - if errors: - for err, count in errors.items(): - combined_errors[err] = combined_errors[err] + count - if ttft: - combined_ttfts.append(ttft) - - benchmark_duration = time.time() - benchmark_start_time - print_and_save_result(args, benchmark_duration, len(input_requests), model, combined_latencies, combined_ttfts, combined_errors) - return combined_latencies, combined_ttfts, combined_errors - -def save_json_results(args: argparse.Namespace, benchmark_result, server_metrics, model, errors): - # Setup - start_dt_proto = Timestamp() - start_dt_proto.FromDatetime(args.start_datetime) - - final_json = { - # metrics values are numerical - "metrics" : { - # Traffic - 
"num_prompts_attempted": benchmark_result['num_prompts_attempted'], - "num_prompts_succeeded": benchmark_result['num_prompts_succeeded'], - "request_rate": args.request_rate, - 'server_metrics': { - **server_metrics - }, - **benchmark_result, - **errors, - }, - # dimensions values are strings - "dimensions": { - "date": args.start_datetime.strftime('%Y%m%d-%H%M%S'), - "backend": args.backend, - "model_id": model, - "tokenizer_id": args.tokenizer, - **(json.loads(args.additional_metadata_metrics_to_save) if args.additional_metadata_metrics_to_save else {}) - }, - "config": { - "model": model, - "num_models": len(args.models.split(',')), - "model_server": args.backend, - "start_time": { - "seconds" : start_dt_proto.seconds, - "nanos" : start_dt_proto.nanos - } - }, - "summary_stats": { - "stats": [{ - "request_rate": args.request_rate, - "request_latency": { - "mean": benchmark_result["avg_latency"], - "median": benchmark_result["median_latency"], - "sd": benchmark_result["sd_latency"], - "min": benchmark_result["min_latency"], - "max": benchmark_result["max_latency"], - "p90": benchmark_result["p90_latency"], - "p99": benchmark_result["p99_latency"], - }, - "throughput": { - "mean": benchmark_result['throughput'] - }, - "input_length": { - "mean": benchmark_result["avg_input_len"], - "median": benchmark_result["median_input_len"], - "sd": benchmark_result["sd_input_len"], - "min": benchmark_result["min_input_len"], - "max": benchmark_result["max_input_len"], - "p90": benchmark_result["p90_input_len"], - "p99": benchmark_result["p99_input_len"], - }, - "output_length": { - "mean": benchmark_result["avg_output_len"], - "median": benchmark_result["median_output_len"], - "sd": benchmark_result["sd_output_len"], - "min": benchmark_result["min_output_len"], - "max": benchmark_result["max_output_len"], - "p90": benchmark_result["p90_output_len"], - "p99": benchmark_result["p99_output_len"], - }, - "tpot": { - "mean": benchmark_result["avg_per_output_token_latency"], - "median": benchmark_result["median_per_output_token_latency"], - "sd": benchmark_result["sd_per_output_token_latency"], - "min": benchmark_result["min_per_output_token_latency"], - "max": benchmark_result["max_per_output_token_latency"], - "p90": benchmark_result["p90_per_output_token_latency"], - "p99": benchmark_result["p99_per_output_token_latency"], - }, - "model_server_metrics" : [{"Name": name, **metrics} for name, metrics in server_metrics.items()] - }] - } - } + tasks.append(task) + prompts_sent_this_stage += 1 + + print("All requests sent, awaiting responses...") + results = await asyncio.gather(*tasks) + stage_end_timestamp = time.time_ns() + print(f"Finished benchmarking stage {index + 1}") + + all_latencies = [] + all_ttfts = [] + all_errors = ErrorsReport() + for latency, ttft, errors in results: + if latency: + all_latencies.append(latency) + if errors: + all_errors.append_report(errors) + if ttft: + all_ttfts.append(ttft) + benchmark_results.record_metrics_for_stage(stage['rate'], stage_start_timestamp, stage_end_timestamp, prompts_sent_this_stage, all_latencies, all_ttfts, all_errors, backend) - # Save to file - model_without_slash = model.replace("/","-") - file_name = ( - f"{args.file_prefix}-{args.backend}-{args.request_rate}qps-{args.start_datetime.strftime('%Y%m%d-%H%M%S')}-{model_without_slash}.json" - ) - with open(file_name, "w", encoding="utf-8") as outfile: - json.dump(final_json, outfile) - if gcs_bucket is not None: - 
gcs_bucket.blob(f"{args.output_bucket_filepath}/{file_name}").upload_from_filename(file_name) - print(f"File {file_name} uploaded to gs://{args.output_bucket}/{args.output_bucket_filepath}") - -def metrics_to_scrape(backend: str) -> List[str]: - # Each key in the map is a metric, it has a corresponding 'stats' object - # It must be populated on the outputs 'metrics' field as 'key':'stats' - # If a value is specified for a given key, it will be populated on the outputs `summary_stats.stats` field as 'value':'stats' as well. - if backend == "vllm": - return ["vllm:gpu_cache_usage_perc", "vllm:num_requests_waiting"] - elif backend == "jetstream": - return [ - "jetstream_slots_used_percentage", - "jetstream_prefill_backlog_size", - ] - else: - return [] - -def print_metrics(metrics: List[str], duration: float, backend: str): - # Creates a credentials object from the default service account file - # Assumes that script has appropriate default credentials set up, ref: - # https://googleapis.dev/python/google-auth/latest/user-guide.html#application-default-credentials - credentials, project_id = google.auth.default() - # Prepare an authentication request - helps format the request auth token - auth_req = google.auth.transport.requests.Request() - - server_metrics = {} - - # Request refresh tokens - credentials.refresh(auth_req) - url='https://monitoring.googleapis.com/v1/projects/%s/location/global/prometheus/api/v1/metadata' % (project_id) - headers_api = {'Authorization': 'Bearer ' + credentials.token} - request_post = requests.get(url=url, headers=headers_api) - all_metrics_metadata = request_post.json() - if request_post.ok is not True: - print("HTTP Error: %s" % (all_metrics_metadata)) - if all_metrics_metadata["status"] != "success": - print("Metadata error response: %s" % all_metrics_metadata["error"]) - - for metric in metrics: - print("Metric Name: %s" % (metric)) - - # Find metric type - metric_type = all_metrics_metadata['data'][metric] - if all_metrics_metadata['data'][metric] is None: - print("No metric found for: %s" % metric) - return - metric_type = metric_type[0]['type'] - - metric_results = {} - # Queries scrape all metrics collected from the last $DURATION seconds from the backend's related - # podmonitoring spec assumed to be named "$BACKEND-podmonitoring" - queries = { - "gauge": { - "Mean": "avg_over_time(%s{job='%s-podmonitoring'}[%.0fs])" % (metric, backend, duration), - "Median": "quantile_over_time(0.5, %s{job='%s-podmonitoring'}[%.0fs])" % (metric, backend, duration), - "Sd": "stddev_over_time(%s{job='%s-podmonitoring'}[%.0fs])" % (metric, backend, duration), - "Min": "min_over_time(%s{job='%s-podmonitoring'}[%.0fs])" % (metric, backend, duration), - "Max": "max_over_time(%s{job='%s-podmonitoring'}[%.0fs])" % (metric, backend, duration), - "P90": "quantile_over_time(0.9, %s{job='%s-podmonitoring'}[%.0fs])" % (metric, backend, duration), - "P99": "quantile_over_time(0.99, %s{job='%s-podmonitoring'}[%.0fs])" % (metric, backend, duration), - }, - "histogram": { - "Mean": "sum(rate(%s_sum{job='%s-podmonitoring'}[%.0fs])) / sum(rate(%s_count{job='%s-podmonitoring'}[%.0fs]))" % (metric, backend, duration, metric, backend, duration), - "Median": "histogram_quantile(0.5, sum(rate(%s_bucket{job='%s-podmonitoring'}[%.0fs])) by (le))" % (metric, backend, duration), - "Min": "histogram_quantile(0, sum(rate(%s_bucket{job='%s-podmonitoring'}[%.0fs])) by (le))" % (metric, backend, duration), - "Max": "histogram_quantile(1, sum(rate(%s_bucket{job='%s-podmonitoring'}[%.0fs])) by (le))" 
% (metric, backend, duration), - "P90": "histogram_quantile(0.9, sum(rate(%s_bucket{job='%s-podmonitoring'}[%.0fs])) by (le))" % (metric, backend, duration), - "P99": "histogram_quantile(0.99, sum(rate(%s_bucket{job='%s-podmonitoring'}[%.0fs])) by (le))" % (metric, backend, duration), - } - } - for query_name, query in queries[metric_type].items(): - # Configure respective query - url='https://monitoring.googleapis.com/v1/projects/%s/location/global/prometheus/api/v1/query' % (project_id) - headers_api = {'Authorization': 'Bearer ' + credentials.token} - params = {'query': query} - request_post = requests.get(url=url, headers=headers_api, params=params) - response = request_post.json() - - # handle response - if request_post.ok: - if response["status"] == "success": - metric_results[query_name] = float(response["data"]["result"][0]["value"][1]) - print("%s: %s" % (query_name, response["data"]["result"][0]["value"][1])) - else: - print("Cloud Monitoring PromQL Error: %s" % (response["error"])) - else: - print("HTTP Error: %s" % (response)) - server_metrics[metric] = metric_results - return server_metrics - -def get_stats_for_set(name, description, points): - avg = np.mean(points) if points else 0 - median = np.median(points) if points else 0 - sd = np.std(points) if points else 0 - min = np.min(points) if points else 0 - max = np.max(points) if points else 0 - p90 = np.percentile(points, 90) if points else 0 - p99 = np.percentile(points, 99) if points else 0 - - print(f"Average {description}:" f" {avg:.2f}") - - return { - f'avg_{name}': avg, - f'median_{name}': median, - f'sd_{name}': sd, - f'min_{name}': min, - f'max_{name}': max, - f'p90_{name}': p90, - f'p99_{name}': p99, - } + print(f"Completed all stages, generating reports...") + return benchmark_results -def print_and_save_result(args: argparse.Namespace, benchmark_duration, total_requests, model, request_latencies, ttfts, errors): - benchmark_result = {} - - print(f"====Result for Model: {model}====") - print(f"Errors: {errors}") - print(f"Total time: {benchmark_duration:.2f} s") - print(f"Successful/total requests: {len(request_latencies)}/{total_requests}") - print(f"Requests/min: {60 * total_requests / benchmark_duration:.2f}") - benchmark_result["num_prompts_attempted"] = total_requests - benchmark_result["num_prompts_succeeded"] = len(request_latencies) - benchmark_result['benchmark_time'] = benchmark_duration - benchmark_result['throughput_rps'] = (args.num_prompts / benchmark_duration) - - total_output_tokens = np.sum([output_len for _, output_len, _ in - request_latencies]) - output_tokens_per_second = total_output_tokens / benchmark_duration - benchmark_result['throughput'] = output_tokens_per_second - - output_tokens_per_min = 60 * output_tokens_per_second - print(f"Output_tokens/min: {output_tokens_per_min:.2f}") - benchmark_result['total_output_token'] = int(total_output_tokens) - benchmark_result['output_tokens_per_min'] = output_tokens_per_min - - total_input_tokens = np.sum([prompt_len for prompt_len, _, _ in - request_latencies]) - input_tokens_per_min = 60 * total_input_tokens / benchmark_duration - print(f"Input_tokens/min: {input_tokens_per_min:.2f}") - benchmark_result['total_input_tokens'] = int(total_input_tokens) - benchmark_result['input_tokens_per_min'] = input_tokens_per_min - - total_tokens = total_input_tokens + total_output_tokens - tokens_per_min = 60 * total_tokens / benchmark_duration - print(f"Tokens/min: {tokens_per_min:.2f}") - benchmark_result['total_tokens'] = int(total_tokens) - 
benchmark_result['tokens_per_min'] = tokens_per_min - ttft_stats = {} - if args.stream_request: - ttft_stats = get_stats_for_set("TTFT", "Time to First Token (s)", ttfts) - if args.machine_cost: - print( - "Cost $/1k tokens:" - f" {args.machine_cost * 1000 / (60 * output_tokens_per_min)}" - ) +def aggregate_benchmark_reports(reports: List[BenchmarkingReport]) -> BenchmarkingReport: + """When benchmarking multiple models we will generate a BenchmarkingReport for each.""" + """If `save_aggregated_result` is set, we aggregate these into a single report.""" - benchmark_result = { - **benchmark_result, - **(get_stats_for_set("per_token_latency", "seconds/token (includes waiting time on server)", [ - latency / (prompt_len + output_len) - for prompt_len, output_len, latency in request_latencies - ])), - **ttft_stats, - # NOTE: The latency below includes requests awaiting time on server side. - # It's not comparable with the model inference latency for batch size 1. - **(get_stats_for_set("latency", "milliseconds/request (includes waiting time on server)" ,[1000 * latency for _, _, latency in request_latencies])), - **(get_stats_for_set("per_output_token_latency", "milliseconds/output_token (includes waiting time on server)", [1000 * latency / output_len for _, output_len, latency in request_latencies])), - **(get_stats_for_set("input_len", "input length", [float(prompt_len) for prompt_len, _, _ in request_latencies])), - **(get_stats_for_set("output_len", "output length", [float(output_len) for _, output_len, _ in request_latencies])) + aggregated_stage_report = { + "request_rate": reports[0].stages[0]["request_rate"], + "timestamp_start": 0.0, + "timestamp_end": 0.0, + "num_prompts_attempted": 0, + "latencies": [], + "ttfts": [], + "server_metrics": [], + "errors": ErrorsReport(), } - server_metrics = {} - if args.scrape_server_metrics: - server_metrics = print_metrics(metrics_to_scrape(args.backend), benchmark_duration, args.backend) - if args.save_json_results: - save_json_results(args, benchmark_result, server_metrics, model, errors) + for report in reports: + # Input metavalidation asserts this report only has one stage report + report = report.stages[0] + aggregated_stage_report["timestamp_start"] = min(aggregated_stage_report["timestamp_start"], report["timestamp_start"]) + aggregated_stage_report["timestamp_end"] = max(aggregated_stage_report["timestamp_end"], report["timestamp_end"]) + aggregated_stage_report["num_prompts_attempted"] += report["num_prompts_attempted"] + aggregated_stage_report["latencies"].extend(report["latencies"]) + aggregated_stage_report["ttfts"].extend(report["ttfts"]) + aggregated_stage_report["errors"] = aggregated_stage_report["errors"].append_report(report["errors"]) + + aggregated_report = BenchmarkingReport(reports[0].args, f"ALL-{len(reports)}-MODELS", aggregated_stage_report["timestamp_start"]) + aggregated_report.record_metrics_for_stage(**aggregated_stage_report) + + return aggregated_report async def main(args: argparse.Namespace): print(args) @@ -717,12 +1012,7 @@ async def main(args: argparse.Namespace): print(f"Models to benchmark: {models}") random.seed(args.seed) np.random.seed(args.seed) - endpoint = ( - "v1/completions" - if args.backend == "vllm" - else args.endpoint -) - + # Create GCS client before benchmarking # Should fail fast if client is misconfigured or missing permissions if args.output_bucket is not None: @@ -739,39 +1029,37 @@ async def main(args: argparse.Namespace): print(f"Starting Prometheus Server on port {PROMETHEUS_PORT}") 
start_http_server(PROMETHEUS_PORT) - api_url = f"http://{args.host}:{args.port}/{endpoint}" tokenizer = AutoTokenizer.from_pretrained( args.tokenizer, trust_remote_code=args.trust_remote_code ) - - benchmark_start_time = time.time() - args.start_datetime = datetime.fromtimestamp(benchmark_start_time) - - results = await asyncio.gather( - *[benchmark(args, api_url, tokenizer, model) for model in models] - ) - - # Summarize results - combined_latencies = [] - combined_ttfts = [] - combined_errors = { - "ClientConnectorError": 0, - "TimeoutError": 0, - "ContentTypeError": 0, - "ClientOSError": 0, - "unknown_error": 0, - "ServerDisconnectedError": 0, - } - for latencies, ttfts, errors in results: - combined_latencies.extend(latencies) - combined_ttfts.extend(ttfts) - for k, v in errors.items(): - combined_errors[k] = combined_errors[k] + v + args.start_datetime = datetime.fromtimestamp(time.time_ns() / NS_IN_SEC) - benchmark_duration_all_models = time.time() - benchmark_start_time + backend: Backend = get_backend(args.backend) + reports : List[BenchmarkingReport] = await asyncio.gather( + *[benchmark(args, backend, tokenizer, model) for model in models] + ) + if args.save_aggregated_result: - print_and_save_result(args, benchmark_duration_all_models, len(models)*args.num_prompts, f"ALL-{len(models)}-MODELS", combined_latencies, combined_ttfts, combined_errors) + aggregated_benchmark = aggregate_benchmark_reports(reports) + aggregated_benchmark.to_text_reports(write_to_files=True) + aggregated_benchmark.to_json_report(write_to_file=args.save_json_results) + else: + for report in reports: + report.to_text_reports(write_to_files=True) + report.to_json_report(write_to_file=args.save_json_results) + +def input_metavalidation(args: argparse.Namespace): + """Validate a correct combination of arguments is set""" + if sum([bool(args.request_rate is not None and args.num_prompts is not None), bool(args.job is not None)]) != 1: + raise ValueError("All args must be set for one and only one of the following sets of arguments: {--request-rate, --num-prompts} or {--job}") + + if args.save_aggregated_result and args.benchmark is not None and len(args.benchmark) != 1 and args.models is not None and len(args.models) > 1: + raise ValueError("Multi model benchmarking with multi stage benchmarking is not supported yet") + + if args.use_beam_search and args.backend == "tgi": + raise ValueError("Beam search is not supported by TGI") + if __name__ == "__main__": parser = argparse.ArgumentParser( description="Benchmark the online serving throughput." @@ -796,7 +1084,6 @@ async def main(args: argparse.Namespace): help="Model name to send request to at API server for SAX model server.", ) parser.add_argument("--file-prefix", type=str, default="benchmark") - parser.add_argument("--endpoint", type=str, default="generate") parser.add_argument("--host", type=str, default="localhost") parser.add_argument("--port", type=int, default=7080) parser.add_argument("--dataset", type=str, help="Path to the dataset.") @@ -826,7 +1113,7 @@ async def main(args: argparse.Namespace): parser.add_argument( "--num-prompts", type=int, - default=1000, + default=None, help="Number of prompts to process.", ) parser.add_argument( @@ -855,17 +1142,106 @@ async def main(args: argparse.Namespace): " LLaMA2 models." 
       ),
   )
+
+  # Input assertions
+  def is_expression_of_t(input_str):
+    if input_str == "inf":
+      return "oo"
+    # Check whether the expression uses variables other than 't' by attempting
+    # to evaluate it with only 't' defined.
+    try:
+      t = symbols('t')
+      expr_parsed = parse_expr(input_str, transformations="all", local_dict={"t": t})
+      expr_parsed.subs(t, 1)
+      return input_str
+    except Exception:
+      raise ValueError(f"Request rate {input_str} must be an expression of `t`")
+
   parser.add_argument(
       "--request-rate",
-      type=float,
-      default=float("inf"),
+      type=is_expression_of_t,
+      default=None,
+      help=(
+          "Specifies the request rate as a function of time, f(t)."
+          " Example format: '1+1.05*t', where 't' represents seconds from"
+          " start. If set to 'inf', all requests are sent at time 0."
+          " Otherwise, the function is interpreted to generate a Poisson"
+          " process for request arrival times based on the provided rate"
+          " expression."
+      ),
+  )
+
+  def parse_request_rates(input_str):
+    if input_str is None:
+      return None
+    # If the input is a filename, load its contents.
+    if os.path.isfile(input_str):
+      with open(input_str, 'r') as file:
+        input_str = file.read()
+    try:
+      # Parse the input string as JSON.
+      request_data = json.loads(input_str)
+      # Validate that the JSON has the correct structure.
+      if not isinstance(request_data, dict):
+        raise argparse.ArgumentTypeError("Input JSON must be an object containing 'time_between_stages' and 'stages'.")
+      # Check the 'time_between_stages' field.
+      if "time_between_stages" not in request_data or not isinstance(request_data["time_between_stages"], (float, int)):
+        raise argparse.ArgumentTypeError("'time_between_stages' must be a float or int.")
+      # Check the 'stages' field.
+      if "stages" not in request_data or not isinstance(request_data["stages"], list):
+        raise argparse.ArgumentTypeError("'stages' must be a list of objects with 'rate' and 'time' and/or 'max_num_prompts'.")
+
+      # Validate each entry in the 'stages' list.
+      for i, rate_entry in enumerate(request_data["stages"]):
+        if not isinstance(rate_entry, dict):
+          raise argparse.ArgumentTypeError(f"Entry {i} in 'stages' must be a JSON object.")
+
+        if "rate" not in rate_entry:
+          raise argparse.ArgumentTypeError(f"Entry {i} in 'stages' must have a 'rate' key.")
+        if "time" not in rate_entry and "max_num_prompts" not in rate_entry:
+          raise argparse.ArgumentTypeError(f"Entry {i} in 'stages' must have a 'time' and/or 'max_num_prompts' key.")
+
+        # The 'rate' field may be a float or a string expression of 't'.
+        if isinstance(rate_entry["rate"], str):
+          try:
+            is_expression_of_t(rate_entry["rate"])  # Validate the expression
+          except Exception as e:
+            raise argparse.ArgumentTypeError(f"Entry {i} in 'stages': {e}")
+        # Validate the optional 'time' field.
+        if "time" in rate_entry and not isinstance(rate_entry["time"], (float, int)):
+          raise argparse.ArgumentTypeError(f"Entry {i} in 'stages': 'time' must be a float or int (seconds).")
+      return request_data
+    except json.JSONDecodeError as e:
+      raise argparse.ArgumentTypeError(f"Invalid JSON format: {e}")
+
+  parser.add_argument(
+      "--job",
+      type=parse_request_rates,
+      default=None,
+      required=False,
       help=(
-          "Number of requests per second. If this is inf, "
-          "then all the requests are sent at time 0. "
-          "Otherwise, we use Poisson process to synthesize "
-          "the request arrival times."
+          "Specify the benchmark procedure in JSON format, either as raw JSON"
+          " or as a filename.\n"
+          " The JSON should have the following structure:\n\n"
+          " {\n"
+          "   \"time_between_stages\": float (seconds to rest between stages),\n"
+          "   \"stages\": [\n"
+          "     {\n"
+          "       \"rate\": float | str (as would be passed to --request-rate),\n"
+          "       \"time\": float (number of seconds for this stage),\n"
+          "       \"max_num_prompts\": int (maximum number of prompts for this stage)\n"
+          "     },\n"
+          "     ...\n"
+          "   ]\n"
+          " }\n\n"
+          " Example JSON:\n"
+          " '{\"time_between_stages\": 1.0, \"stages\": [{\"rate\": 2.0, \"time\": 0.0}, {\"rate\": \"1+0.5*t\", \"time\": 5.0}]}'\n\n"
+          " Each stage must have a 'rate' and a 'time' and/or 'max_num_prompts' value."
+          " A stage is finished when \"max_num_prompts\" prompts have been sent"
+          " (if specified) and \"time\" seconds have passed (if specified),"
+          " whichever comes last."
       ),
   )
+  parser.add_argument("--seed", type=int, default=int(time.time()))
   parser.add_argument(
       "--trust-remote-code",
@@ -931,4 +1307,5 @@ async def main(args: argparse.Namespace):
       help="Whether to scrape server metrics.",
   )
   cmd_args = parser.parse_args()
+  input_metavalidation(cmd_args)
   asyncio.run(main(cmd_args))
\ No newline at end of file
diff --git a/benchmarks/benchmark/tools/profile-generator/container/requirements.txt b/benchmarks/benchmark/tools/profile-generator/container/requirements.txt
index c3bfdaca3..8f36a3afc 100644
--- a/benchmarks/benchmark/tools/profile-generator/container/requirements.txt
+++ b/benchmarks/benchmark/tools/profile-generator/container/requirements.txt
@@ -24,6 +24,7 @@ psutil
 ray >= 2.9
 sentencepiece # Required for LLaMA tokenizer.
 numpy < 2.0
+sympy <= 1.13
 torch == 2.1.1
 transformers >= 4.42.0 # Required for Qwen2
 xformers == 0.0.23
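Reviewer note (illustrative only, not part of the patch): the new --request-rate flag above accepts a sympy expression of t, and its help text says arrival times follow a Poisson process whose rate tracks that expression. The sketch below shows one simple way such an expression could be sampled, by evaluating the instantaneous rate f(t) and drawing an exponential inter-arrival time; the helper name next_interval is hypothetical and the real generator in benchmark_serving.py may do this differently.

import numpy as np
from sympy import symbols
from sympy.parsing.sympy_parser import parse_expr

def next_interval(rate_expr: str, elapsed: float) -> float:
  """Sample the wait (seconds) before the next request at `elapsed` seconds from start."""
  t = symbols("t")
  rate = float(parse_expr(rate_expr, local_dict={"t": t}).subs(t, elapsed))
  # A Poisson process with instantaneous rate f(t) has exponentially
  # distributed inter-arrival times with mean 1 / f(t).
  return np.random.exponential(1.0 / rate)

# At t = 10 s the expression "1+0.5*t" evaluates to 6 requests/s,
# so the expected wait is roughly 0.17 s.
print(next_interval("1+0.5*t", 10.0))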
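Also illustrative (and not part of the patch): how a --job definition might be walked stage by stage, resting time_between_stages seconds between stages. run_stage and run_job are hypothetical stand-ins; the actual per-stage benchmarking and sequencing live elsewhere in benchmark_serving.py.

import asyncio
import json

JOB = json.loads(
    '{"time_between_stages": 1.0, "stages": ['
    '{"rate": 2.0, "time": 10.0},'
    '{"rate": "1+0.5*t", "time": 5.0, "max_num_prompts": 100}]}'
)

async def run_stage(stage: dict) -> None:
  # Placeholder: the real code sends requests at stage["rate"] until
  # stage["time"] seconds have passed and/or stage["max_num_prompts"]
  # prompts have been sent, whichever comes last.
  print(f"running stage: {stage}")

async def run_job(job: dict) -> None:
  for i, stage in enumerate(job["stages"]):
    await run_stage(stage)
    if i < len(job["stages"]) - 1:
      # Rest between stages as configured.
      await asyncio.sleep(job["time_between_stages"])

asyncio.run(run_job(JOB))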