Increase locust user task and connection timeout (#341)
Increase locust user task and connection timeout to handle long-running requests.
annapendleton authored Mar 13, 2024
1 parent 169a45b commit fe3f86b
Showing 2 changed files with 58 additions and 35 deletions.
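Context for the change: Locust's --stop-timeout defaults to terminating running user tasks immediately when a test is stopped, and FastHttpUser.connection_timeout defaults to 60 seconds per connection. Both limits are raised to 10800 seconds (three hours) so that long-running inference requests are neither killed at stop time nor timed out mid-request. As a sketch of the effect, the entrypoint script below now assembles master options equivalent to the following (the final "locust $LOCUS_OPTS" launch line is an assumption; it sits outside this diff):

    LOCUS_OPTS="-f /locust-tasks/tasks.py --host=$TARGET_HOST --master --stop-timeout 10800"
    locust $LOCUS_OPTS  # assumed launch line, not shown in this diff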
File 1 of 2 (shell entrypoint script):
@@ -19,7 +19,12 @@ LOCUS_OPTS="-f /locust-tasks/tasks.py --host=$TARGET_HOST"
 LOCUST_MODE=${LOCUST_MODE:-standalone}
 
 if [[ "$LOCUST_MODE" = "master" ]]; then
-    LOCUS_OPTS="$LOCUS_OPTS --master --stop-timeout 300"
+    # Locust's stop-timeout defaults to 0s and is only used in distributed mode.
+    # The master waits $stop-timeout for each User to complete its task.
+    # For inferencing workloads with large payloads, having no wait time is unreasonable.
+    # This timeout is set to a large value to avoid user tasks being killed too early.
+    # TODO: turn the timeout into a variable.
+    LOCUS_OPTS="$LOCUS_OPTS --master --stop-timeout 10800"
 elif [[ "$LOCUST_MODE" = "worker" ]]; then
     huggingface-cli login --token $HUGGINGFACE_TOKEN
     FILTER_PROMPTS="python /locust-tasks/load_data.py"
File 2 of 2 (locust-tasks/tasks.py):
@@ -33,6 +33,7 @@

 logging.basicConfig(level=logging.INFO)
 
+
 def load_test_prompts():
     """Loads test prompts from a local file location."""
     with open("locust-tasks/filtered_prompts.txt") as f:
@@ -99,39 +100,43 @@ def generate_request(prompt):
         raise ValueError(f"Unknown backend: {backend}")
     return pload
 
+
 def get_token_count(prompt, resp):
     """Get number of tokens to prompt and resp using the tokenizer"""
     global tokenizer
     backend = model_params["backend"]
 
     number_of_input_tokens = len(tokenizer.encode(prompt))
     number_of_output_tokens = 0
 
     if backend == "vllm":
-        number_of_output_tokens = 0 # to be added
+        number_of_output_tokens = 0  # to be added
     elif backend == "tgi":
-        number_of_output_tokens = 0 # to be added
+        number_of_output_tokens = 0  # to be added
     elif backend == "tensorrt_llm_triton":
         resp_dict = json.loads(resp.content.decode('utf-8'))
-        number_of_output_tokens = len(tokenizer.encode(resp_dict['text_output']))
+        number_of_output_tokens = len(
+            tokenizer.encode(resp_dict['text_output']))
     elif backend == "sax":
-        number_of_output_tokens = 0 # to be added
+        number_of_output_tokens = 0  # to be added
     else:
         raise ValueError(f"Unknown backend: {backend}")
     return number_of_input_tokens, number_of_output_tokens
 
 
-
 class BenchmarkUser(FastHttpUser):
     weight = 1
+    # connection_timeout defaults to 60s. For inferencing workloads with a large
+    # payload this timeout can be too short, so it is increased to a large value.
+    # TODO: turn the timeout into a variable.
+    connection_timeout = 10800
 
     @task
     def lm_generate(self):
         global test_data
         global model_params
         global tokenizer
 
-
         if not test_data:
             logging.error("No test data configured.")
             logging.error("Stopping the runner")
@@ -146,21 +151,25 @@ def lm_generate(self):
         test_start_time = time.time()
         with self.client.post("/generate", headers=headers, json=request, catch_response=True) as resp:
             if resp.status_code == 200:
                 self.handle_successful_response(prompt, resp, test_start_time)
             else:
                 if resp.status_code == 0:
                     logging.error(
                         f"Failed request with invalid response code: {resp.status_code}. Due to requests.RequestException thrown by Session, caused by connection errors, timeouts or similar. Try increasing connection_timeout")
                 self.handle_failed_response(request, resp)
 
-
-
     def handle_successful_response(self, prompt, reponse, start_time):
         global model_params
         if model_params['enable_custom_metrics'] == 'true':
             test_time = time.time() - start_time
             request_successful_bool = 1
             tokens_sent, tokens_received = get_token_count(prompt, reponse)
 
-            logging.info(f'sending to master: metric_update: {[tokens_sent, tokens_received, test_time, request_successful_bool]}')
-            self.environment.runner.send_message("metric_update", [tokens_sent, tokens_received, test_time, request_successful_bool])
-
+            logging.info(
+                f'sending to master: metric_update: {[tokens_sent, tokens_received, test_time, request_successful_bool]}')
+            self.environment.runner.send_message("metric_update", [
+                tokens_sent, tokens_received, test_time, request_successful_bool])
 
     def handle_failed_response(self, request, response):
         global model_params
         response.failure("Got unexpected response")
@@ -170,31 +179,40 @@ def handle_failed_response(self, request, response):
             tokens_received = -1
             test_time = -1
             request_successful_bool = 0
 
-            logging.info(f'sending to master: metric_update: {[tokens_sent, tokens_received, test_time, request_successful_bool]}')
-            self.environment.runner.send_message("metric_update", [tokens_sent, tokens_received, test_time, request_successful_bool])
-
+            logging.info(
+                f'sending to master: metric_update: {[tokens_sent, tokens_received, test_time, request_successful_bool]}')
+            self.environment.runner.send_message("metric_update", [
+                tokens_sent, tokens_received, test_time, request_successful_bool])
 
 
 """
 methods for the locust master to write custom metrics
 """
 
 
 def collect_metrics(msg, **_kwargs):
     """locust master collects the metrics emitted by the locust workers and updates the metric_collector object"""
     sent = msg.data[0]
     received = msg.data[1]
     test_time = msg.data[2]
     request_successful_bool = msg.data[3]
     logging.info(f'recevied from worker {msg.data}')
-    metric_collector.add_metric(sent, received, test_time, request_successful_bool)
+    metric_collector.add_metric(
+        sent, received, test_time, request_successful_bool)
 
 
 def periodically_write_metrics(environment):
     metric_collector.write_to_csv()
-    threading.Timer(environment.parsed_options.csv_upload_frequency, periodically_write_metrics, args=(environment,)).start()
+    threading.Timer(environment.parsed_options.csv_upload_frequency,
+                    periodically_write_metrics, args=(environment,)).start()
 
 
 def setup_periodic_metrics_writer(environment, **_kwargs):
     """locust master periodically writes the collected metrics to csv"""
     periodically_write_metrics(environment)
 
 
 def setup_custom_route(environment, **_kwargs):
     """Sets up custom routes in the locust master for serving CSV files."""
     directory = os.path.dirname('/') # Directory where the file is located
@@ -204,7 +222,8 @@ def custom_metrics(filename):
         if filename not in ['custom_metrics.csv', 'custom_metrics_final.csv']:
             return "File not found.", 404 # Basic validation to prevent unauthorized file access
         return send_from_directory(directory, filename, as_attachment=True)
-
+
+
 @events.test_stop.add_listener
 def on_test_stop(environment, **kwargs):
     """on test stop the locust master writes the output to custom_metrics_final and resets the metric_collector for next tests"""
@@ -213,8 +232,6 @@ def on_test_stop(environment, **kwargs):
     metric_collector.write_to_csv('custom_metrics_final.csv')
     metric_collector.__init__()
     metric_collector.write_to_csv()
-
-
 
 
 @events.init_command_line_parser.add_listener
@@ -243,9 +260,10 @@ def _(environment, **kwargs):
         global model_params
         global test_data
         global metric_collector
         global tokenizer
 
-        tokenizer = AutoTokenizer.from_pretrained(environment.parsed_options.tokenizer)
+        tokenizer = AutoTokenizer.from_pretrained(
+            environment.parsed_options.tokenizer)
 
         logging.info(
             "Loading test prompts from locust-tasks/filtered_prompts.txt.")
@@ -263,12 +281,12 @@ def _(environment, **kwargs):
"sax_model": environment.parsed_options.sax_model,
"use_beam_search": environment.parsed_options.use_beam_search,
"tokenizer": environment.parsed_options.tokenizer,
"enable_custom_metrics" : environment.parsed_options.enable_custom_metrics,
"csv_upload_frequency" : environment.parsed_options.csv_upload_frequency,
"enable_custom_metrics": environment.parsed_options.enable_custom_metrics,
"csv_upload_frequency": environment.parsed_options.csv_upload_frequency,
}
logging.info(
f"Using the following benchmark parameters:\n {model_params}")

elif environment.parsed_options.enable_custom_metrics == 'true':
# code to setup the locust master to write custom metrics
setup_periodic_metrics_writer(environment)
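A note on the metric flow shown above: each BenchmarkUser emits a "metric_update" message to the master, and collect_metrics() unpacks it into the metric_collector object. The registration of that listener sits outside the hunks in this commit; with Locust's custom-message API it is presumably wired up roughly as follows (a sketch, not this file's literal code; the hook name on_locust_init is illustrative):

    from locust import events
    from locust.runners import MasterRunner

    @events.init.add_listener
    def on_locust_init(environment, **_kwargs):
        # Only the master aggregates metrics; workers only emit them.
        if isinstance(environment.runner, MasterRunner):
            environment.runner.register_message("metric_update", collect_metrics)

With that wiring, a worker-side send_message("metric_update", [...]), as seen in handle_successful_response and handle_failed_response, lands in collect_metrics on the master, which periodically_write_metrics then flushes to CSV.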
