Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request rate function #908

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import aiohttp
import numpy as np
from sympy import symbols
from sympy.parsing.sympy_parser import parse_expr
from transformers import AutoTokenizer
from transformers import PreTrainedTokenizerBase

Expand All @@ -30,6 +32,8 @@
CLIENT_TIMEOUT_SEC = 3 * 60 * 60
NEW_TEXT_KEY = "\nOutput:\n"
PROMETHEUS_PORT = 9090
NS_IN_SEC = 1_000_000_000


# Prometheus Metrics
prompt_length_metric = Histogram("LatencyProfileGenerator:prompt_length", "Input prompt length", buckets=[2**i for i in range(1, 16)])
Expand All @@ -38,6 +42,7 @@
tpot_metric = Histogram('LatencyProfileGenerator:time_per_output_token', 'Time per output token per request (excluding first token)')
ttft_metric = Histogram('LatencyProfileGenerator:time_to_first_token', 'Time to first token per request')
active_requests_metric = Gauge('LatencyProfileGenerator:active_requests', 'How many requests actively being processed')
# Gauge holding the most recently evaluated request rate. The value is a
# rate (requests per second), so the description must not say "in seconds".
request_rate_metric = Gauge('LatencyProfileGenerator:request_rate', "The current request rate in requests per second")

# Add trace config for monitoring in flight requests
async def on_request_start(session, trace_config_ctx, params):
Expand Down Expand Up @@ -110,18 +115,26 @@ def get_filtered_dataset(

async def generate_next_request(
    input_requests: List[Tuple[str, int, int]],
    request_rate_expr: str,
    start_time: float,
) -> AsyncGenerator[Tuple[str, int, int], None]:
  """Yields randomly chosen requests, pacing arrivals by a time-varying rate.

  Args:
    input_requests: Pool of (prompt, prompt_len, output_len) tuples to
      sample from (with replacement).
    request_rate_expr: Request rate in QPS as an expression of `t` (seconds
      elapsed since start_time), or "oo" to send all requests immediately.
    start_time: Benchmark start timestamp, from time.time_ns().

  Yields:
    (prompt, prompt_len, output_len) tuples, with exponentially distributed
    inter-arrival times (a Poisson process with the instantaneous rate).
  """
  if request_rate_expr != "oo":
    # Parse the rate expression once up front; re-parsing it on every
    # request (as a naive implementation would) is pure overhead.
    t = symbols('t')
    expr_parsed = parse_expr(request_rate_expr, transformations="all", local_dict={"t": t})
  while True:
    request = random.choice(input_requests)
    yield request

    if request_rate_expr == "oo":
      # If the request rate is infinity, then we don't need to wait.
      continue

    # Evaluate the request rate at this point in time. Convert the sympy
    # result to a plain float before handing it to prometheus/numpy.
    elapsed_sec = (time.time_ns() - start_time) / NS_IN_SEC
    request_rate_at_t = float(expr_parsed.subs(t, elapsed_sec))
    request_rate_metric.set(request_rate_at_t)

    # A non-positive rate would make the exponential sampling below blow up
    # (ZeroDivisionError or a negative scale); fail with a clear message.
    if request_rate_at_t <= 0:
      raise ValueError(
          f"Request rate expression {request_rate_expr!r} evaluated to "
          f"{request_rate_at_t} at t={elapsed_sec:.3f}s; it must be positive.")

    # Sample the request interval from the exponential distribution.
    interval = np.random.exponential(1.0 / request_rate_at_t)
    # The next request will be sent after the interval.
    await asyncio.sleep(interval)

Expand Down Expand Up @@ -408,7 +421,7 @@ async def benchmark(
benchmark_start_time = time.time()
tasks: List[asyncio.Task] = []
prompts_sent: int = 0
async for request in generate_next_request(input_requests, args.request_rate):
async for request in generate_next_request(input_requests, args.request_rate, time.time_ns()):
if args.num_prompts <= prompts_sent:
break
prompt, prompt_len, output_len = request
Expand Down Expand Up @@ -548,7 +561,7 @@ def save_json_results(args: argparse.Namespace, benchmark_result, server_metrics
# Save to file
model_without_slash = model.replace("/","-")
file_name = (
f"{args.file_prefix}-{args.backend}-{args.request_rate}qps-{args.start_datetime.strftime('%Y%m%d-%H%M%S')}-{model_without_slash}.json"
f"{args.file_prefix}-{args.backend}-{args.start_datetime.strftime('%Y%m%d-%H%M%S')}-{model_without_slash}.json"
)
with open(file_name, "w", encoding="utf-8") as outfile:
json.dump(final_json, outfile)
Expand Down Expand Up @@ -876,16 +889,32 @@ async def main(args: argparse.Namespace):
" LLaMA2 models."
),
)
# Input assertions
def is_expression_of_t(input_str):
  """argparse type-checker for --request-rate.

  Accepts a numeric literal or an expression of the single variable `t`.
  Returns the input string unchanged ("inf" is normalized to sympy's "oo")
  so downstream code can parse it lazily.

  Raises:
    ValueError: if the input cannot be parsed, or references any free
      symbol other than `t`.
  """
  if input_str == "inf":
    return "oo"
  t = symbols('t')
  try:
    expr_parsed = parse_expr(input_str, transformations="all", local_dict={"t": t})
  except Exception:
    raise ValueError(f"Request rate {input_str}, must be numeric or an expression of `t`")
  # parse_expr auto-creates Symbols for unknown identifiers and subs() never
  # raises on them, so substituting t alone would silently accept inputs
  # like "1+x*t". Reject any free symbol other than `t` explicitly.
  if not expr_parsed.free_symbols <= {t}:
    raise ValueError(f"Request rate {input_str}, must be numeric or an expression of `t`")
  return input_str

# --request-rate accepts either "inf" or an expression of t (seconds since
# start), validated by is_expression_of_t and evaluated per-request.
parser.add_argument(
    "--request-rate",
    type=is_expression_of_t,
    default="inf",
    help=(
        "Specifies the request rate as a function of time, f(t)."
        " Example format: '1+1.05*t', where 't' represents seconds from"
        " start. If set to 'inf', all requests are sent at time 0."
        " Otherwise, the function is interpreted to generate a Poisson"
        " process for request arrival times based on the provided rate"
        " expression. Current value emitted as the following prometheus"
        " metric: 'LatencyProfileGenerator:request_rate'"
    ),
)
parser.add_argument("--seed", type=int, default=int(time.time()))
parser.add_argument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ accelerate
aiohttp
google-auth
google-cloud-storage >= 2.18.2
prometheus_client >= 0.21.0
sympy >= 1.13
Loading