From cefa85c31a3a32098a9a8ad937eade14b3b3ec17 Mon Sep 17 00:00:00 2001
From: Enkidu93
Date: Wed, 31 Jan 2024 14:48:13 -0500
Subject: [PATCH] Minor clowder tweaks; fix poetry script usage

---
 clowder/{clowder => clowder.py}       |   2 +-
 clowder/environment.py                |  21 ++--
 clowder/investigation.py              | 145 ++++++++++++++++++--------
 clowder/scratch.ipynb                 | 142 +++++++++++++++++++++++++
 silnlp/common/collect_verse_counts.py |   2 +-
 5 files changed, 259 insertions(+), 53 deletions(-)
 rename clowder/{clowder => clowder.py} (99%)
 create mode 100644 clowder/scratch.ipynb

diff --git a/clowder/clowder b/clowder/clowder.py
similarity index 99%
rename from clowder/clowder
rename to clowder/clowder.py
index 57971d19..516d11be 100755
--- a/clowder/clowder
+++ b/clowder/clowder.py
@@ -141,7 +141,7 @@ def _map_status_color(status: Status) -> str:
     if status.value == Status.Failed.value:
         return "red"
     return "white"
-    
+
 
 if __name__ == "__main__":
     app()
diff --git a/clowder/environment.py b/clowder/environment.py
index a9bf0233..5331609d 100644
--- a/clowder/environment.py
+++ b/clowder/environment.py
@@ -41,8 +41,10 @@ class InvestigationNotFoundError(Exception):
 class ClowderMeta:
     def __init__(self, meta_filepath: str) -> None:
         self.filepath = meta_filepath
-        if not Path.exists(Path("..", ".clowder")):
-            os.mkdir("../.clowder")
+        if meta_filepath is None:
+            if not Path.exists(Path("..", ".clowder")):
+                os.mkdir("../.clowder")
+            meta_filepath = './clowder/clowder.master.meta.yml'
         if not Path.is_file(Path(self.filepath)):
             data = {"temp": {"investigations": {}}, "current_root": "temp"}
             with open(self.filepath, "w") as f:
@@ -59,14 +61,14 @@ def flush(self):
 class ClowderEnvironment:
     def __init__(self):
