Minor clowder tweaks; fix poetry script usage
Enkidu93 committed Jan 31, 2024
1 parent be19c4b commit cefa85c
Showing 5 changed files with 259 additions and 53 deletions.
2 changes: 1 addition & 1 deletion clowder/clowder → clowder/clowder.py
@@ -141,7 +141,7 @@ def _map_status_color(status: Status) -> str:
if status.value == Status.Failed.value:
return "red"
return "white"


if __name__ == "__main__":
app()
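(The rename from clowder/clowder to clowder/clowder.py is presumably the "fix poetry script usage" of the commit title: a [tool.poetry.scripts] entry points at an importable module path, which a bare extensionless file cannot provide.)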
21 changes: 12 additions & 9 deletions clowder/environment.py
@@ -41,8 +41,10 @@ class InvestigationNotFoundError(Exception):
class ClowderMeta:
def __init__(self, meta_filepath: str) -> None:
self.filepath = meta_filepath
if not Path.exists(Path("..", ".clowder")):
os.mkdir("../.clowder")
if meta_filepath is None:
if not Path.exists(Path("..", ".clowder")):
os.mkdir("../.clowder")
meta_filepath = './clowder/clowder.master.meta.yml'
if not Path.is_file(Path(self.filepath)):
data = {"temp": {"investigations": {}}, "current_root": "temp"}
with open(self.filepath, "w") as f:
@@ -59,14 +61,14 @@ def flush(self):

class ClowderEnvironment:
def __init__(self):
self.meta = ClowderMeta("../.clowder/clowder.master.meta.yml")
self.meta = ClowderMeta(str(Path(os.environ.get("CLOWDER_META_DIR","../.clowder")) / "clowder.master.meta.yml"))
self.INVESTIGATIONS_GDRIVE_FOLDER = self.root
try:
self.GOOGLE_CREDENTIALS_FILE = (
self._get_env_var("GOOGLE_CREDENTIALS_FILE")
if os.environ.get("GOOGLE_CREDENTIALS_FILE") is not None
else "../.clowder/"
+ list(filter(lambda p: "clowder" in p and ".json" in p, os.listdir("../.clowder/")))[
else os.environ.get("CLOWDER_META_DIR","../.clowder") + "/"
+ list(filter(lambda p: "clowder" in p and ".json" in p, os.listdir(os.environ.get("CLOWDER_META_DIR","../.clowder") + "/")))[
0
] # TODO more robust
)
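The credentials lookup above now honours a CLOWDER_META_DIR override. A minimal sketch of the resulting resolution order, assuming nothing beyond what the diff shows (the helper name is ours, not part of the commit):

import os
from pathlib import Path

def resolve_credentials_file() -> str:
    # 1. An explicit GOOGLE_CREDENTIALS_FILE always wins.
    explicit = os.environ.get("GOOGLE_CREDENTIALS_FILE")
    if explicit is not None:
        return explicit
    # 2. Otherwise scan the meta dir (CLOWDER_META_DIR, default ../.clowder)
    #    and take the first file whose name contains "clowder" and ".json";
    #    the diff's own TODO flags this first-match rule as fragile.
    meta_dir = Path(os.environ.get("CLOWDER_META_DIR", "../.clowder"))
    matches = [p for p in os.listdir(meta_dir) if "clowder" in p and ".json" in p]
    return str(meta_dir / matches[0])  # IndexError if no credentials file is present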
@@ -157,11 +159,12 @@ def _get_clearml_tasks(self, investigation_name: str) -> "dict[str, Union[None,T
experiments = self.current_meta["investigations"][investigation_name]["experiments"]
tasks = {}
for experiment_name, obj in experiments.items():
clearml_id = obj["clearml_id"]
clearml_id = obj.get("clearml_id")
if clearml_id is None or clearml_id == "unknown":
continue
task: Optional[Task] = Task.get_task(task_id=clearml_id)
tasks[experiment_name] = task
tasks[experiment_name] = None
else:
task: Optional[Task] = Task.get_task(task_id=clearml_id)
tasks[experiment_name] = task
return tasks

def track_investigation_by_name(self, investigation_name: str):
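A self-contained illustration of the behavioural change in _get_clearml_tasks: experiments whose clearml_id is missing or "unknown" now appear in the returned dict as None instead of being skipped, so downstream sync logic can still update them. The stub lookup stands in for Task.get_task:

from typing import Callable, Optional

def get_tasks(experiments: dict, lookup: Callable) -> "dict[str, Optional[object]]":
    tasks = {}
    for name, obj in experiments.items():
        clearml_id = obj.get("clearml_id")  # .get() tolerates a missing key
        if clearml_id is None or clearml_id == "unknown":
            tasks[name] = None  # keep the entry rather than dropping it
        else:
            tasks[name] = lookup(clearml_id)
    return tasks

# get_tasks({"a": {}, "b": {"clearml_id": "123"}}, lookup=lambda i: f"task-{i}")
# -> {"a": None, "b": "task-123"}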
145 changes: 103 additions & 42 deletions clowder/investigation.py
@@ -3,6 +3,8 @@
from io import StringIO
from time import sleep
from typing import Optional, Union
import os
from pathlib import Path

import gspread
import gspread_dataframe as gd
@@ -76,7 +78,8 @@ def _get_experiments_df(self):
if ENTRYPOINT_ATTRIBUTE not in experiments_df.columns:
raise MissingConfigurationFile("Missing entrypoint column in ExperimentsSetup sheet")
experiments_df.set_index(experiments_df.name, inplace=True)
experiments_df.drop("", inplace=True)
if "" in experiments_df.index:
experiments_df.drop("", inplace=True)
if experiments_df.index.duplicated().sum() > 0:
raise MissingConfigurationFile(
"Duplicate names in experiments google sheet. Each name needs to be unique."
@@ -123,8 +126,11 @@ def start_investigation(self, force_rerun: bool = False) -> bool:
Task.TaskStatusEnum.queued,
]:
continue
if row["entrypoint"] == "":
continue
experiment_path: s3path.S3Path = self.investigation_s3_path / row[NAME_ATTRIBUTE]
command = f'python -m {row["entrypoint"]} --clearml-queue {CLEARML_QUEUE} "{"/".join(str(experiment_path.absolute()).split("/")[4:])}"'
complete_entrypoint = row["entrypoint"].replace('$EXP',"/".join(str(experiment_path.absolute()).split("/")[4:])).replace('$ON_CLEARML',f'--clearml-queue {CLEARML_QUEUE}').replace('$LOCAL_EXP_DIR',str(Path(os.environ.get('SIL_NLP_DATA_PATH')) / 'MT/experiments'))
command = f'python -m {complete_entrypoint}'
print("[green]Running command: [/green]", command)
result = subprocess.run(
command,
@@ -133,8 +139,10 @@
text=True,
)
print(result.stdout)
if result.stdout == "":
if result.returncode != 0:
print(f"[red]{result.stderr}[/red]")
temp_meta[row[NAME_ATTRIBUTE]]['status'] = Task.TaskStatusEnum.failed.value
continue
now_running = True
match = re.search(r"task id=(.*)", result.stdout)
clearml_id = match.group(1) if match is not None else "unknown"
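The fixed command template is gone: an entrypoint from the sheet may now carry $EXP, $ON_CLEARML and $LOCAL_EXP_DIR placeholders. A sketch of the expansion, assuming SIL_NLP_DATA_PATH is set (the diff's Path(os.environ.get('SIL_NLP_DATA_PATH')) would raise a TypeError when it is not):

import os
from pathlib import Path

def expand_entrypoint(entrypoint: str, experiment_path: str, clearml_queue: str) -> str:
    local_dir = Path(os.environ["SIL_NLP_DATA_PATH"]) / "MT/experiments"
    return (
        entrypoint.replace("$EXP", experiment_path)
        .replace("$ON_CLEARML", f"--clearml-queue {clearml_queue}")
        .replace("$LOCAL_EXP_DIR", str(local_dir))
    )

# With an illustrative module name:
# expand_entrypoint("silnlp.nmt.experiment $ON_CLEARML $EXP", "MT/experiments/run1", "jobs_backlog")
# -> "silnlp.nmt.experiment --clearml-queue jobs_backlog MT/experiments/run1"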
@@ -154,16 +162,19 @@ def sync(self, gather_results=True, copy_all_results_to_gdrive: bool = True):
if "experiments" not in remote_meta_content:
remote_meta_content["experiments"] = {}
for name, task in clearml_tasks_dict.items():
if task is None:
if task is None and self.experiments[name].get('clearml_id',None) != "unknown":
continue
if name not in remote_meta_content["experiments"]:
remote_meta_content["experiments"][name] = {}
remote_meta_content["experiments"][name]["results_already_gathered"] = False
remote_meta_content["experiments"][name]["clearml_id"] = task.id
remote_meta_content["experiments"][name][
"clearml_task_url"
] = f"https://{CLEARML_URL}/projects/*/experiments/{task.id}/output/execution"
remote_meta_content["experiments"][name]["status"] = task.get_status()
if self.experiments[name]['clearml_id'] != "unknown":
remote_meta_content["experiments"][name]["clearml_id"] = task.id
remote_meta_content["experiments"][name][
"clearml_task_url"
] = f"https://{CLEARML_URL}/projects/*/experiments/{task.id}/output/execution"
remote_meta_content["experiments"][name]["status"] = task.get_status()
else:
remote_meta_content["experiments"][name]["status"] = Task.TaskStatusEnum.completed.value
ENV._write_gdrive_file_in_folder(
self.id, "clowder.meta.yml", yaml.safe_dump(remote_meta_content), "application/x-yaml"
)
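In miniature, the new status rule (stub objects, our naming): a real ClearML id defers to the live task, while the "unknown" placeholder marks a run tracked outside ClearML and is treated as completed.

def experiment_status(task, recorded_id: str) -> str:
    if recorded_id != "unknown":
        return task.get_status()  # live ClearML status
    return "completed"            # untracked run: assume it finished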
@@ -173,12 +184,15 @@ def sync(self, gather_results=True, copy_all_results_to_gdrive: bool = True):
for exp in ENV.current_meta["investigations"][self.name]["experiments"].keys():
if "experiments" not in remote_meta_content or exp not in remote_meta_content["experiments"]:
continue
ENV.current_meta["investigations"][self.name]["experiments"][exp]["clearml_id"] = remote_meta_content[
"experiments"
][exp]["clearml_id"]
ENV.current_meta["investigations"][self.name]["experiments"][exp]["clearml_task_url"] = remote_meta_content[
"experiments"
][exp]["clearml_task_url"]
if 'clearml_id' in ENV.current_meta["investigations"][self.name]["experiments"][exp] and 'clearml_id' in remote_meta_content[
"experiments"
][exp]:
ENV.current_meta["investigations"][self.name]["experiments"][exp]["clearml_id"] = remote_meta_content[
"experiments"
][exp]["clearml_id"]
ENV.current_meta["investigations"][self.name]["experiments"][exp]["clearml_task_url"] = remote_meta_content[
"experiments"
][exp]["clearml_task_url"]
ENV.current_meta["investigations"][self.name]["experiments"][exp]["status"] = remote_meta_content[
"experiments"
][exp]["status"]
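The local-meta update is likewise guarded now: clearml_id and clearml_task_url are copied only when both local and remote entries carry an id, while status is synced unconditionally. Roughly (dict shapes as in the meta files above):

def merge_experiment(local: dict, remote: dict) -> None:
    if "clearml_id" in local and "clearml_id" in remote:
        local["clearml_id"] = remote["clearml_id"]
        local["clearml_task_url"] = remote["clearml_task_url"]
    local["status"] = remote["status"]  # status always follows the remote copy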
@@ -218,7 +232,7 @@ def _generate_results(self, for_experiments: Optional[list] = None, copy_all_res
experiment_folders = ENV._dict_of_gdrive_files(self.experiments_folder_id)
print("Copying over results...")
for _, row in tqdm(setup_df.iterrows()):
if for_experiments is not None and row[NAME_ATTRIBUTE] not in for_experiments:
if (for_experiments is not None and row[NAME_ATTRIBUTE] not in for_experiments) or (row[ENTRYPOINT_ATTRIBUTE] == ""):
continue
if not ENV.current_meta["investigations"][self.name]["experiments"][row[NAME_ATTRIBUTE]].get(
"results_already_gathered", False
@@ -230,15 +244,27 @@ def _generate_results(self, for_experiments: Optional[list] = None, copy_all_res
if len(csv_results_files) > 0 and csv_results_files[0].strip() != "":
for name in csv_results_files:
name = name.strip()
s3_filepath: s3path.S3Path = (
self.investigation_s3_path / row[NAME_ATTRIBUTE] / name
) # TODO - use result that's already been copied over to gdrive
if "token" not in name:
continue
s3_filepath: s3path.S3Path = list(
(
self.investigation_s3_path / row[NAME_ATTRIBUTE]
).glob(
name if len(name.split('?')) <= 1 else name.split('?')[0]
)
)[0] # TODO - use result that's already been copied over to gdrive

with s3_filepath.open() as f:
df = pd.read_csv(StringIO(f.read()))
df = None
if 'tokenizer' not in name:
df = pd.read_csv(StringIO(f.read()), header=[0,1])
else:
df = pd.read_csv(StringIO(f.read()))
if "scores" in name:
name = "scores"
df = self._process_scores_csv(df)
df.insert(0, NAME_ATTRIBUTE, [row[NAME_ATTRIBUTE]])
if len(df.index) == 1:
df.insert(0, NAME_ATTRIBUTE, [row[NAME_ATTRIBUTE]])
if name not in results:
results[name] = pd.DataFrame()
results[name] = pd.concat([results[name], df], join="outer", ignore_index=True)
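Result-file names from the setup sheet now do double duty: an optional "?<mode>" suffix selects a colour mode (consumed further down), only the part before the "?" is used as the S3 glob pattern, and every matching CSV except tokenizer stats is read with a two-row header. A sketch of that parsing, with invented names:

from io import StringIO
import pandas as pd

def split_result_name(name: str) -> tuple:
    # "scores-best.csv?row" -> ("scores-best.csv", "row"); default is per-column colouring
    parts = name.split("?")
    mode = parts[1] if len(parts) > 1 and parts[1] in ("overall", "column", "row", "nocolor") else "column"
    return parts[0], mode

def read_result_csv(name: str, raw: str) -> pd.DataFrame:
    header = 0 if "tokenizer" in name else [0, 1]  # score files carry two header rows
    return pd.read_csv(StringIO(raw), header=header)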
@@ -269,38 +295,59 @@ def _generate_results(self, for_experiments: Optional[list] = None, copy_all_res
results["clearml_metrics"] = metrics_df

print("Processing results data...")
quota = 0
for name, df in tqdm(results.items()):
name_elements = name.split("?")
name = name_elements[0]
color_mode = "column" #overall column, row, nocolor #TODO enum?
if len(name_elements) > 1 and name_elements[1] in ["overall", "column", "row", "nocolor"]:
color_mode = name_elements[1]
for w in spreadsheet.worksheets():
if w.title == name:
spreadsheet.del_worksheet(w)
s = spreadsheet.add_worksheet(name, rows=0, cols=0)
gd.set_with_dataframe(s, df)
if color_mode != 'nocolor':
self.color_code(df, s, color_mode)

def color_code(self, df: pd.DataFrame, s: Worksheet, mode:str):
quota = 0
min_max_df = None
if mode == 'row':
min_max_df = self._min_and_max_per_row(df)
elif mode == 'overall':
min_max_df = self._min_and_max_overall(df)
else:
min_max_df = self._min_and_max_per_col(df)
row_index = 0
for _, row in df.iterrows():
col_index = 0
for col in df.columns:
if not np.issubdtype(df.dtypes[col], np.number):
col_index += 1
continue
ref = s.cell(
row_index = 0
if isinstance(min_max_df.index[0], tuple):
row_index += len(min_max_df.index[0]) - 1
for label, row in df.iterrows():
col_index = 0
for col in df.columns:
if not np.issubdtype(df.dtypes[col], np.number):
col_index += 1
continue
ref = s.cell(
row_index + 2, col_index + 1 # type: ignore
).address # +2 = 1 + 1 - 1 for zero- vs. one-indexed and 1 to skip column names
col: str
max = min_max_df.at[col, "max"]
min = min_max_df.at[col, "min"]
range = max - min
r, g, b = self._color_func((row[col] - min) / (range) if range != 0 else 1.0)
s.format(f"{ref}", {"backgroundColor": {"red": r, "green": g, "blue": b}})
col_index += 1
quota += 1
if quota > 1:
sleep(
min_max_row: str = col
if mode == 'row':
min_max_row = label
elif mode == 'overall':
min_max_row = 0
max = min_max_df.at[min_max_row, "max"]
min = min_max_df.at[min_max_row, "min"]
range = max - min
r, g, b = self._color_func((row[col] - min) / (range) if range != 0 else 1.0)
s.format(f"{ref}", {"backgroundColor": {"red": r, "green": g, "blue": b}})
col_index += 1
quota += 1
if quota > 1:
sleep(
2
) # TODO avoids exceeded per minute read/write quota - find better solution: batching and guide to change quotas
quota = 0
row_index += 1
quota = 0
row_index += 1

def _process_scores_csv(self, df: pd.DataFrame) -> pd.DataFrame:
ret = df[["score"]]
@@ -321,6 +368,20 @@ def _min_and_max_per_col(self, df: pd.DataFrame):
for col in df.columns:
ret[col] = [df[col].max(), df[col].min()]
return pd.DataFrame.from_dict(ret, orient="index", columns=["max", "min"])

def _min_and_max_per_row(self, df: pd.DataFrame): #TODO
df = df.select_dtypes(include="number")
ret = {}
col: str
for index, row in df.iterrows():
ret[index] = [row.max(), row.min()]
return pd.DataFrame.from_dict(ret, orient="index", columns=["max", "min"])

def _min_and_max_overall(self, df: pd.DataFrame): #TODO
max = df.max(numeric_only=True).max()
min = df.min(numeric_only=True).min()
return pd.DataFrame.from_dict({0:[max,min]}, orient="index", columns=["max", "min"])


def _color_func(self, x: float) -> tuple:
if x > 0.5:
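The three new min/max helpers are what the colour modes compare cells against. A compact demonstration on toy data (column names invented):

import pandas as pd

df = pd.DataFrame({"bleu": [30.1, 32.4], "chrf": [55.0, 57.2]}, index=["exp1", "exp2"])

# "column" mode (default): each cell is scaled within its column
per_col = pd.DataFrame({c: [df[c].max(), df[c].min()] for c in df.columns},
                       index=["max", "min"]).T
# "row" mode: scaled within its experiment's row
per_row = pd.DataFrame({i: [r.max(), r.min()] for i, r in df.iterrows()},
                       index=["max", "min"]).T
# "overall" mode: a single pair for the whole table
overall = pd.DataFrame({0: [df.max(numeric_only=True).max(),
                            df.min(numeric_only=True).min()]},
                       index=["max", "min"]).T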