From 234e3b0b09fdfd2902daa2290103189df42f724c Mon Sep 17 00:00:00 2001
From: CodingWithTim
Date: Fri, 8 Nov 2024 06:34:34 +0000
Subject: [PATCH 01/10] optimize clean_chat_data script

---
 fastchat/serve/monitor/clean_chat_data.py | 199 +++++++++++++---------
 1 file changed, 123 insertions(+), 76 deletions(-)

diff --git a/fastchat/serve/monitor/clean_chat_data.py b/fastchat/serve/monitor/clean_chat_data.py
index 2bda0e2c3..26899d829 100644
--- a/fastchat/serve/monitor/clean_chat_data.py
+++ b/fastchat/serve/monitor/clean_chat_data.py
@@ -5,11 +5,13 @@
 python3 clean_chat_data.py
 """
 import argparse
-import datetime
 import json
 import os
 from pytz import timezone
+from functools import partial
+from datetime import datetime, timedelta
 import time
+import multiprocessing as mp
 
 from tqdm import tqdm
 
@@ -24,14 +26,24 @@
 NETWORK_ERROR_MSG = (
     "NETWORK ERROR DUE TO HIGH TRAFFIC. PLEASE REGENERATE OR REFRESH THIS PAGE.".lower()
 )
+MANAGER = mp.Manager()
+LOCK = MANAGER.Lock()
 
 
-def get_log_files(max_num_files=None):
-    dates = []
-    for month in range(4, 12):
-        for day in range(1, 33):
-            dates.append(f"2023-{month:02d}-{day:02d}")
+def date_range(start="2023-04-01"):
+    start_date = datetime.strptime(start, "%Y-%m-%d").date()
+    end_date = datetime.now().date()
+    delta = end_date - start_date
+    dates = [
+        (start_date + timedelta(days=d)).strftime("%Y-%m-%d")
+        for d in range(delta.days + 2)
+    ]
+
+    return dates
+
 
+def get_log_files(max_num_files=None):
+    dates = date_range()
     filenames = []
     for d in dates:
         for i in range(NUM_SERVERS):
@@ -44,90 +56,125 @@ def get_log_files(max_num_files=None):
     return filenames
 
 
-def clean_chat_data(log_files, action_type):
-    raw_data = []
-    for filename in tqdm(log_files, desc="read files"):
-        for retry in range(5):
-            try:
-                lines = open(filename).readlines()
-                break
-            except FileNotFoundError:
-                time.sleep(2)
-
-        for l in lines:
-            row = json.loads(l)
-            if row["type"] == action_type:
-                raw_data.append(row)
+def get_action_type_data(filename, action_type):
+    for _ in range(5):
+        try:
+            lines = open(filename).readlines()
+            break
+        except FileNotFoundError:
+            time.sleep(2)
 
-    all_models = set()
-    all_ips = dict()
-    chats = []
+    for l in lines:
+        row = json.loads(l)
+        if row["type"] == action_type:
+            return row
+
+
+def process_data(row, action_type, all_ips):
+    # Initialize local counters
     ct_invalid_conv_id = 0
     ct_invalid = 0
     ct_network_error = 0
-    for row in raw_data:
-        try:
-            if action_type in ["chat", "upvote", "downvote"]:
-                state = row["state"]
-                model = row["model"]
-            elif action_type == "leftvote":
-                state = row["states"][0]
-                model = row["states"][0]["model_name"]
-            elif action_type == "rightvote":
-                state = row["states"][1]
-                model = row["states"][1]["model_name"]
-            conversation_id = state["conv_id"]
-        except KeyError:
-            ct_invalid_conv_id += 1
-            continue
 
-        if conversation_id is None:
-            ct_invalid_conv_id += 1
-            continue
+    try:
+        if action_type in ["chat", "upvote", "downvote"]:
+            state = row["state"]
+            model = row["model"]
+        elif action_type == "leftvote":
+            state = row["states"][0]
+            model = row["states"][0]["model_name"]
+        elif action_type == "rightvote":
+            state = row["states"][1]
+            model = row["states"][1]["model_name"]
+        conversation_id = state["conv_id"]
+    except KeyError:
+        ct_invalid_conv_id += 1
+        return None, ct_invalid_conv_id, ct_invalid, ct_network_error, None
+
+    if conversation_id is None:
+        ct_invalid_conv_id += 1
+        return None, ct_invalid_conv_id, ct_invalid, ct_network_error, None
+
+    conversation = to_openai_format(state["messages"][state["offset"] :])
+    if not isinstance(model, str):
+        ct_invalid += 1
+        return None, ct_invalid_conv_id, ct_invalid, ct_network_error, None
+    model = replace_model_name(model, row["tstamp"])
+
+    try:
+        lang_code = detect_language(state["messages"][state["offset"]][1])
+    except IndexError:
+        ct_invalid += 1
+        return None, ct_invalid_conv_id, ct_invalid, ct_network_error, None
+
+    if not all(isinstance(x["content"], str) for x in conversation):
+        ct_invalid += 1
+        return None, ct_invalid_conv_id, ct_invalid, ct_network_error, None
+
+    messages = "".join([x["content"] for x in conversation]).lower()
+    if NETWORK_ERROR_MSG in messages:
+        ct_network_error += 1
+        return None, ct_invalid_conv_id, ct_invalid, ct_network_error, None
+
+    ip = row["ip"]
+    # Synchronize access to all_ips using the lock
+    with LOCK:
+        if ip not in all_ips:
+            all_ips[ip] = len(all_ips)
+        user_id = all_ips[ip]
 
-        conversation = to_openai_format(state["messages"][state["offset"] :])
-        if not isinstance(model, str):
-            ct_invalid += 1
-            continue
-        model = replace_model_name(model, row["tstamp"])
+    # Prepare the result data
+    result = dict(
+        conversation_id=conversation_id,
+        model=model,
+        conversation=conversation,
+        turn=len(conversation) // 2,
+        language=lang_code,
+        user_id=user_id,
+        tstamp=row["tstamp"],
+    )
 
-        try:
-            lang_code = detect_language(state["messages"][state["offset"]][1])
-        except IndexError:
-            ct_invalid += 1
-            continue
+    return result, ct_invalid_conv_id, ct_invalid, ct_network_error, model
 
-        if not all(isinstance(x["content"], str) for x in conversation):
-            ct_invalid += 1
-            continue
 
-        messages = "".join([x["content"] for x in conversation]).lower()
-        if NETWORK_ERROR_MSG in messages:
-            ct_network_error += 1
-            continue
+def clean_chat_data(log_files, action_type):
+    with mp.Pool() as pool:
+        # Use partial to pass action_type to get_action_type_data
+        func = partial(get_action_type_data, action_type=action_type)
+        raw_data = pool.map(func, log_files, chunksize=1)
 
-        ip = row["ip"]
-        if ip not in all_ips:
-            all_ips[ip] = len(all_ips)
-        user_id = all_ips[ip]
+    # filter out Nones as some files may not contain any data belonging to the action_type
+    raw_data = [r for r in raw_data if r is not None]
+    all_ips = MANAGER.dict()
+
+    # Use the multiprocessing Pool
+    with mp.Pool() as pool:
+        func = partial(process_data, action_type=action_type, all_ips=all_ips)
+        results = pool.map(func, raw_data, chunksize=1)
 
-        chats.append(
-            dict(
-                conversation_id=conversation_id,
-                model=model,
-                conversation=conversation,
-                turn=len(conversation) // 2,
-                language=lang_code,
-                user_id=user_id,
-                tstamp=row["tstamp"],
-            )
-        )
+    # Initialize counters and collections in the parent process
+    ct_invalid_conv_id = 0
+    ct_invalid = 0
+    ct_network_error = 0
+    all_models = set()
+    chats = []
 
-        all_models.update([model])
+    # Aggregate results from child processes
+    for res in results:
+        if res is None:
+            continue
+        data, inv_conv_id, inv, net_err, model = res
+        ct_invalid_conv_id += inv_conv_id
+        ct_invalid += inv
+        ct_network_error += net_err
+        if data:
+            chats.append(data)
+        if model:
+            all_models.add(model)
 
     chats.sort(key=lambda x: x["tstamp"])
     last_updated_tstamp = chats[-1]["tstamp"]
-    last_updated_datetime = datetime.datetime.fromtimestamp(
+    last_updated_datetime = datetime.fromtimestamp(
         last_updated_tstamp, tz=timezone("US/Pacific")
     ).strftime("%Y-%m-%d %H:%M:%S %Z")
 
@@ -161,7 +208,7 @@ def clean_chat_data(log_files, action_type):
     log_files = get_log_files(args.max_num_files)
     chats = clean_chat_data(log_files, args.action_type)
     last_updated_tstamp = chats[-1]["tstamp"]
-    cutoff_date = datetime.datetime.fromtimestamp(
+    cutoff_date = datetime.fromtimestamp(
        last_updated_tstamp, tz=timezone("US/Pacific")
     ).strftime("%Y%m%d")
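A note on the helper introduced in the patch above: date_range() replaces the hard-coded 2023 month/day loops with a span computed from a fixed start date through today. The standalone sketch below (not part of the series) shows the same logic; range(delta.days + 2) runs one day past today, presumably so log files stamped ahead of the host clock are still listed.

    from datetime import datetime, timedelta

    def date_range(start="2023-04-01"):
        # Enumerate every calendar day from `start` through tomorrow.
        start_date = datetime.strptime(start, "%Y-%m-%d").date()
        delta = datetime.now().date() - start_date
        return [
            (start_date + timedelta(days=d)).strftime("%Y-%m-%d")
            for d in range(delta.days + 2)
        ]

    print(date_range()[0], date_range()[-1])  # "2023-04-01" ... tomorrow's date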
From f21c56fdc44545b67eba9f402337b52046e10905 Mon Sep 17 00:00:00 2001
From: CodingWithTim
Date: Sun, 10 Nov 2024 00:23:24 +0000
Subject: [PATCH 02/10] improve

---
 fastchat/serve/monitor/clean_chat_data.py | 116 ++++++++++++----------
 1 file changed, 65 insertions(+), 51 deletions(-)

diff --git a/fastchat/serve/monitor/clean_chat_data.py b/fastchat/serve/monitor/clean_chat_data.py
index 26899d829..c8921ac0c 100644
--- a/fastchat/serve/monitor/clean_chat_data.py
+++ b/fastchat/serve/monitor/clean_chat_data.py
@@ -7,14 +7,13 @@
 import argparse
 import json
 import os
+import hashlib
 from pytz import timezone
 from functools import partial
 from datetime import datetime, timedelta
 import time
 import multiprocessing as mp
 
-from tqdm import tqdm
-
 from fastchat.serve.monitor.basic_stats import NUM_SERVERS
 from fastchat.serve.monitor.clean_battle_data import (
     to_openai_format,
@@ -64,18 +63,15 @@ def get_action_type_data(filename, action_type):
         except FileNotFoundError:
             time.sleep(2)
 
+    rows = []
     for l in lines:
         row = json.loads(l)
         if row["type"] == action_type:
-            return row
-
+            rows.append(row)
+    return rows
 
-def process_data(row, action_type, all_ips):
-    # Initialize local counters
-    ct_invalid_conv_id = 0
-    ct_invalid = 0
-    ct_network_error = 0
 
+def process_data(row, action_type):
     try:
         if action_type in ["chat", "upvote", "downvote"]:
             state = row["state"]
@@ -88,40 +84,64 @@ def process_data(row, action_type):
             model = row["states"][1]["model_name"]
         conversation_id = state["conv_id"]
     except KeyError:
-        ct_invalid_conv_id += 1
-        return None, ct_invalid_conv_id, ct_invalid, ct_network_error, None
+        return {
+            "result": None,
+            "ct_invalid_conv_id": 1,
+            "ct_invalid": 0,
+            "ct_network_error": 0,
+            "model": None,
+        }
 
     if conversation_id is None:
-        ct_invalid_conv_id += 1
-        return None, ct_invalid_conv_id, ct_invalid, ct_network_error, None
+        return {
+            "result": None,
+            "ct_invalid_conv_id": 1,
+            "ct_invalid": 0,
+            "ct_network_error": 0,
+            "model": None,
+        }
 
     conversation = to_openai_format(state["messages"][state["offset"] :])
     if not isinstance(model, str):
-        ct_invalid += 1
-        return None, ct_invalid_conv_id, ct_invalid, ct_network_error, None
+        return {
+            "result": None,
+            "ct_invalid_conv_id": 0,
+            "ct_invalid": 1,
+            "ct_network_error": 0,
+            "model": None,
+        }
     model = replace_model_name(model, row["tstamp"])
 
     try:
         lang_code = detect_language(state["messages"][state["offset"]][1])
     except IndexError:
-        ct_invalid += 1
-        return None, ct_invalid_conv_id, ct_invalid, ct_network_error, None
+        return {
+            "result": None,
+            "ct_invalid_conv_id": 0,
+            "ct_invalid": 1,
+            "ct_network_error": 0,
+            "model": None,
+        }
 
     if not all(isinstance(x["content"], str) for x in conversation):
-        ct_invalid += 1
-        return None, ct_invalid_conv_id, ct_invalid, ct_network_error, None
+        return {
+            "result": None,
+            "ct_invalid_conv_id": 0,
+            "ct_invalid": 1,
+            "ct_network_error": 0,
+            "model": None,
+        }
 
     messages = "".join([x["content"] for x in conversation]).lower()
     if NETWORK_ERROR_MSG in messages:
-        ct_network_error += 1
-        return None, ct_invalid_conv_id, ct_invalid, ct_network_error, None
-
-    ip = row["ip"]
-    # Synchronize access to all_ips using the lock
-    with LOCK:
-        if ip not in all_ips:
-            all_ips[ip] = len(all_ips)
-        user_id = all_ips[ip]
+        return {
+            "result": None,
+            "ct_invalid_conv_id": 0,
+            "ct_invalid": 0,
+            "ct_network_error": 1,
+            "model": None,
+        }
 
+    user_id = hashlib.md5(row["ip"].encode()).hexdigest()
     # Prepare the result data
     result = dict(
@@ -134,43 +154,37 @@ def process_data(row, action_type):
         tstamp=row["tstamp"],
     )
 
-    return result, ct_invalid_conv_id, ct_invalid, ct_network_error, model
+    return {
+        "result": result,
+        "ct_invalid_conv_id": 0,
+        "ct_invalid": 0,
+        "ct_network_error": 0,
+        "model": model,
+    }
 
 
 def clean_chat_data(log_files, action_type):
     with mp.Pool() as pool:
         # Use partial to pass action_type to get_action_type_data
         func = partial(get_action_type_data, action_type=action_type)
-        raw_data = pool.map(func, log_files, chunksize=1)
-
+        file_data = pool.map(func, log_files, chunksize=1)
     # filter out Nones as some files may not contain any data belonging to the action_type
+    raw_data = []
+    for data in file_data:
+        raw_data.extend(data)
     raw_data = [r for r in raw_data if r is not None]
-    all_ips = MANAGER.dict()
 
     # Use the multiprocessing Pool
     with mp.Pool() as pool:
-        func = partial(process_data, action_type=action_type, all_ips=all_ips)
+        func = partial(process_data, action_type=action_type)
         results = pool.map(func, raw_data, chunksize=1)
 
-    # Initialize counters and collections in the parent process
-    ct_invalid_conv_id = 0
-    ct_invalid = 0
-    ct_network_error = 0
-    all_models = set()
-    chats = []
-
     # Aggregate results from child processes
-    for res in results:
-        if res is None:
-            continue
-        data, inv_conv_id, inv, net_err, model = res
-        ct_invalid_conv_id += inv_conv_id
-        ct_invalid += inv
-        ct_network_error += net_err
-        if data:
-            chats.append(data)
-        if model:
-            all_models.add(model)
+    ct_invalid_conv_id = sum([data["ct_invalid_conv_id"] for data in results])
+    ct_invalid = sum([data["ct_invalid"] for data in results])
+    ct_network_error = sum([data["ct_network_error"] for data in results])
+    all_models = set([data["model"] for data in results if not (data["model"] is None)])
+    chats = [data["result"] for data in results if not (data["result"] is None)]
 
     chats.sort(key=lambda x: x["tstamp"])
     last_updated_tstamp = chats[-1]["tstamp"]
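The key change in the patch above is dropping the Manager-backed all_ips dict and its lock in favor of a stateless MD5 fingerprint of the IP. Because every worker computes the same digest for the same input independently, no cross-process state or synchronization is needed. A minimal sketch of the idea (hypothetical, not from the repository):

    import hashlib

    def user_id_for(ip: str) -> str:
        # MD5 serves as a stable fingerprint here, not a security measure;
        # equal inputs map to equal ids in every worker process.
        return hashlib.md5(ip.encode()).hexdigest()

    assert user_id_for("10.0.0.1") == user_id_for("10.0.0.1")
    print(user_id_for("10.0.0.1")[:8])

The trade-off is that user ids become opaque 32-character digests rather than the small consecutive integers the old counter scheme produced.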
"model": None, + } + user_id = hashlib.md5(row["ip"].encode()).hexdigest() # Prepare the result data result = dict( @@ -134,43 +154,37 @@ def process_data(row, action_type, all_ips): tstamp=row["tstamp"], ) - return result, ct_invalid_conv_id, ct_invalid, ct_network_error, model + return { + "result": result, + "ct_invalid_conv_id": 0, + "ct_invalid": 0, + "ct_network_error": 0, + "model": model, + } def clean_chat_data(log_files, action_type): with mp.Pool() as pool: # Use partial to pass action_type to get_action_type_data func = partial(get_action_type_data, action_type=action_type) - raw_data = pool.map(func, log_files, chunksize=1) - + file_data = pool.map(func, log_files, chunksize=1) # filter out Nones as some files may not contain any data belong to action_type + raw_data = [] + for data in file_data: + raw_data.extend(data) raw_data = [r for r in raw_data if r is not None] - all_ips = MANAGER.dict() # Use the multiprocessing Pool with mp.Pool() as pool: - func = partial(process_data, action_type=action_type, all_ips=all_ips) + func = partial(process_data, action_type=action_type) results = pool.map(func, raw_data, chunksize=1) - # Initialize counters and collections in the parent process - ct_invalid_conv_id = 0 - ct_invalid = 0 - ct_network_error = 0 - all_models = set() - chats = [] - # Aggregate results from child processes - for res in results: - if res is None: - continue - data, inv_conv_id, inv, net_err, model = res - ct_invalid_conv_id += inv_conv_id - ct_invalid += inv - ct_network_error += net_err - if data: - chats.append(data) - if model: - all_models.add(model) + ct_invalid_conv_id = sum([data["ct_invalid_conv_id"] for data in results]) + ct_invalid = sum([data["ct_invalid"] for data in results]) + ct_network_error = sum([data["ct_network_error"] for data in results]) + all_models = set([data["model"] for data in results if not (data["model"] is None)]) + chats = [data["result"] for data in results if not (data["result"] is None)] chats.sort(key=lambda x: x["tstamp"]) last_updated_tstamp = chats[-1]["tstamp"] From bbf575236c3812739c346fac825c1df11db55593 Mon Sep 17 00:00:00 2001 From: CodingWithTim Date: Sun, 10 Nov 2024 00:28:45 +0000 Subject: [PATCH 03/10] improve 2 --- fastchat/serve/monitor/clean_chat_data.py | 41 +++++------------------ 1 file changed, 9 insertions(+), 32 deletions(-) diff --git a/fastchat/serve/monitor/clean_chat_data.py b/fastchat/serve/monitor/clean_chat_data.py index c8921ac0c..08ae68cf1 100644 --- a/fastchat/serve/monitor/clean_chat_data.py +++ b/fastchat/serve/monitor/clean_chat_data.py @@ -85,30 +85,18 @@ def process_data(row, action_type): conversation_id = state["conv_id"] except KeyError: return { - "result": None, "ct_invalid_conv_id": 1, - "ct_invalid": 0, - "ct_network_error": 0, - "model": None, } if conversation_id is None: return { - "result": None, "ct_invalid_conv_id": 1, - "ct_invalid": 0, - "ct_network_error": 0, - "model": None, } conversation = to_openai_format(state["messages"][state["offset"] :]) if not isinstance(model, str): return { - "result": None, - "ct_invalid_conv_id": 0, "ct_invalid": 1, - "ct_network_error": 0, - "model": None, } model = replace_model_name(model, row["tstamp"]) @@ -116,30 +104,18 @@ def process_data(row, action_type): lang_code = detect_language(state["messages"][state["offset"]][1]) except IndexError: return { - "result": None, - "ct_invalid_conv_id": 0, "ct_invalid": 1, - "ct_network_error": 0, - "model": None, } if not all(isinstance(x["content"], str) for x in conversation): return { - 
"result": None, - "ct_invalid_conv_id": 0, "ct_invalid": 1, - "ct_network_error": 0, - "model": None, } messages = "".join([x["content"] for x in conversation]).lower() if NETWORK_ERROR_MSG in messages: return { - "result": None, - "ct_invalid_conv_id": 0, - "ct_invalid": 0, "ct_network_error": 1, - "model": None, } user_id = hashlib.md5(row["ip"].encode()).hexdigest() @@ -156,9 +132,6 @@ def process_data(row, action_type): return { "result": result, - "ct_invalid_conv_id": 0, - "ct_invalid": 0, - "ct_network_error": 0, "model": model, } @@ -180,11 +153,15 @@ def clean_chat_data(log_files, action_type): results = pool.map(func, raw_data, chunksize=1) # Aggregate results from child processes - ct_invalid_conv_id = sum([data["ct_invalid_conv_id"] for data in results]) - ct_invalid = sum([data["ct_invalid"] for data in results]) - ct_network_error = sum([data["ct_network_error"] for data in results]) - all_models = set([data["model"] for data in results if not (data["model"] is None)]) - chats = [data["result"] for data in results if not (data["result"] is None)] + ct_invalid_conv_id = sum( + [data["ct_invalid_conv_id"] for data in results if "ct_invalid_conv_id" in data] + ) + ct_invalid = sum([data["ct_invalid"] for data in results if "ct_invalid" in data]) + ct_network_error = sum( + [data["ct_network_error"] for data in results if "ct_network_error" in data] + ) + all_models = set([data["model"] for data in results if "model" in data]) + chats = [data["result"] for data in results if "result" in data] chats.sort(key=lambda x: x["tstamp"]) last_updated_tstamp = chats[-1]["tstamp"] From 78941e86766a32cde9d71a28074bab0e06904fe3 Mon Sep 17 00:00:00 2001 From: CodingWithTim Date: Sun, 10 Nov 2024 00:31:15 +0000 Subject: [PATCH 04/10] improve 3 --- fastchat/serve/monitor/clean_chat_data.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/fastchat/serve/monitor/clean_chat_data.py b/fastchat/serve/monitor/clean_chat_data.py index 08ae68cf1..6149cebe7 100644 --- a/fastchat/serve/monitor/clean_chat_data.py +++ b/fastchat/serve/monitor/clean_chat_data.py @@ -25,8 +25,6 @@ NETWORK_ERROR_MSG = ( "NETWORK ERROR DUE TO HIGH TRAFFIC. 
PLEASE REGENERATE OR REFRESH THIS PAGE.".lower()

From 7ced22dd25daddab395777efa7ed2c66cb767a6f Mon Sep 17 00:00:00 2001
From: CodingWithTim
Date: Sun, 10 Nov 2024 01:37:22 +0000
Subject: [PATCH 05/10] fix chunk size

---
 fastchat/serve/monitor/clean_chat_data.py | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/fastchat/serve/monitor/clean_chat_data.py b/fastchat/serve/monitor/clean_chat_data.py
index 6149cebe7..47ce49685 100644
--- a/fastchat/serve/monitor/clean_chat_data.py
+++ b/fastchat/serve/monitor/clean_chat_data.py
@@ -10,6 +10,7 @@ import hashlib
 from pytz import timezone
 from functools import partial
+from math import ceil
 from datetime import datetime, timedelta
 import time
 import multiprocessing as mp
@@ -134,11 +135,13 @@ def process_data(row, action_type):
     }
 
 
-def clean_chat_data(log_files, action_type):
-    with mp.Pool() as pool:
+def clean_chat_data(log_files, action_type, num_parallel):
+    with mp.Pool(num_parallel) as pool:
         # Use partial to pass action_type to get_action_type_data
         func = partial(get_action_type_data, action_type=action_type)
-        file_data = pool.map(func, log_files, chunksize=1)
+        file_data = pool.map(
+            func, log_files, chunksize=ceil(len(log_files) / len(pool._pool))
+        )
     # filter out Nones as some files may not contain any data belonging to the action_type
     raw_data = []
     for data in file_data:
         raw_data.extend(data)
     raw_data = [r for r in raw_data if r is not None]
 
     # Use the multiprocessing Pool
-    with mp.Pool() as pool:
+    with mp.Pool(num_parallel) as pool:
         func = partial(process_data, action_type=action_type)
-        results = pool.map(func, raw_data, chunksize=1)
+        results = pool.map(
+            func, raw_data, chunksize=ceil(len(log_files) / len(pool._pool))
+        )
 
     # Aggregate results from child processes
@@ -192,10 +197,11 @@ def clean_chat_data(log_files, action_type, num_parallel):
     parser = argparse.ArgumentParser()
     parser.add_argument("--action-type", type=str, default="chat")
     parser.add_argument("--max-num-files", type=int)
+    parser.add_argument("--num-parallel", type=int, default=1)
     args = parser.parse_args()
 
     log_files = get_log_files(args.max_num_files)
-    chats = clean_chat_data(log_files, args.action_type)
+    chats = clean_chat_data(log_files, args.action_type, args.num_parallel)
     last_updated_tstamp = chats[-1]["tstamp"]
     cutoff_date = datetime.fromtimestamp(
         last_updated_tstamp, tz=timezone("US/Pacific")
     ).strftime("%Y%m%d")

From b8f60281fdb8dafde62ec9135b631ccccfa95329 Mon Sep 17 00:00:00 2001
From: CodingWithTim
Date: Mon, 11 Nov 2024 00:24:44 +0000
Subject: [PATCH 06/10] fix chunk size

---
 fastchat/serve/monitor/clean_chat_data.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/fastchat/serve/monitor/clean_chat_data.py b/fastchat/serve/monitor/clean_chat_data.py
index 47ce49685..edad2ac08 100644
--- a/fastchat/serve/monitor/clean_chat_data.py
+++ b/fastchat/serve/monitor/clean_chat_data.py
@@ -146,13 +146,13 @@ def clean_chat_data(log_files, action_type, num_parallel):
     raw_data = []
     for data in file_data:
         raw_data.extend(data)
-    raw_data = [r for r in raw_data if r is not None]
+    raw_data = [r for r in raw_data if not (r is None)]
 
     # Use the multiprocessing Pool
     with mp.Pool(num_parallel) as pool:
         func = partial(process_data, action_type=action_type)
         results = pool.map(
-            func, raw_data, chunksize=ceil(len(log_files) / len(pool._pool))
+            func, raw_data, chunksize=ceil(len(raw_data) / len(pool._pool))
         )
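The two patches above replace chunksize=1, which costs one inter-process round trip per item, with ceil(len(items) / worker_count) so each worker receives one contiguous slice; the second patch also fixes the slice math to divide len(raw_data) rather than len(log_files). The arithmetic in isolation (a standalone sketch, not from the series):

    from math import ceil

    items = list(range(10))
    num_workers = 4
    chunksize = ceil(len(items) / num_workers)
    print(chunksize)  # 3 -> workers receive chunks of 3, 3, 3, and 1

Note that len(pool._pool) reads a private attribute of multiprocessing.Pool to recover the worker count; reusing the num_parallel argument directly would avoid depending on an undocumented field.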
From e0cd21e8f18996049f260fdb1d8d4f7b7a9164bc Mon Sep 17 00:00:00 2001
From: CodingWithTim
Date: Mon, 11 Nov 2024 00:31:43 +0000
Subject: [PATCH 07/10] fix chunk size

---
 fastchat/serve/monitor/clean_chat_data.py | 28 +++++++++++++++--------
 1 file changed, 19 insertions(+), 9 deletions(-)

diff --git a/fastchat/serve/monitor/clean_chat_data.py b/fastchat/serve/monitor/clean_chat_data.py
index edad2ac08..36eb8201e 100644
--- a/fastchat/serve/monitor/clean_chat_data.py
+++ b/fastchat/serve/monitor/clean_chat_data.py
@@ -156,15 +156,25 @@ def clean_chat_data(log_files, action_type, num_parallel):
         )
 
     # Aggregate results from child processes
-    ct_invalid_conv_id = sum(
-        [data["ct_invalid_conv_id"] for data in results if "ct_invalid_conv_id" in data]
-    )
-    ct_invalid = sum([data["ct_invalid"] for data in results if "ct_invalid" in data])
-    ct_network_error = sum(
-        [data["ct_network_error"] for data in results if "ct_network_error" in data]
-    )
-    all_models = set([data["model"] for data in results if "model" in data])
-    chats = [data["result"] for data in results if "result" in data]
+    ct_invalid_conv_id = 0
+    ct_invalid = 0
+    ct_network_error = 0
+    all_models = set()
+    chats = []
+    for data in results:
+        if "ct_invalid_conv_id" in data:
+            ct_invalid_conv_id += data["ct_invalid_conv_id"]
+            continue
+        if "ct_invalid" in data:
+            ct_invalid += data["ct_invalid"]
+            continue
+        if "ct_network_error" in data:
+            ct_network_error += data["ct_network_error"]
+            continue
+        if "model" in data:
+            all_models.update([data["model"]])
+        if "result" in data:
+            chats.append(data["result"])
 
     chats.sort(key=lambda x: x["tstamp"])
     last_updated_tstamp = chats[-1]["tstamp"]

From e790f0b530a8eda91541100b1f5dfd64ca37d862 Mon Sep 17 00:00:00 2001
From: CodingWithTim
Date: Mon, 11 Nov 2024 00:37:01 +0000
Subject: [PATCH 08/10] add imap

---
 fastchat/serve/monitor/clean_chat_data.py | 23 ++++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git a/fastchat/serve/monitor/clean_chat_data.py b/fastchat/serve/monitor/clean_chat_data.py
index 36eb8201e..42cd17327 100644
--- a/fastchat/serve/monitor/clean_chat_data.py
+++ b/fastchat/serve/monitor/clean_chat_data.py
@@ -12,6 +12,7 @@ from functools import partial
 from math import ceil
 from datetime import datetime, timedelta
+from tqdm import tqdm
 import time
 import multiprocessing as mp
 
@@ -139,8 +140,14 @@ def clean_chat_data(log_files, action_type, num_parallel):
     with mp.Pool(num_parallel) as pool:
         # Use partial to pass action_type to get_action_type_data
         func = partial(get_action_type_data, action_type=action_type)
-        file_data = pool.map(
-            func, log_files, chunksize=ceil(len(log_files) / len(pool._pool))
+        file_data = list(
+            tqdm(
+                pool.imap(
+                    func, log_files, chunksize=ceil(len(log_files) / len(pool._pool))
+                ),
+                total=len(log_files),
+                desc="Processing Log Files",
+            )
         )
     # filter out Nones as some files may not contain any data belonging to the action_type
     raw_data = []
 
     # Use the multiprocessing Pool
     with mp.Pool(num_parallel) as pool:
         func = partial(process_data, action_type=action_type)
-        results = pool.map(
-            func, raw_data, chunksize=ceil(len(raw_data) / len(pool._pool))
+        results = list(
+            tqdm(
+                pool.imap(
+                    func, raw_data, chunksize=ceil(len(raw_data) / len(pool._pool))
+                ),
+                total=len(raw_data),
+                desc="Processing Raw Data",
+            )
         )
 
     # Aggregate results from child processes
@@ -161,7 +174,7 @@ def clean_chat_data(log_files, action_type, num_parallel):
     ct_network_error = 0
     all_models = set()
     chats = []
-    for data in results:
+    for data in tqdm(results):
         if "ct_invalid_conv_id" in data:
             ct_invalid_conv_id += data["ct_invalid_conv_id"]
             continue
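Patch 08 swaps pool.map for pool.imap wrapped in tqdm, so the progress bar can tick as results stream back instead of blocking until the entire map completes. A minimal, self-contained version of the pattern (hypothetical worker function, not from the repository):

    import multiprocessing as mp
    from math import ceil
    from tqdm import tqdm

    def square(x):
        return x * x

    if __name__ == "__main__":
        data = list(range(1000))
        num_parallel = 4
        with mp.Pool(num_parallel) as pool:
            chunk = ceil(len(data) / num_parallel)
            results = list(
                tqdm(pool.imap(square, data, chunksize=chunk), total=len(data))
            )
        print(len(results))

One consequence of the large chunksize is that results arrive a chunk at a time, so the bar advances in a few coarse jumps rather than smoothly; that is the price of the lower IPC overhead.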
From d8b6c4c6427ab1f3acc73832f11281546b210ed4 Mon Sep 17 00:00:00 2001
From: CodingWithTim
Date: Mon, 11 Nov 2024 00:40:23 +0000
Subject: [PATCH 09/10] add imap

---
 fastchat/serve/monitor/clean_chat_data.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/fastchat/serve/monitor/clean_chat_data.py b/fastchat/serve/monitor/clean_chat_data.py
index 42cd17327..a37fe1120 100644
--- a/fastchat/serve/monitor/clean_chat_data.py
+++ b/fastchat/serve/monitor/clean_chat_data.py
@@ -184,10 +184,8 @@ def clean_chat_data(log_files, action_type, num_parallel):
         if "ct_network_error" in data:
             ct_network_error += data["ct_network_error"]
             continue
-        if "model" in data:
-            all_models.update([data["model"]])
-        if "result" in data:
-            chats.append(data["result"])
+        all_models.update([data["model"]])
+        chats.append(data["result"])
 
     chats.sort(key=lambda x: x["tstamp"])
     last_updated_tstamp = chats[-1]["tstamp"]

From b803d47985dd3301bc70f332e7b352b0b5c4c417 Mon Sep 17 00:00:00 2001
From: CodingWithTim
Date: Mon, 11 Nov 2024 00:41:24 +0000
Subject: [PATCH 10/10] update default parallel

---
 fastchat/serve/monitor/clean_chat_data.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fastchat/serve/monitor/clean_chat_data.py b/fastchat/serve/monitor/clean_chat_data.py
index a37fe1120..ec6da4a65 100644
--- a/fastchat/serve/monitor/clean_chat_data.py
+++ b/fastchat/serve/monitor/clean_chat_data.py
@@ -218,7 +218,7 @@ def clean_chat_data(log_files, action_type, num_parallel):
     parser = argparse.ArgumentParser()
     parser.add_argument("--action-type", type=str, default="chat")
     parser.add_argument("--max-num-files", type=int)
-    parser.add_argument("--num-parallel", type=int, default=1)
+    parser.add_argument("--num-parallel", type=int, default=16)
     args = parser.parse_args()
 
     log_files = get_log_files(args.max_num_files)
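Taken together, the series turns the single-process cleaner into a two-stage multiprocessing pipeline (file reading, then per-row processing) with streamed progress reporting and a 16-worker default. Assuming the flags present in the final state of the script, a typical invocation would look like `python3 clean_chat_data.py --action-type chat --num-parallel 16`, optionally with `--max-num-files` to cap how many log files are scanned.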