-        self.meta = ClowderMeta("../.clowder/clowder.master.meta.yml")
+        self.meta = ClowderMeta(str(Path(os.environ.get("CLOWDER_META_DIR","../.clowder")) / "clowder.master.meta.yml"))
 
         self.INVESTIGATIONS_GDRIVE_FOLDER = self.root
         try:
             self.GOOGLE_CREDENTIALS_FILE = (
                 self._get_env_var("GOOGLE_CREDENTIALS_FILE")
                 if os.environ.get("GOOGLE_CREDENTIALS_FILE") is not None
-                else "../.clowder/"
-                + list(filter(lambda p: "clowder" in p and ".json" in p, os.listdir("../.clowder/")))[
+                else os.environ.get("CLOWDER_META_DIR","../.clowder") + "/"
+                + list(filter(lambda p: "clowder" in p and ".json" in p, os.listdir(os.environ.get("CLOWDER_META_DIR","../.clowder") + "/")))[
                     0
                 ]  # TODO more robust
             )
@@ -157,11 +159,12 @@ def _get_clearml_tasks(self, investigation_name: str) -> "dict[str, Union[None,T
         experiments = self.current_meta["investigations"][investigation_name]["experiments"]
         tasks = {}
         for experiment_name, obj in experiments.items():
-            clearml_id = obj["clearml_id"]
+            clearml_id = obj.get("clearml_id")
             if clearml_id is None or clearml_id == "unknown":
-                continue
-            task: Optional[Task] = Task.get_task(task_id=clearml_id)
-            tasks[experiment_name] = task
+                tasks[experiment_name] = None
+            else:
+                task: Optional[Task] = Task.get_task(task_id=clearml_id)
+                tasks[experiment_name] = task
         return tasks
 
     def track_investigation_by_name(self, investigation_name: str):
diff --git a/clowder/investigation.py b/clowder/investigation.py
index aa58b920..2523cfca 100644
--- a/clowder/investigation.py
+++ b/clowder/investigation.py
@@ -3,6 +3,8 @@
 from io import StringIO
 from time import sleep
 from typing import Optional, Union
+import os
+from pathlib import Path
 
 import gspread
 import gspread_dataframe as gd
@@ -76,7 +78,8 @@ def _get_experiments_df(self):
         if ENTRYPOINT_ATTRIBUTE not in experiments_df.columns:
             raise MissingConfigurationFile("Missing entrypoint column in ExperimentsSetup sheet")
entrypoint column in ExperimentsSetup sheet") experiments_df.set_index(experiments_df.name, inplace=True) - experiments_df.drop("", inplace=True) + if "" in experiments_df.index: + experiments_df.drop("", inplace=True) if experiments_df.index.duplicated().sum() > 0: raise MissingConfigurationFile( "Duplicate names in experiments google sheet. Each name needs to be unique." @@ -123,8 +126,11 @@ def start_investigation(self, force_rerun: bool = False) -> bool: Task.TaskStatusEnum.queued, ]: continue + if row["entrypoint"] == "": + continue experiment_path: s3path.S3Path = self.investigation_s3_path / row[NAME_ATTRIBUTE] - command = f'python -m {row["entrypoint"]} --clearml-queue {CLEARML_QUEUE} "{"/".join(str(experiment_path.absolute()).split("/")[4:])}"' + complete_entrypoint = row["entrypoint"].replace('$EXP',"/".join(str(experiment_path.absolute()).split("/")[4:])).replace('$ON_CLEARML',f'--clearml-queue {CLEARML_QUEUE}').replace('$LOCAL_EXP_DIR',str(Path(os.environ.get('SIL_NLP_DATA_PATH')) / 'MT/experiments')) + command = f'python -m {complete_entrypoint}' print("[green]Running command: [/green]", command) result = subprocess.run( command, @@ -133,8 +139,10 @@ def start_investigation(self, force_rerun: bool = False) -> bool: text=True, ) print(result.stdout) - if result.stdout == "": + if result.returncode != 0: print(f"[red]{result.stderr}[/red]") + temp_meta[row[NAME_ATTRIBUTE]]['status'] = Task.TaskStatusEnum.failed.value + continue now_running = True match = re.search(r"task id=(.*)", result.stdout) clearml_id = match.group(1) if match is not None else "unknown" @@ -154,16 +162,19 @@ def sync(self, gather_results=True, copy_all_results_to_gdrive: bool = True): if "experiments" not in remote_meta_content: remote_meta_content["experiments"] = {} for name, task in clearml_tasks_dict.items(): - if task is None: + if task is None and self.experiments[name].get('clearml_id',None) != "unknown": continue if name not in remote_meta_content["experiments"]: remote_meta_content["experiments"][name] = {} remote_meta_content["experiments"][name]["results_already_gathered"] = False - remote_meta_content["experiments"][name]["clearml_id"] = task.id - remote_meta_content["experiments"][name][ - "clearml_task_url" - ] = f"https://{CLEARML_URL}/projects/*/experiments/{task.id}/output/execution" - remote_meta_content["experiments"][name]["status"] = task.get_status() + if self.experiments[name]['clearml_id'] != "unknown": + remote_meta_content["experiments"][name]["clearml_id"] = task.id + remote_meta_content["experiments"][name][ + "clearml_task_url" + ] = f"https://{CLEARML_URL}/projects/*/experiments/{task.id}/output/execution" + remote_meta_content["experiments"][name]["status"] = task.get_status() + else: + remote_meta_content["experiments"][name]["status"] = Task.TaskStatusEnum.completed.value ENV._write_gdrive_file_in_folder( self.id, "clowder.meta.yml", yaml.safe_dump(remote_meta_content), "application/x-yaml" ) @@ -173,12 +184,15 @@ def sync(self, gather_results=True, copy_all_results_to_gdrive: bool = True): for exp in ENV.current_meta["investigations"][self.name]["experiments"].keys(): if "experiments" not in remote_meta_content or exp not in remote_meta_content["experiments"]: continue - ENV.current_meta["investigations"][self.name]["experiments"][exp]["clearml_id"] = remote_meta_content[ - "experiments" - ][exp]["clearml_id"] - ENV.current_meta["investigations"][self.name]["experiments"][exp]["clearml_task_url"] = remote_meta_content[ - "experiments" - ][exp]["clearml_task_url"] + if 
+                "experiments"
+            ][exp]:
+                ENV.current_meta["investigations"][self.name]["experiments"][exp]["clearml_id"] = remote_meta_content[
+                    "experiments"
+                ][exp]["clearml_id"]
+                ENV.current_meta["investigations"][self.name]["experiments"][exp]["clearml_task_url"] = remote_meta_content[
+                    "experiments"
+                ][exp]["clearml_task_url"]
             ENV.current_meta["investigations"][self.name]["experiments"][exp]["status"] = remote_meta_content[
                 "experiments"
             ][exp]["status"]
