diff --git a/ersilia/core/tracking.py b/ersilia/core/tracking.py
index f3774715e..2d20a5bce 100644
--- a/ersilia/core/tracking.py
+++ b/ersilia/core/tracking.py
@@ -5,45 +5,104 @@
 import tempfile
 import logging
 import boto3
-from botocore.exceptions import ClientError
+from botocore.exceptions import ClientError, NoCredentialsError
 import os
+import re
+import requests
 
 PERSISTENT_FILE_PATH = os.path.abspath("current_session.txt")
 
-# Temporary path to log files
+# Temporary path to log files until log files are fixed
 TEMP_FILE_LOGS = os.path.abspath("")
 
 
 def log_files_metrics(file):
-    error_count = 0
-    warning_count = 0
-
-    with open(file, "r") as file:
-        for line in file:
-            if "| ERROR" in line:
-                error_count += 1
-            elif "| WARNING" in line:
-                warning_count += 1
-
-    write_persistent_file(f"Error count: {error_count}")
-    write_persistent_file(f"Warning count: {warning_count}")
-
+    """
+    Logs the number of errors and warnings found in a log file.
 
-def read_csv(file):
-    # reads csv file and returns Pandas dataframe
-    return pd.read_csv(file)
+    :param file: The log file to be read
+    :return: None (writes to the persistent file)
+    """
+    error_count = 0
+    warning_count = 0
 
-def read_json(result):
-    data = json.load(result)
-    return data
+    ersilia_error_flag = False
+    misc_error_flag = False
+    error_name = ""
+    errors = {}
+
+    try:
+        with open(file, "r") as log_file:
+            line = None
+            for line in log_file:
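+                # NOTE (assumed log format): a new entry starts with a
+                # timestamp-like prefix, e.g. "12.34.56 | ERROR | ...";
+                # anything else is treated as a continuation of the previous entry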
+                if not re.match(r"^\d{2}.\d{2}.\d{2} \| ", line):
+                    # continuation of the previous log entry
+                    if ersilia_error_flag:
+                        # catch the error name hinted at by the previous line
+                        error_name = line.rstrip()
+                        errors[error_name] = errors.get(error_name, 0) + 1
+                        ersilia_error_flag = False
+                        continue
+                    elif misc_error_flag:
+                        error_name += line.rstrip()
+                        if len(error_name) > 100:
+                            # cap the name and count the error now, since the
+                            # flag is being closed
+                            error_name = error_name[:97] + "..."
+                            errors[error_name] = errors.get(error_name, 0) + 1
+                            misc_error_flag = False
+                else:
+                    # encountering a new log entry: make sure any open error
+                    # flags are closed first
+                    if ersilia_error_flag:
+                        errors["Unknown Ersilia exception class"] = (
+                            errors.get("Unknown Ersilia exception class", 0) + 1
+                        )
+                        ersilia_error_flag = False
+                    if misc_error_flag:
+                        errors[error_name] = errors.get(error_name, 0) + 1
+                        misc_error_flag = False
+                    if "| ERROR" in line:
+                        error_count += 1
+                        # check which type of error this is
+                        if "Ersilia exception class:" in line:
+                            # combine this with the next line, usually EmptyOutputError or
+                            # SourceCodeBaseInformationError; the detailed message is long
+                            ersilia_error_flag = True
+                        else:
+                            # other errors are short and self-descriptive; cap
+                            # the name by character count
+                            misc_error_flag = True
+                            error_name = line.split("| ERROR | ")[1].rstrip()
+                    elif "| WARNING" in line:
+                        warning_count += 1
+        if line is not None:
+            # in case the last log entry was an error, make sure any open
+            # error flags are closed
+            if ersilia_error_flag:
+                errors["Unknown Ersilia exception class"] = (
+                    errors.get("Unknown Ersilia exception class", 0) + 1
+                )
+            if misc_error_flag:
+                errors[error_name] = errors.get(error_name, 0) + 1
+
+        write_persistent_file(f"Error count: {error_count}")
+        if len(errors) > 0:
+            write_persistent_file("Breakdown by error types:")
+            for error in errors:
+                write_persistent_file(f"{error}: {errors[error]}")
+        write_persistent_file(f"Warning count: {warning_count}")
+    except (IsADirectoryError, FileNotFoundError):
+        logging.warning("Unable to calculate metrics for log file: log file not found")
 
 
 def open_persistent_file(model_id):
+    """
+    Opens a new persistent file, specifically for a run of model_id.
+
+    :param model_id: The currently running model
+    """
     with open(PERSISTENT_FILE_PATH, "w") as f:
         f.write("Session started for model: {0}\n".format(model_id))
 
 
 def write_persistent_file(contents):
+    """
+    Writes contents to the current persistent file. Only writes if the file already exists.
+
+    :param contents: The contents to write to the file.
+    """
+    # Only write to the file if it already exists (i.e. this run is being tracked)
     if os.path.isfile(PERSISTENT_FILE_PATH):
         with open(PERSISTENT_FILE_PATH, "a") as f:
@@ -51,6 +110,10 @@ def write_persistent_file(contents):
 
 
 def close_persistent_file():
+    """
+    Closes the persistent file, renaming it to a unique name.
+    """
+    # Make sure the file actually exists before we try renaming it
     if os.path.isfile(PERSISTENT_FILE_PATH):
         log_files_metrics(TEMP_FILE_LOGS)
@@ -89,18 +152,83 @@ def upload_to_s3(json_dict, bucket="t4sg-ersilia", object_name=None):
     s3_client = boto3.client("s3")
     try:
         s3_client.upload_file(tmp.name, bucket, f"{object_name}.json")
+    except NoCredentialsError:
+        logging.error(
+            "Unable to upload tracking data to AWS: Credentials not found"
+        )
+        return False
     except ClientError as e:
         logging.error(e)
         return False
     return True
 
 
+def upload_to_cddvault(output_df, api_key):
+    """
+    Takes the output dataframe from the model run and uploads the data to CDD Vault.
+
+    NOTE: Currently, this is simply a skeleton of what the final code should look like.
+    The TODO details the remaining changes.
+
+    :param output_df: The output dataframe from the model run
+    :param api_key: The API key for CDD Vault's API
+    :return: The response from the API call
+    """
+
+    # We use the slurps API path to be able to bulk upload data
+    url = "https://app.collaborativedrug.com/api/v1/vaults//slurps"
+    headers = {"CDD-Token": api_key}
+    # TODO: Update project and header_mappings ids, as well as adding mappings for other
+    # output columns if those are to be tracked as well.
+    data = {
+        "project": "",
+        "autoreject": "true",
+        "mapping_template": {
+            "registration_type": "CHEMICAL_STRUCTURE",
+            "header_mappings": [
+                {
+                    "header": {"name": "input", "position": 0},
+                    "definition": {
+                        "id": -1,
+                        "type": "InternalFieldDefinition::MoleculeStructure",
+                    },
+                },
+                {
+                    "header": {"name": "time", "position": 1},
+                    "definition": {
+                        "id": -1,
+                        "type": "InternalFieldDefinition::BatchFieldDefinition",
+                    },
+                },
+            ],
+        },
+    }
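+    # NOTE (assumption): the header name/position pairs above are matched against
+    # the column names and order of the uploaded CSV, so the CSV built below must
+    # keep "input" as its first column and "time" as its second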
+ """ self.time_start = datetime.now() tracemalloc.start() self.memory_usage_start = tracemalloc.get_traced_memory()[0] @@ -119,15 +251,21 @@ def sample_df(self, df, num_rows, num_cols): """ return df.sample(num_rows, axis=0).sample(num_cols, axis=1) -# Stats function: calculates the basic statistics of the output file from a model. This includes the -# mode (if applicable), minimum, maximum, and standard deviation. def stats(self, result): - dat = read_csv(result) + """ + Stats function: calculates the basic statistics of the output file from a model. This includes the + mode (if applicable), minimum, maximum, and standard deviation. + + :param result: The path to the model's output file. + :return: A dictionary containing the stats for each column of the result. + """ + + dat = pd.read_csv(result) # drop first two columns (key, input) dat = dat.drop(["key", "input"], axis=1) - # calculate and print statistics + # calculate statistics stats = {} for column in dat: column_stats = {} @@ -145,6 +283,13 @@ def stats(self, result): return stats def get_file_sizes(self, input_df, output_df): + """ + Calculates the size of the input and output dataframes, as well as the average size of each row. + + :input_df: Pandas dataframe containing the input data + :output_df: Pandas dataframe containing the output data + :return: dictionary containing the input size, output size, average input size, and average output size + """ input_size = input_df.memory_usage(deep=True).sum() / 1024 output_size = output_df.memory_usage(deep=True).sum() / 1024 @@ -158,32 +303,45 @@ def get_file_sizes(self, input_df, output_df): "avg_output_size": output_avg_row_size, } - def check_types(self, resultDf, metadata): - typeDict = {"float64": "Float", "int64": "Int"} + def check_types(self, result_df, metadata): + """ + This class is responsible for checking the types of the output dataframe against the expected types. + This includes checking the shape of the output dataframe (list vs single) and the types of each column. + + :param result_df: The output dataframe + :param metadata: The metadata dictionary + :return: A dictionary containing the number of mismatched types and a boolean for whether the shape is correct + """ + + type_dict = {"float64": "Float", "int64": "Int"} count = 0 # ignore key and input columns - dtypesLst = resultDf.loc[:, ~resultDf.columns.isin(["key", "input"])].dtypes + dtypes_list = result_df.loc[:, ~result_df.columns.isin(["key", "input"])].dtypes - for i in dtypesLst: - if typeDict[str(i)] != metadata["Output Type"][0]: + for i in dtypes_list: + if type_dict[str(i)] != metadata["Output Type"][0]: count += 1 - if len(dtypesLst) > 1 and metadata["Output Shape"] != "List": - print("Not right shape. Expected List but got Single") + if len(dtypes_list) > 1 and metadata["Output Shape"] != "List": + logging.warning("Not right shape. Expected List but got Single") correct_shape = False - elif len(dtypesLst) == 1 and metadata["Output Shape"] != "Single": - print("Not right shape. Expected Single but got List") + elif len(dtypes_list) == 1 and metadata["Output Shape"] != "Single": + logging.warning("Not right shape. 
         count = 0
 
         # ignore key and input columns
-        dtypesLst = resultDf.loc[:, ~resultDf.columns.isin(["key", "input"])].dtypes
+        dtypes_list = result_df.loc[:, ~result_df.columns.isin(["key", "input"])].dtypes
 
-        for i in dtypesLst:
-            if typeDict[str(i)] != metadata["Output Type"][0]:
+        for i in dtypes_list:
+            if type_dict[str(i)] != metadata["Output Type"][0]:
                 count += 1
 
-        if len(dtypesLst) > 1 and metadata["Output Shape"] != "List":
-            print("Not right shape. Expected List but got Single")
+        if len(dtypes_list) > 1 and metadata["Output Shape"] != "List":
+            logging.warning("Not right shape. Expected List but got Single")
             correct_shape = False
-        elif len(dtypesLst) == 1 and metadata["Output Shape"] != "Single":
-            print("Not right shape. Expected Single but got List")
+        elif len(dtypes_list) == 1 and metadata["Output Shape"] != "Single":
+            logging.warning("Not right shape. Expected Single but got List")
             correct_shape = False
         else:
-            print("Output is correct shape.")
             correct_shape = True
 
-        print("Output has", count, "mismatched types.\n")
+        logging.info(f"Output has {count} mismatched types.")
 
         return {"mismatched_types": count, "correct_shape": correct_shape}
 
     def get_peak_memory(self):
+        """
+        Calculates the peak memory usage of ersilia's Python instance during the run.
+
+        :return: The peak memory usage in bytes.
+        """
+        # Compare the peak memory against the amount in use when tracking started
         peak_memory = tracemalloc.get_traced_memory()[1] - self.memory_usage_start
         tracemalloc.stop()
@@ -195,8 +353,8 @@ def track(self, input, result, meta):
         Tracks the results after a model run.
         """
         json_dict = {}
-        input_dataframe = read_csv(input)
-        result_dataframe = read_csv(result)
+        input_dataframe = pd.read_csv(input)
+        result_dataframe = pd.read_csv(result)
 
         json_dict["input_dataframe"] = input_dataframe.to_dict()
         json_dict["result_dataframe"] = result_dataframe.to_dict()
diff --git a/pyproject.toml b/pyproject.toml
index f64a7a5de..c3f0ef442 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -53,6 +53,7 @@ sphinx = {version = ">=5.3.0", optional = true} # For compatibility with python
 jinja2 = {version = "^3.1.2", optional = true}
 levenshtein = {version = ">=0.21.1,<0.23.0", optional = true} # For faster fuzzy search
 boto3 = "^1.28.40"
+requests = "^2.31.0"
 
 [tool.poetry.extras]