stats documentation #20

Closed
wants to merge 11 commits
240 changes: 199 additions & 41 deletions ersilia/core/tracking.py
@@ -5,52 +5,115 @@
import tempfile
import logging
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
from collections import defaultdict
import os
import re
import requests

PERSISTENT_FILE_PATH = os.path.abspath("current_session.txt")
# Temporary path to log files until log files are fixed
TEMP_FILE_LOGS = os.path.abspath("")


def log_files_metrics(file):
    """
    This function will log the number of errors and warnings in the log files.

    :param file: The log file to be read
    :return: None (writes to file)
    """

    error_count = 0
    warning_count = 0

ersilia_error_flag = False
misc_error_flag = False
error_name = ""
    errors = defaultdict(int)

    try:
        with open(file, "r") as f:
            line = None
            for line in f:
if not re.match(r"^\d{2}.\d{2}.\d{2} \| ", line):
# continuation of log
if ersilia_error_flag:
# catch the error name if hinted by previous line
error_name = line.rstrip()
errors[error_name] += 1
ersilia_error_flag = False
continue
elif misc_error_flag:
error_name += line.rstrip()
if len(error_name) > 100:
error_name = error_name[:97] + "..."
misc_error_flag = False
else:
# encountering new logs
# make sure error flags are closed
if ersilia_error_flag:
errors["Unknown Ersilia exception class"] += 1
ersilia_error_flag = False
if misc_error_flag:
errors[error_name] += 1
misc_error_flag = False
if "| ERROR" in line:
error_count += 1
# checking which type of errors
if "Ersilia exception class:" in line:
# combine this with the next line, usually EmptyOutputError or SourceCodeBaseInformationError
# the detailed message is long
ersilia_error_flag = True
else:
# other errors are pretty self-descriptive and short. Will cap by character
misc_error_flag = True
error_name = line.split("| ERROR | ")[1].rstrip()
elif "| WARNING" in line:
warning_count += 1
if line is not None:
# in case last log is error
# make sure error flags are closed
if ersilia_error_flag:
errors["Unknown Ersilia exception class"] += 1
if misc_error_flag:
errors[error_name] += 1

write_persistent_file(f"Error count: {error_count}")
if len(errors) > 0:
write_persistent_file(f"Breakdown by error types:")
for error in errors:
write_persistent_file(f"{error}: {errors[error]}")
write_persistent_file(f"Warning count: {warning_count}")
except (IsADirectoryError, FileNotFoundError):
logging.warning("Unable to calculate metrics for log file: log file not found")


def open_persistent_file(model_id):
"""
Opens a new persistent file, specifically for a run of model_id
:param model_id: The currently running model
"""
with open(PERSISTENT_FILE_PATH, "w") as f:
f.write("Session started for model: {0}\n".format(model_id))


def write_persistent_file(contents):
"""
Writes contents to the current persistent file. Only writes if the file actually exists.
:param contents: The contents to write to the file.
"""

# Only write to file if it already exists (we're meant to be tracking this run)
if os.path.isfile(PERSISTENT_FILE_PATH):
with open(PERSISTENT_FILE_PATH, "a") as f:
f.write(f"{contents}\n")


def close_persistent_file():
"""
Closes the persistent file, renaming it to a unique name.
"""

# Make sure the file actually exists before we try renaming
if os.path.isfile(PERSISTENT_FILE_PATH):
log_files_metrics(TEMP_FILE_LOGS)
@@ -89,18 +152,83 @@ def upload_to_s3(json_dict, bucket="t4sg-ersilia", object_name=None):
s3_client = boto3.client("s3")
try:
s3_client.upload_file(tmp.name, bucket, f"{object_name}.json")
        except NoCredentialsError:
            logging.error(
                "Unable to upload tracking data to AWS: Credentials not found"
            )
            return False
except ClientError as e:
logging.error(e)
return False
return True
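
# Usage sketch (hypothetical values; assumes AWS credentials are configured for
# boto3). Returns True on success and False if the upload fails:
#
#     ok = upload_to_s3({"model_id": "eos4e40", "error_count": 0},
#                       object_name="eos4e40-run-1")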


def upload_to_cddvault(output_df, api_key):
"""
This function takes in the output dataframe from the model run and uploads the data to CDD vault.

NOTE: Currently, this is simply a skeleton of what the final code should look like. The TODO details
what the remaining changes should look like.

:param output_df: The output dataframe from the model run
:param api_key: The API key for CDD Vault's API
:return: The response from the API call
"""

# We use the slurps API path to be able to bulk upload data
url = "https://app.collaborativedrug.com/api/v1/vaults/<vault_id>/slurps"
headers = {"CDD-Token": api_key}
# TODO: Update project and header_mappings ids, as well as adding mappings for other
# output columns if those are to be tracked as well.
data = {
"project": "",
"autoreject": "true",
"mapping_template": {
"registration_type": "CHEMICAL_STRUCTURE",
"header_mappings": [
{
"header": {"name": "input", "position": 0},
"definition": {
"id": -1,
"type": "InternalFieldDefinition::MoleculeStructure",
},
},
{
"header": {"name": "time", "position": 1},
"definition": {
"id": -1,
"type": "InternalFieldDefinition::BatchFieldDefinition",
},
},
],
},
}

# Save output_df to a CSV of the correct format
new_df = output_df[["input"]].copy()
current_time = datetime.now().isoformat()

new_df["time"] = current_time
csv_file = tempfile.NamedTemporaryFile(mode="w", suffix=".csv")
new_df.to_csv(csv_file.name, index=False)

files = {"file": open(csv_file.name, "rb")}

# Create and make API call
response = requests.post(
url, headers=headers, data={"json": json.dumps(data)}, files=files
)
if response.status_code == 200:
return response.json()
else:
logging.warning("API call to CDD Vault was Unsuccessful")
return response.text
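
# Usage sketch (hypothetical token and data; the <vault_id> placeholder in the URL,
# plus the project and header_mappings ids, must be filled in first per the TODO):
#
#     df = pd.DataFrame({"key": ["k1"], "input": ["CCO"], "score": [0.5]})
#     upload_to_cddvault(df, api_key="your-cdd-token")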


class RunTracker:
"""
This class will be responsible for tracking model runs. It calculates the desired metadata based on a model's
    inputs, outputs, and other run-specific features, before uploading them to AWS to be ingested
    to Ersilia's Splunk dashboard.
"""

def __init__(self):
@@ -109,6 +237,10 @@ def __init__(self):

# function to be called before model is run
def start_tracking(self):
"""
        Runs any code necessary for the beginning of the run.
        Currently this records the run's start time and starts tracing memory usage.
"""
self.time_start = datetime.now()
tracemalloc.start()
self.memory_usage_start = tracemalloc.get_traced_memory()[0]
@@ -119,15 +251,21 @@ def sample_df(self, df, num_rows, num_cols):
"""
return df.sample(num_rows, axis=0).sample(num_cols, axis=1)
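
    # Example (tracker being a RunTracker instance): draw a random 10x5 preview of a
    # larger dataframe; rows and columns are sampled without replacement.
    #
    #     preview = tracker.sample_df(df, num_rows=10, num_cols=5)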

    def stats(self, result):
        """
        Stats function: calculates the basic statistics of the output file from a model. This includes the
        mode (if applicable), minimum, maximum, and standard deviation.

        :param result: The path to the model's output file.
        :return: A dictionary containing the stats for each column of the result.
        """

        dat = pd.read_csv(result)

# drop first two columns (key, input)
dat = dat.drop(["key", "input"], axis=1)

        # calculate statistics
stats = {}
for column in dat:
column_stats = {}
@@ -145,6 +283,13 @@ def stats(self, result):
return stats
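
    # Illustrative return shape for a result file with a single "score" column
    # (key names inferred from the statistics listed in the docstring):
    #
    #     {"score": {"mode": 0.5, "min": 0.1, "max": 0.9, "std": 0.2}}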

def get_file_sizes(self, input_df, output_df):
"""
Calculates the size of the input and output dataframes, as well as the average size of each row.

        :param input_df: Pandas dataframe containing the input data
        :param output_df: Pandas dataframe containing the output data
:return: dictionary containing the input size, output size, average input size, and average output size
"""
input_size = input_df.memory_usage(deep=True).sum() / 1024
output_size = output_df.memory_usage(deep=True).sum() / 1024

@@ -158,32 +303,45 @@ def get_file_sizes(self, input_df, output_df):
"avg_output_size": output_avg_row_size,
}
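
    # Sizes are reported in kilobytes (note the division by 1024 above). An
    # illustrative return value (some key names inferred from the docstring):
    #
    #     {"input_size": 50.0, "output_size": 120.0,
    #      "avg_input_size": 0.5, "avg_output_size": 1.2}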

    def check_types(self, result_df, metadata):
"""
        This method is responsible for checking the types of the output dataframe against the expected types.
This includes checking the shape of the output dataframe (list vs single) and the types of each column.

:param result_df: The output dataframe
:param metadata: The metadata dictionary
:return: A dictionary containing the number of mismatched types and a boolean for whether the shape is correct
"""

type_dict = {"float64": "Float", "int64": "Int"}
count = 0

# ignore key and input columns
        dtypes_list = result_df.loc[:, ~result_df.columns.isin(["key", "input"])].dtypes

        for i in dtypes_list:
            if type_dict[str(i)] != metadata["Output Type"][0]:
count += 1

        if len(dtypes_list) > 1 and metadata["Output Shape"] != "List":
            logging.warning("Not right shape. Expected List but got Single")
correct_shape = False
        elif len(dtypes_list) == 1 and metadata["Output Shape"] != "Single":
            logging.warning("Not right shape. Expected Single but got List")
correct_shape = False
else:
print("Output is correct shape.")
correct_shape = True

print("Output has", count, "mismatched types.\n")
logging.info("Output has", count, "mismatched types.\n")

return {"mismatched_types": count, "correct_shape": correct_shape}

def get_peak_memory(self):
"""
Calculates the peak memory usage of ersilia's Python instance during the run.
:return: The peak memory usage in bytes.
"""

# Compare memory between peak and amount when we started
peak_memory = tracemalloc.get_traced_memory()[1] - self.memory_usage_start
tracemalloc.stop()
@@ -195,8 +353,8 @@ def track(self, input, result, meta):
Tracks the results after a model run.
"""
json_dict = {}
        input_dataframe = pd.read_csv(input)
        result_dataframe = pd.read_csv(result)

json_dict["input_dataframe"] = input_dataframe.to_dict()
json_dict["result_dataframe"] = result_dataframe.to_dict()
1 change: 1 addition & 0 deletions pyproject.toml
@@ -53,6 +53,7 @@ sphinx = {version = ">=5.3.0", optional = true} # For compatibility with python
jinja2 = {version = "^3.1.2", optional = true}
levenshtein = {version = ">=0.21.1,<0.23.0", optional = true} # For faster fuzzy search
boto3 = "^1.28.40"
requests = "^2.31.0"


[tool.poetry.extras]