@@ -218,7 +232,7 @@ def _generate_results(self, for_experiments: Optional[list] = None, copy_all_res
         experiment_folders = ENV._dict_of_gdrive_files(self.experiments_folder_id)
         print("Copying over results...")
         for _, row in tqdm(setup_df.iterrows()):
-            if for_experiments is not None and row[NAME_ATTRIBUTE] not in for_experiments:
+            if (for_experiments is not None and row[NAME_ATTRIBUTE] not in for_experiments) or (row[ENTRYPOINT_ATTRIBUTE] == ""):
                 continue
             if not ENV.current_meta["investigations"][self.name]["experiments"][row[NAME_ATTRIBUTE]].get(
                 "results_already_gathered", False
@@ -230,15 +244,27 @@ def _generate_results(self, for_experiments: Optional[list] = None, copy_all_res
             if len(csv_results_files) > 0 and csv_results_files[0].strip() != "":
                 for name in csv_results_files:
                     name = name.strip()
-                    s3_filepath: s3path.S3Path = (
-                        self.investigation_s3_path / row[NAME_ATTRIBUTE] / name
-                    )  # TODO - use result that's already been copied over to gdrive
+                    if "token" not in name:
+                        continue
+                    s3_filepath: s3path.S3Path = list(
+                        (
+                            self.investigation_s3_path / row[NAME_ATTRIBUTE]
+                        ).glob(
+                            name if len(name.split('?')) <= 1 else name.split('?')[0]
+                        )
+                    )[0]  # TODO - use result that's already been copied over to gdrive
+
                     with s3_filepath.open() as f:
-                        df = pd.read_csv(StringIO(f.read()))
+                        df = None
+                        if 'tokenizer' not in name:
+                            df = pd.read_csv(StringIO(f.read()), header=[0,1])
+                        else:
+                            df = pd.read_csv(StringIO(f.read()))
                         if "scores" in name:
                             name = "scores"
                             df = self._process_scores_csv(df)
-                        df.insert(0, NAME_ATTRIBUTE, [row[NAME_ATTRIBUTE]])
+                        if len(df.index) == 1:
+                            df.insert(0, NAME_ATTRIBUTE, [row[NAME_ATTRIBUTE]])
                         if name not in results:
                             results[name] = pd.DataFrame()
                         results[name] = pd.concat([results[name], df], join="outer", ignore_index=True)
@@ -269,38 +295,59 @@ def _generate_results(self, for_experiments: Optional[list] = None, copy_all_res
             results["clearml_metrics"] = metrics_df
 
         print("Processing results data...")
-        quota = 0
        for name, df in tqdm(results.items()):
+            name_elements = name.split("?")
+            name = name_elements[0]
+            color_mode = "column"  # options: overall, column, row, nocolor  # TODO: use an enum?
+            if len(name_elements) > 1 and name_elements[1] in ["overall", "column", "row", "nocolor"]:
+                color_mode = name_elements[1]
             for w in spreadsheet.worksheets():
                 if w.title == name:
                     spreadsheet.del_worksheet(w)
             s = spreadsheet.add_worksheet(name, rows=0, cols=0)
             gd.set_with_dataframe(s, df)
+            if color_mode != 'nocolor':
+                self.color_code(df, s, color_mode)
+
+    def color_code(self, df: pd.DataFrame, s: Worksheet, mode: str):
+        quota = 0
+        min_max_df = None
+        if mode == 'row':
+            min_max_df = self._min_and_max_per_row(df)
+        elif mode == 'overall':
+            min_max_df = self._min_and_max_overall(df)
+        else:
             min_max_df = self._min_and_max_per_col(df)
-            row_index = 0
-            for _, row in df.iterrows():
-                col_index = 0
-                for col in df.columns:
-                    if not np.issubdtype(df.dtypes[col], np.number):
-                        col_index += 1
-                        continue
-                    ref = s.cell(
+        row_index = 0
+        if isinstance(min_max_df.index[0], tuple):
+            row_index += len(min_max_df.index[0]) - 1
+        for label, row in df.iterrows():
+            col_index = 0
+            for col in df.columns:
+                if not np.issubdtype(df.dtypes[col], np.number):
+                    col_index += 1
+                    continue
+                ref = s.cell(
                     row_index + 2, col_index + 1  # type: ignore
                 ).address  # +2 = 1 + 1 - 1 for zero- vs. one-indexed and 1 to skip column names
-                    col: str
-                    max = min_max_df.at[col, "max"]
-                    min = min_max_df.at[col, "min"]
-                    range = max - min
-                    r, g, b = self._color_func((row[col] - min) / (range) if range != 0 else 1.0)
-                    s.format(f"{ref}", {"backgroundColor": {"red": r, "green": g, "blue": b}})
-                    col_index += 1
-                    quota += 1
-                    if quota > 1:
-                        sleep(
+                min_max_row: str = col
+                if mode == 'row':
+                    min_max_row = label
+                elif mode == 'overall':
+                    min_max_row = 0
+                max = min_max_df.at[min_max_row, "max"]
+                min = min_max_df.at[min_max_row, "min"]
+                range = max - min
+                r, g, b = self._color_func((row[col] - min) / (range) if range != 0 else 1.0)
+                s.format(f"{ref}", {"backgroundColor": {"red": r, "green": g, "blue": b}})
+                col_index += 1
+                quota += 1
+                if quota > 1:
+                    sleep(
                         2
                     )  # TODO avoids exceeded per minute read/write quota - find better solution: batching and guide to change quotas
-                        quota = 0
-                row_index += 1
+                    quota = 0
+            row_index += 1
 
     def _process_scores_csv(self, df: pd.DataFrame) -> pd.DataFrame:
         ret = df[["score"]]
@@ -321,6 +368,20 @@ def _min_and_max_per_col(self, df: pd.DataFrame):
         for col in df.columns:
             ret[col] = [df[col].max(), df[col].min()]
         return pd.DataFrame.from_dict(ret, orient="index", columns=["max", "min"])
+
+    def _min_and_max_per_row(self, df: pd.DataFrame):  # TODO
+        df = df.select_dtypes(include="number")
+        ret = {}
+        col: str
+        for index, row in df.iterrows():
+            ret[index] = [row.max(), row.min()]
+        return pd.DataFrame.from_dict(ret, orient="index", columns=["max", "min"])
+
+    def _min_and_max_overall(self, df: pd.DataFrame):  # TODO
+        max = df.max(numeric_only=True).max()
+        min = df.min(numeric_only=True).min()
+        return pd.DataFrame.from_dict({0:[max,min]}, orient="index", columns=["max", "min"])
+
 
     def _color_func(self, x: float) -> tuple:
         if x > 0.5:
diff --git a/clowder/scratch.ipynb b/clowder/scratch.ipynb
new file mode 100644
index 00000000..5318e87c
--- /dev/null
+++ b/clowder/scratch.ipynb
@@ -0,0 +1,142 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import pandas as pd"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "df = pd.DataFrame.from_dict({'A':[3,6,5],'B':[2,4,7],'C':[0,6,0]})"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "metadata": {},
+ "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
ABC
0320
1646
2570
\n", + "
" + ], + "text/plain": [ + " A B C\n", + "0 3 2 0\n", + "1 6 4 6\n", + "2 5 7 0" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "from investigation import Investigation" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " max min\n", + "0 7 0\n" + ] + } + ], + "source": [ + "print(Investigation._min_and_max_overall(None,df))\n", + "print(Investigation._min_and_max_per_row(None,df))\n", + "print(Investigation._min_and_max_per_col(None,df))" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "silnlp-oDyOqe5e-py3.8", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.10" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/silnlp/common/collect_verse_counts.py b/silnlp/common/collect_verse_counts.py index 5d3dc960..ed030f09 100644 --- a/silnlp/common/collect_verse_counts.py +++ b/silnlp/common/collect_verse_counts.py @@ -108,7 +108,7 @@ def main() -> None: parser = argparse.ArgumentParser(description="Collect various counts from a corpus of Bible extracts") - parser.add_argument("--input-folder", help="Folder with corpus of Bible extracts", required=True) + parser.add_argument("--input-folder", default=SIL_NLP_ENV.mt_scripture_dir, help="Folder with corpus of Bible extracts") parser.add_argument("--output-folder", help="Folder in which to save results", required=True) parser.add_argument( "--files",