From 1e98620fe8d47c0e996c654a40047a36ae52e8c5 Mon Sep 17 00:00:00 2001 From: JP Date: Tue, 7 Nov 2023 09:56:15 +0000 Subject: [PATCH] Multiple Prompts - allows multiple prompts and output files in a single run. this saves the model loading time especially when testing multiple prompts for hf and vllm runners - we ensure that the number of prompt and output files match early in main.py since it applies to all runners --- README.md | 49 +++++++-- eval/anthropic_runner.py | 230 ++++++++++++++++++++------------------- eval/hf_runner.py | 191 ++++++++++++++++---------------- eval/openai_runner.py | 202 +++++++++++++++++----------------- eval/vllm_runner.py | 155 ++++++++++++++------------ main.py | 11 +- 6 files changed, 455 insertions(+), 383 deletions(-) diff --git a/README.md b/README.md index acc62e1..3d3da9a 100644 --- a/README.md +++ b/README.md @@ -101,13 +101,12 @@ Having implemented the query generator, the next piece of abstraction would be t ## Running the Test -### OpenAI -Remember to have your OpenAI API key (`OPENAI_API_KEY="sk-..."`) set as an environment variable before running the test if you plan to call the OpenAI API (or Anthropic/other LLM API's accordingly). +### OpenAI / Anthropic +Remember to have your API key (`OPENAI_API_KEY` or `ANTHROPIC_API_KEY`) set as an environment variable before running the test if you plan to call the OpenAI or Anthropic/other LLM API's accordingly. -To test it out with just 10 questions (instead of all 175), parallelized across 5 : +To test it out with just 10 questions (instead of all 175) using the gpt-3.5-turbo model, parallelized across 5 workers: ```bash -mkdir results # create directory for storing results python main.py \ -q data/questions_gen.csv \ -o results/my_query_generator.csv \ @@ -118,21 +117,53 @@ python main.py \ -p 5 ``` +To test out the full suite of questions for claude-2: +```bash +python main.py \ + -q data/questions_gen.csv \ + -o results/claude-2.csv \ + -g anthropic \ + -f prompts/prompt_anthropic.md \ + -m claude-2 +``` + ### Hugging Face To test it out with our fine-tuned sql model with just 10 questions (instead of all 175): ```bash -mkdir results #create directory for storing results - # use the -W option to ignore warnings about sequential use of transformers pipeline python -W ignore main.py \ -q data/questions_gen.csv \ -o results/results.csv \ -g hf \ -f prompts/prompt.md \ - -m defog/starcoder-finetune-v3 \ + -m defog/sqlcoder2 \ -n 10 ``` +We also support loading a peft adapter here as well via the `-a` flag. Note that the loading of the adapter with the model will take slightly longer than usual. + +### VLLM + +We also have a [vllm](vllm.ai) runner which uses the VLLM engine to run the inference altogether as a single batch. It is much faster to do so especially when `num_beams` > 1. You would have to pass in a single set of merged model weights, and the model architecture needs to be supported by vllm. Here's a sample command: +```bash +python -W ignore main.py \ + -q data/questions_gen.csv \ + -o "results/results.csv" \ + -g vllm \ + -f "prompts/prompt.md" \ + -m defog/sqlcoder2 +``` + +If you'd like to test out a few prompts in a single run (to save the few minutes spent loading the model into GPU at the start of each run), you can specify a list of prompt files in `--prompt_file` (e.g. `-f prompts/prompt-1.md prompts/prompt-2.md prompts/prompt-3.md`), as well as a corresponding list of output files in `--output_file` (e.g. `-o results/results-1.csv results/results-2.csv results/results-3.csv`). The number of prompts and output files must be the same. Here's a sample command: +```bash +python -W ignore main.py \ + -q data/questions_gen.csv \ + -o results/results_1.csv results/results_2.csv \ + -g vllm \ + -f prompts/prompt_1.md prompts/prompt_2.md \ + -m defog/sqlcoder2 +``` +While you can do the same for the other runners, the time savings are most significant when loading a large model locally, vs calling an always-on API. ### CLI Flags @@ -143,9 +174,9 @@ You can use the following flags in the command line to change the configurations | -n, --num_questions | Use this to limit the total number of questions you want to test. | | -g, --model_type | Model type used. Make sure this matches the model used. Currently defined options in `main.py` are `oa` for OpenAI models and `hf` for Hugging Face models. | | -m, --model | Model that will be tested and used to generate the queries. Currently defined options for OpenAI models are chat models `gpt-3.5-turbo-0613` and `gpt-4-0613`, and non-chat model `text-davinci-003`. For Hugging Face models, simply use the path of your chosen model (e.g. `defog/sqlcoder`). | -| -f, --prompt_file | Markdown file with the prompt used for query generation. | +| -f, --prompt_file | Markdown file with the prompt used for query generation. You can pass in a list of prompts to test sequentially without reloading the script. | | -d, --use_private_data | Use this to read from your own private data library. | -| -o, --output_file | Output CSV file that will store your results. | +| -o, --output_file | Output CSV file that will store your results. You need to pass the same number of output file paths as the number of prompt files | | -p, --parallel_threads | The default no. of parallel threads is 5. Decrease this to 1 for gpt-4 to avoid the rate limit error. Parallelization support is currently only defined for OpenAI models. | | -t, --timeout_gen | No. of seconds before timeout occurs for query generation. The default is 30.0s. | | -u, --timeout_exec | No. of seconds before timeout occurs for query execution on the database. The default is 10.0s. | diff --git a/eval/anthropic_runner.py b/eval/anthropic_runner.py index 1dfc2ce..8e299a2 100644 --- a/eval/anthropic_runner.py +++ b/eval/anthropic_runner.py @@ -1,5 +1,6 @@ from concurrent.futures import ThreadPoolExecutor, as_completed import copy +import os from eval.eval import compare_query_results import pandas as pd from psycopg2.extensions import QueryCanceledError @@ -9,85 +10,38 @@ def run_anthropic_eval(args): - print("preparing questions...") - # get questions - question_query_df = prepare_questions_df(args.questions_file, args.num_questions) - qg_class = AnthropicQueryGenerator - # add columns for generated query and metrics - question_query_df["generated_query"] = "" - question_query_df["reason"] = "" - question_query_df["error_msg"] = "" - question_query_df["exact_match"] = 0 - question_query_df["correct"] = 0 - question_query_df["error_query_gen"] = 0 - question_query_df["error_db_exec"] = 0 - question_query_df["timeout"] = 0 - # add custom metrics below: - question_query_df["latency_seconds"] = 0.0 # latency of query generation in seconds - question_query_df["tokens_used"] = 0 # number of tokens used in query generation + for prompt_file, output_file in zip(args.prompt_file, args.output_file): + print("preparing questions...") + # get questions + question_query_df = prepare_questions_df( + args.questions_file, args.num_questions + ) + qg_class = AnthropicQueryGenerator + # add columns for generated query and metrics + question_query_df["generated_query"] = "" + question_query_df["reason"] = "" + question_query_df["error_msg"] = "" + question_query_df["exact_match"] = 0 + question_query_df["correct"] = 0 + question_query_df["error_query_gen"] = 0 + question_query_df["error_db_exec"] = 0 + question_query_df["timeout"] = 0 + # add custom metrics below: + # latency of query generation in seconds + question_query_df["latency_seconds"] = 0.0 + # number of tokens used in query generation + question_query_df["tokens_used"] = 0 - question_query_df.reset_index(inplace=True, drop=True) + question_query_df.reset_index(inplace=True, drop=True) - input_rows = question_query_df.to_dict("records") - output_rows = [] - with ThreadPoolExecutor(args.parallel_threads) as executor: - # for each query in the csv, generate a query using the generator asynchronously - futures = [] - for row in input_rows: - # get db creds for each row's db_name - db_name = row["db_name"] - db_creds = { - "host": "localhost", - "port": 5432, - "user": "postgres", - "password": "postgres", - "database": db_name, - } - - qg = qg_class( - db_creds=copy.deepcopy(db_creds), - model=args.model, - prompt_file=args.prompt_file, - timeout=args.timeout_gen, - verbose=args.verbose, - ) - - generated_query_fut = executor.submit( - qg.generate_query, question=row["question"] - ) - futures.append(generated_query_fut) - - total_tried = 0 - total_correct = 0 - for f in (pbar := tqdm(as_completed(futures), total=len(futures))): - total_tried += 1 - i = futures.index(f) - row = input_rows[i] - result_dict = f.result() - query_gen = result_dict["query"] - reason = result_dict["reason"] - err = result_dict["err"] - # save custom metrics - if "latency_seconds" in result_dict: - row["latency_seconds"] = result_dict["latency_seconds"] - if "tokens_used" in result_dict: - row["tokens_used"] = result_dict["tokens_used"] - row["generated_query"] = query_gen - row["reason"] = reason - row["error_msg"] = err - # save failures into relevant columns in the dataframe - if "GENERATION ERROR" in err: - row["error_query_gen"] = 1 - elif "EXECUTION ERROR" in err: - row["error_db_exec"] = 1 - elif "TIMEOUT" in err: - row["timeout"] = 1 - else: - expected_query = row["query"] + input_rows = question_query_df.to_dict("records") + output_rows = [] + with ThreadPoolExecutor(args.parallel_threads) as executor: + # for each query in the csv, generate a query using the generator asynchronously + futures = [] + for row in input_rows: + # get db creds for each row's db_name db_name = row["db_name"] - question = row["question"] - query_category = row["query_category"] - exact_match = correct = 0 db_creds = { "host": "localhost", "port": 5432, @@ -95,39 +49,95 @@ def run_anthropic_eval(args): "password": "postgres", "database": db_name, } - # try executing the queries and compare the results if they succeed - try: - exact_match, correct = compare_query_results( - query_gold=expected_query, - query_gen=query_gen, - db_name=db_name, - db_creds=db_creds, - timeout=args.timeout_exec, - question=question, - query_category=query_category, - ) - row["exact_match"] = int(exact_match) - row["correct"] = int(correct) - row["error_msg"] = "" - if correct: - total_correct += 1 - except QueryCanceledError as e: - row["timeout"] = 1 - row["error_msg"] = f"QUERY EXECUTION TIMEOUT: {e}" - except Exception as e: + + qg = qg_class( + db_creds=copy.deepcopy(db_creds), + model=args.model, + prompt_file=prompt_file, + timeout=args.timeout_gen, + verbose=args.verbose, + ) + + generated_query_fut = executor.submit( + qg.generate_query, question=row["question"] + ) + futures.append(generated_query_fut) + + total_tried = 0 + total_correct = 0 + for f in (pbar := tqdm(as_completed(futures), total=len(futures))): + total_tried += 1 + i = futures.index(f) + row = input_rows[i] + result_dict = f.result() + query_gen = result_dict["query"] + reason = result_dict["reason"] + err = result_dict["err"] + # save custom metrics + if "latency_seconds" in result_dict: + row["latency_seconds"] = result_dict["latency_seconds"] + if "tokens_used" in result_dict: + row["tokens_used"] = result_dict["tokens_used"] + row["generated_query"] = query_gen + row["reason"] = reason + row["error_msg"] = err + # save failures into relevant columns in the dataframe + if "GENERATION ERROR" in err: + row["error_query_gen"] = 1 + elif "EXECUTION ERROR" in err: row["error_db_exec"] = 1 - row["error_msg"] = f"QUERY EXECUTION ERROR: {e}" - output_rows.append(row) - pbar.set_description( - f"Correct so far: {total_correct}/{total_tried} ({100*total_correct/total_tried:.2f}%)" - ) - output_df = pd.DataFrame(output_rows) - output_df = output_df.sort_values(by=["db_name", "query_category", "question"]) - output_df.to_csv(args.output_file, index=False, float_format="%.2f") + elif "TIMEOUT" in err: + row["timeout"] = 1 + else: + expected_query = row["query"] + db_name = row["db_name"] + question = row["question"] + query_category = row["query_category"] + exact_match = correct = 0 + db_creds = { + "host": "localhost", + "port": 5432, + "user": "postgres", + "password": "postgres", + "database": db_name, + } + # try executing the queries and compare the results if they succeed + try: + exact_match, correct = compare_query_results( + query_gold=expected_query, + query_gen=query_gen, + db_name=db_name, + db_creds=db_creds, + timeout=args.timeout_exec, + question=question, + query_category=query_category, + ) + row["exact_match"] = int(exact_match) + row["correct"] = int(correct) + row["error_msg"] = "" + if correct: + total_correct += 1 + except QueryCanceledError as e: + row["timeout"] = 1 + row["error_msg"] = f"QUERY EXECUTION TIMEOUT: {e}" + except Exception as e: + row["error_db_exec"] = 1 + row["error_msg"] = f"QUERY EXECUTION ERROR: {e}" + output_rows.append(row) + pbar.set_description( + f"Correct so far: {total_correct}/{total_tried} ({100*total_correct/total_tried:.2f}%)" + ) + output_df = pd.DataFrame(output_rows) + output_df = output_df.sort_values(by=["db_name", "query_category", "question"]) + # get directory of output_file and create if not exist + output_dir = os.path.dirname(output_file) + if not os.path.exists(output_dir): + os.makedirs(output_dir) + output_df.to_csv(output_file, index=False, float_format="%.2f") - # get average rate of exact matches - avg_acc = output_df["exact_match"].sum() / len(output_df) - print(f"Average rate of exact match: {avg_acc:.2f}") - # get average rate of correct results - avg_subset = output_df["correct"].sum() / len(output_df) - print(f"Average correct rate: {avg_subset:.2f}") + # get average rate of exact matches + avg_acc = output_df["exact_match"].sum() / len(output_df) + print(f"Average rate of exact match: {avg_acc:.2f}") + # get average rate of correct results + avg_subset = output_df["correct"].sum() / len(output_df) + print(f"Average correct rate: {avg_subset:.2f}") diff --git a/eval/hf_runner.py b/eval/hf_runner.py index 2bbf552..cec9fde 100644 --- a/eval/hf_runner.py +++ b/eval/hf_runner.py @@ -1,3 +1,4 @@ +import os from typing import Optional from eval.eval import compare_query_results import pandas as pd @@ -87,31 +88,18 @@ def get_tokenizer_model(model_name: Optional[str], adapter_path: Optional[str]): def run_hf_eval(args): # get params from args questions_file = args.questions_file - prompt_file = args.prompt_file + prompt_file_list = args.prompt_file num_questions = args.num_questions public_data = not args.use_private_data model_name = args.model adapter_path = args.adapter - output_file = args.output_file + output_file_list = args.output_file if model_name is None and adapter_path is None: raise ValueError( "You must supply either a model name or an adapter path to run an evaluation." ) - print("preparing questions...") - # get questions - print(f"Using {num_questions} questions from {questions_file}") - df = prepare_questions_df(questions_file, num_questions) - - # create a prompt for each question - df["prompt"] = df[["question", "db_name"]].apply( - lambda row: generate_prompt( - prompt_file, row["question"], row["db_name"], public_data - ), - axis=1, - ) - print("questions prepared\nnow loading model...") # initialize tokenizer and model tokenizer, model = get_tokenizer_model(model_name, adapter_path) @@ -123,82 +111,99 @@ def run_hf_eval(args): # eos_token_id = tokenizer.convert_tokens_to_ids(["```"])[0] pipe = pipeline("text-generation", model=model, tokenizer=tokenizer) - total_tried = 0 - total_correct = 0 - output_rows = [] - - with tqdm(total=len(df)) as pbar: - for row in df.to_dict("records"): - total_tried += 1 - start_time = time() - num_beams = dynamic_num_beams(row["prompt"], tokenizer) - # we set return_full_text to False so that we don't get the prompt text in the generated text - # this simplifies our postprocessing to deal with just the truncation of the end of the query - generated_query = ( - pipe( - row["prompt"], - max_new_tokens=300, - do_sample=False, - num_beams=num_beams, - num_return_sequences=1, - return_full_text=False, - eos_token_id=tokenizer.eos_token_id, - pad_token_id=tokenizer.eos_token_id, - )[0]["generated_text"] - .split("```")[0] - .split(";")[0] - .strip() - + ";" - ) - gc.collect() - torch.cuda.empty_cache() - torch.cuda.synchronize() - end_time = time() - - row["generated_query"] = generated_query - row["latency_seconds"] = end_time - start_time - golden_query = row["query"] - db_name = row["db_name"] - question = row["question"] - query_category = row["query_category"] - exact_match = correct = 0 - db_creds = { - "host": "localhost", - "port": 5432, - "user": "postgres", - "password": "postgres", - "database": db_name, - } - - try: - exact_match, correct = compare_query_results( - query_gold=golden_query, - query_gen=generated_query, - db_name=db_name, - db_creds=db_creds, - question=question, - query_category=query_category, + for prompt_file, output_file in zip(prompt_file_list, output_file_list): + print("preparing questions...") + # get questions + print(f"Using {num_questions} questions from {questions_file}") + df = prepare_questions_df(questions_file, num_questions) + # create a prompt for each question + df["prompt"] = df[["question", "db_name"]].apply( + lambda row: generate_prompt( + prompt_file, row["question"], row["db_name"], public_data + ), + axis=1, + ) + + total_tried = 0 + total_correct = 0 + output_rows = [] + + with tqdm(total=len(df)) as pbar: + for row in df.to_dict("records"): + total_tried += 1 + start_time = time() + num_beams = dynamic_num_beams(row["prompt"], tokenizer) + # we set return_full_text to False so that we don't get the prompt text in the generated text + # this simplifies our postprocessing to deal with just the truncation of the end of the query + generated_query = ( + pipe( + row["prompt"], + max_new_tokens=300, + do_sample=False, + num_beams=num_beams, + num_return_sequences=1, + return_full_text=False, + eos_token_id=tokenizer.eos_token_id, + pad_token_id=tokenizer.eos_token_id, + )[0]["generated_text"] + .split("```")[0] + .split(";")[0] + .strip() + + ";" + ) + gc.collect() + torch.cuda.empty_cache() + torch.cuda.synchronize() + end_time = time() + + row["generated_query"] = generated_query + row["latency_seconds"] = end_time - start_time + golden_query = row["query"] + db_name = row["db_name"] + question = row["question"] + query_category = row["query_category"] + exact_match = correct = 0 + db_creds = { + "host": "localhost", + "port": 5432, + "user": "postgres", + "password": "postgres", + "database": db_name, + } + + try: + exact_match, correct = compare_query_results( + query_gold=golden_query, + query_gen=generated_query, + db_name=db_name, + db_creds=db_creds, + question=question, + query_category=query_category, + ) + row["exact_match"] = int(exact_match) + row["correct"] = int(correct) + row["error_msg"] = "" + if correct: + total_correct += 1 + except QueryCanceledError as e: + row["timeout"] = 1 + row["error_msg"] = f"QUERY EXECUTION TIMEOUT: {e}" + except Exception as e: + row["error_db_exec"] = 1 + row["error_msg"] = f"QUERY EXECUTION ERROR: {e}" + + output_rows.append(row) + pbar.update(1) + pbar.set_description( + f"Correct so far: {total_correct}/{total_tried} ({100*total_correct/total_tried:.2f}%)" ) - row["exact_match"] = int(exact_match) - row["correct"] = int(correct) - row["error_msg"] = "" - if correct: - total_correct += 1 - except QueryCanceledError as e: - row["timeout"] = 1 - row["error_msg"] = f"QUERY EXECUTION TIMEOUT: {e}" - except Exception as e: - row["error_db_exec"] = 1 - row["error_msg"] = f"QUERY EXECUTION ERROR: {e}" - - output_rows.append(row) - pbar.update(1) - pbar.set_description( - f"Correct so far: {total_correct}/{total_tried} ({100*total_correct/total_tried:.2f}%)" - ) - - output_df = pd.DataFrame(output_rows) - del output_df["prompt"] - print(output_df.groupby("query_category")[["exact_match", "correct"]].mean()) - output_df = output_df.sort_values(by=["db_name", "query_category", "question"]) - output_df.to_csv(output_file, index=False, float_format="%.2f") + + output_df = pd.DataFrame(output_rows) + del output_df["prompt"] + print(output_df.groupby("query_category")[["exact_match", "correct"]].mean()) + output_df = output_df.sort_values(by=["db_name", "query_category", "question"]) + # get directory of output_file and create if not exist + output_dir = os.path.dirname(output_file) + if not os.path.exists(output_dir): + os.makedirs(output_dir) + output_df.to_csv(output_file, index=False, float_format="%.2f") diff --git a/eval/openai_runner.py b/eval/openai_runner.py index 47aac1b..2066bce 100644 --- a/eval/openai_runner.py +++ b/eval/openai_runner.py @@ -1,5 +1,6 @@ from concurrent.futures import ThreadPoolExecutor, as_completed import copy +import os from eval.eval import compare_query_results import pandas as pd from psycopg2.extensions import QueryCanceledError @@ -9,71 +10,21 @@ def run_openai_eval(args): - print("preparing questions...") - # get questions - question_query_df = prepare_questions_df(args.questions_file, args.num_questions) + for prompt_file, output_file in zip(args.prompt_file, args.output_file): + print("preparing questions...") + # get questions + question_query_df = prepare_questions_df( + args.questions_file, args.num_questions + ) - input_rows = question_query_df.to_dict("records") - output_rows = [] - with ThreadPoolExecutor(args.parallel_threads) as executor: - # for each query in the csv, generate a query using the generator asynchronously - futures = [] - for row in input_rows: - # get db creds for each row's db_name - db_name = row["db_name"] - db_creds = { - "host": "localhost", - "port": 5432, - "user": "postgres", - "password": "postgres", - "database": db_name, - } - - qg = OpenAIQueryGenerator( - db_creds=copy.deepcopy(db_creds), - model=args.model, - prompt_file=args.prompt_file, - timeout=args.timeout_gen, - use_public_data=not args.use_private_data, - verbose=args.verbose, - ) - - generated_query_fut = executor.submit( - qg.generate_query, question=row["question"] - ) - futures.append(generated_query_fut) - - total_tried = 0 - total_correct = 0 - for f in (pbar := tqdm(as_completed(futures), total=len(futures))): - total_tried += 1 - i = futures.index(f) - row = input_rows[i] - result_dict = f.result() - query_gen = result_dict["query"] - reason = result_dict["reason"] - err = result_dict["err"] - # save custom metrics - if "latency_seconds" in result_dict: - row["latency_seconds"] = result_dict["latency_seconds"] - if "tokens_used" in result_dict: - row["tokens_used"] = result_dict["tokens_used"] - row["generated_query"] = query_gen - row["reason"] = reason - row["error_msg"] = err - # save failures into relevant columns in the dataframe - if "GENERATION ERROR" in err: - row["error_query_gen"] = 1 - elif "EXECUTION ERROR" in err: - row["error_db_exec"] = 1 - elif "TIMEOUT" in err: - row["timeout"] = 1 - else: - expected_query = row["query"] + input_rows = question_query_df.to_dict("records") + output_rows = [] + with ThreadPoolExecutor(args.parallel_threads) as executor: + # for each query in the csv, generate a query using the generator asynchronously + futures = [] + for row in input_rows: + # get db creds for each row's db_name db_name = row["db_name"] - question = row["question"] - query_category = row["query_category"] - exact_match = correct = 0 db_creds = { "host": "localhost", "port": 5432, @@ -81,39 +32,96 @@ def run_openai_eval(args): "password": "postgres", "database": db_name, } - # try executing the queries and compare the results if they succeed - try: - exact_match, correct = compare_query_results( - query_gold=expected_query, - query_gen=query_gen, - db_name=db_name, - db_creds=db_creds, - timeout=args.timeout_exec, - question=question, - query_category=query_category, - ) - row["exact_match"] = int(exact_match) - row["correct"] = int(correct) - row["error_msg"] = "" - if correct: - total_correct += 1 - except QueryCanceledError as e: - row["timeout"] = 1 - row["error_msg"] = f"QUERY EXECUTION TIMEOUT: {e}" - except Exception as e: + + qg = OpenAIQueryGenerator( + db_creds=copy.deepcopy(db_creds), + model=args.model, + prompt_file=prompt_file, + timeout=args.timeout_gen, + use_public_data=not args.use_private_data, + verbose=args.verbose, + ) + + generated_query_fut = executor.submit( + qg.generate_query, question=row["question"] + ) + futures.append(generated_query_fut) + + total_tried = 0 + total_correct = 0 + for f in (pbar := tqdm(as_completed(futures), total=len(futures))): + total_tried += 1 + i = futures.index(f) + row = input_rows[i] + result_dict = f.result() + query_gen = result_dict["query"] + reason = result_dict["reason"] + err = result_dict["err"] + # save custom metrics + if "latency_seconds" in result_dict: + row["latency_seconds"] = result_dict["latency_seconds"] + if "tokens_used" in result_dict: + row["tokens_used"] = result_dict["tokens_used"] + row["generated_query"] = query_gen + row["reason"] = reason + row["error_msg"] = err + # save failures into relevant columns in the dataframe + if "GENERATION ERROR" in err: + row["error_query_gen"] = 1 + elif "EXECUTION ERROR" in err: row["error_db_exec"] = 1 - row["error_msg"] = f"QUERY EXECUTION ERROR: {e}" - output_rows.append(row) - pbar.set_description( - f"Correct so far: {total_correct}/{total_tried} ({100*total_correct/total_tried:.2f}%)" - ) - output_df = pd.DataFrame(output_rows) - output_df = output_df.sort_values(by=["db_name", "query_category", "question"]) - if "prompt" in output_df.columns: - del output_df["prompt"] - print(output_df.groupby("query_category")[["exact_match", "correct"]].mean()) - output_df.to_csv(args.output_file, index=False, float_format="%.2f") + elif "TIMEOUT" in err: + row["timeout"] = 1 + else: + expected_query = row["query"] + db_name = row["db_name"] + question = row["question"] + query_category = row["query_category"] + exact_match = correct = 0 + db_creds = { + "host": "localhost", + "port": 5432, + "user": "postgres", + "password": "postgres", + "database": db_name, + } + # try executing the queries and compare the results if they succeed + try: + exact_match, correct = compare_query_results( + query_gold=expected_query, + query_gen=query_gen, + db_name=db_name, + db_creds=db_creds, + timeout=args.timeout_exec, + question=question, + query_category=query_category, + ) + row["exact_match"] = int(exact_match) + row["correct"] = int(correct) + row["error_msg"] = "" + if correct: + total_correct += 1 + except QueryCanceledError as e: + row["timeout"] = 1 + row["error_msg"] = f"QUERY EXECUTION TIMEOUT: {e}" + except Exception as e: + row["error_db_exec"] = 1 + row["error_msg"] = f"QUERY EXECUTION ERROR: {e}" + output_rows.append(row) + pbar.set_description( + f"Correct so far: {total_correct}/{total_tried} ({100*total_correct/total_tried:.2f}%)" + ) + output_df = pd.DataFrame(output_rows) + output_df = output_df.sort_values(by=["db_name", "query_category", "question"]) + if "prompt" in output_df.columns: + del output_df["prompt"] + print(output_df.groupby("query_category")[["exact_match", "correct"]].mean()) + # get directory of output_file and create if not exist + output_dir = os.path.dirname(output_file) + if not os.path.exists(output_dir): + os.makedirs(output_dir) + output_df.to_csv(output_file, index=False, float_format="%.2f") - # get average rate of correct results - avg_subset = output_df["correct"].sum() / len(output_df) - print(f"Average correct rate: {avg_subset:.2f}") + # get average rate of correct results + avg_subset = output_df["correct"].sum() / len(output_df) + print(f"Average correct rate: {avg_subset:.2f}") diff --git a/eval/vllm_runner.py b/eval/vllm_runner.py index c4b8dcf..65e4be9 100644 --- a/eval/vllm_runner.py +++ b/eval/vllm_runner.py @@ -1,3 +1,4 @@ +import os import sqlparse from vllm import LLM, SamplingParams from eval.eval import compare_query_results @@ -24,27 +25,14 @@ def generate_prompt(prompt_file, question, db_name, public_data): def run_vllm_eval(args): # get params from args questions_file = args.questions_file - prompt_file = args.prompt_file + prompt_file_list = args.prompt_file num_questions = args.num_questions public_data = not args.use_private_data model_name = args.model - output_file = args.output_file + output_file_list = args.output_file num_beams = args.num_beams - # get questions - print( - f"Preparing {'all' if num_questions is None else num_questions} questions from {questions_file}" - ) - df = prepare_questions_df(questions_file, num_questions) - - # create a prompt for each question - df["prompt"] = df[["question", "db_name"]].apply( - lambda row: generate_prompt( - prompt_file, row["question"], row["db_name"], public_data - ), - axis=1, - ) - + # initialize model only once as it takes a while print(f"Preparing {model_name}") tokenizer = AutoTokenizer.from_pretrained(model_name) llm = LLM(model=model_name, tensor_parallel_size=torch.cuda.device_count()) @@ -58,61 +46,84 @@ def run_vllm_eval(args): temperature=0, ) - print(f"Generating {len(df)} completions") - start_time = time.time() - outputs = llm.generate(df["prompt"].tolist(), sampling_params) - time_taken = time.time() - start_time - print(f"Time taken: {time_taken:.1f}s") + for prompt_file, output_file in zip(prompt_file_list, output_file_list): + print(f"Using prompt file {prompt_file}") + # get questions and create a prompt for each question + df = prepare_questions_df(questions_file, num_questions) + df["prompt"] = df[["question", "db_name"]].apply( + lambda row: generate_prompt( + prompt_file, row["question"], row["db_name"], public_data + ), + axis=1, + ) + print(f"Prepared {len(df)} questions from {questions_file}") + print(f"Generating completions") + start_time = time.time() + # we pass the full list of prompts at once to the vllm engine + outputs = llm.generate(df["prompt"].tolist(), sampling_params) + time_taken = time.time() - start_time + print(f"Time taken: {time_taken:.1f}s") + + # save generation metrics + df["latency_seconds"] = time_taken / len(df) - df["generated_query"] = "" - df["correct"] = 0 - df["exact_match"] = 0 - df["error_db_exec"] = 0 - df["error_msg"] = "" - total_correct = 0 - with tqdm(total=len(df)) as pbar: - for i, output in enumerate(outputs): - generated_query = output.outputs[0].text.split(";")[0].strip() - normalized_query = sqlparse.format( - generated_query, keyword_case="upper", strip_whitespace=True - ) - df.loc[i, "generated_query"] = normalized_query - df.loc[i, "latency_seconds"] = time_taken / len(df) - row = df.iloc[i] - golden_query = row["query"] - db_name = row["db_name"] - question = row["question"] - query_category = row["query_category"] - exact_match = correct = 0 - db_creds = { - "host": "localhost", - "port": 5432, - "user": "postgres", - "password": "postgres", - "database": db_name, - } - try: - exact_match, correct = compare_query_results( - query_gold=golden_query, - query_gen=generated_query, - db_name=db_name, - db_creds=db_creds, - question=question, - query_category=query_category, + df["generated_query"] = "" + df["tokens_used"] = 0 + df["correct"] = 0 + df["exact_match"] = 0 + df["error_db_exec"] = 0 + df["error_msg"] = "" + total_correct = 0 + with tqdm(total=len(df)) as pbar: + for i, output in enumerate(outputs): + generated_query = output.outputs[0].text.split(";")[0].strip() + normalized_query = sqlparse.format( + generated_query, keyword_case="upper", strip_whitespace=True + ) + df.loc[i, "generated_query"] = normalized_query + df.loc[i, "tokens_used"] = len(output.outputs[0].token_ids) + df.loc[i, "latency_seconds"] = time_taken / len(df) + row = df.iloc[i] + golden_query = row["query"] + db_name = row["db_name"] + question = row["question"] + query_category = row["query_category"] + exact_match = correct = 0 + db_creds = { + "host": "localhost", + "port": 5432, + "user": "postgres", + "password": "postgres", + "database": db_name, + } + try: + exact_match, correct = compare_query_results( + query_gold=golden_query, + query_gen=generated_query, + db_name=db_name, + db_creds=db_creds, + question=question, + query_category=query_category, + ) + df.loc[i, "exact_match"] = int(exact_match) + df.loc[i, "correct"] = int(correct) + df.loc[i, "error_msg"] = "" + if correct: + total_correct += 1 + except Exception as e: + df.loc[i, "error_db_exec"] = 1 + df.loc[i, "error_msg"] = f"QUERY EXECUTION ERROR: {e}" + pbar.update(1) + pbar.set_description( + f"Correct so far: {total_correct}/{(i+1)} ({100*total_correct/(i+1):.2f}%)" ) - df.loc[i, "exact_match"] = int(exact_match) - df.loc[i, "correct"] = int(correct) - df.loc[i, "error_msg"] = "" - if correct: - total_correct += 1 - except Exception as e: - df.loc[i, "error_db_exec"] = 1 - df.loc[i, "error_msg"] = f"QUERY EXECUTION ERROR: {e}" - pbar.update(1) - pbar.set_description( - f"Correct so far: {total_correct}/{(i+1)} ({100*total_correct/(i+1):.2f}%)" - ) - del df["prompt"] - print(df.groupby("query_category")[["exact_match", "correct"]].mean()) - df = df.sort_values(by=["db_name", "query_category", "question"]) - df.to_csv(output_file, index=False, float_format="%.2f") + del df["prompt"] + print(df.groupby("query_category")[["exact_match", "correct"]].mean()) + df = df.sort_values(by=["db_name", "query_category", "question"]) + print(f"Average tokens generated: {df['tokens_used'].mean():.1f}") + # get directory of output_file and create if not exist + output_dir = os.path.dirname(output_file) + if not os.path.exists(output_dir): + os.makedirs(output_dir) + df.to_csv(output_file, index=False, float_format="%.2f") + print(f"Saved results to {output_file}") diff --git a/main.py b/main.py index 568ed0d..20c627c 100644 --- a/main.py +++ b/main.py @@ -8,9 +8,10 @@ parser.add_argument("-m", "--model", type=str) parser.add_argument("-a", "--adapter", type=str) parser.add_argument("-b", "--num_beams", type=int, default=4) - parser.add_argument("-f", "--prompt_file", type=str, required=True) + # take in a list of prompt files + parser.add_argument("-f", "--prompt_file", nargs="+", type=str, required=True) parser.add_argument("-d", "--use_private_data", action="store_true") - parser.add_argument("-o", "--output_file", type=str, required=True) + parser.add_argument("-o", "--output_file", nargs="+", type=str, required=True) parser.add_argument("-p", "--parallel_threads", type=int, default=5) parser.add_argument("-t", "--timeout_gen", type=float, default=30.0) parser.add_argument("-u", "--timeout_exec", type=float, default=10.0) @@ -18,6 +19,12 @@ args = parser.parse_args() + # check that the list of prompt files has the same length as the list of output files + if len(args.prompt_file) != len(args.output_file): + raise ValueError( + f"Number of prompt files ({len(args.prompt_file)}) must be the same as the number of output files ({len(args.output_file)})" + ) + if args.model_type == "oa": from eval.openai_runner import run_openai_eval