Add a basic example workflow. #16

Merged
merged 21 commits into from
Oct 10, 2024
Changes from 11 commits
6 changes: 1 addition & 5 deletions .pre-commit-config.yaml
@@ -125,8 +125,4 @@ repos:
       - id: nbqa-isort
         args: ["--profile=black"]
       - id: nbqa-flake8
-        args:
-          [
-            "--extend-ignore=E203,E402,E501,F401,F841",
-            "--exclude=logs/*,data/*",
-          ]
+        args: ["--extend-ignore=E203,E402,E501,F401,F841", "--exclude=logs/*,data/*"]
121 changes: 120 additions & 1 deletion README.md
@@ -4,10 +4,129 @@ This repository contains the dataset, task, model training recipes, and results
effort for EHR machine learning.

Note that this repository is _not_ a place where functional code is stored. Rather, this repository stores
configuration files, training recipes, results, etc. for the MEDS-DEV benchmarking effort -- runnable code will
configuration files, training recipes, results, etc. for the MEDS-DEV benchmarking effort -- runnable code
will
often come from other repositories, with suitable permalinks being present in the various configuration files
or commit messages for associated contributions to this repository.

## Example workflow

### (Optional) Set up the MEDS project with environment

```bash
# Create and enter a MEDS project directory
mkdir -p "$MY_MEDS_PROJECT_ROOT"
cd "$MY_MEDS_PROJECT_ROOT"

conda create -n $MY_MEDS_CONDA_ENV python=3.10
conda activate $MY_MEDS_CONDA_ENV
```

Additionally install any model-related dependencies.

### Install MEDS-DEV

Clone the MEDS-DEV GitHub repo and install it locally.
This will additionally install some MEDS data processing dependencies:

```bash
git clone https://github.com/mmcdermott/MEDS-DEV.git
# Install from the project root so that later steps can refer to ./MEDS-DEV
pip install -e ./MEDS-DEV
```

Install the MEDS evaluation package:

```bash
git clone https://github.com/kamilest/meds-evaluation.git
pip install -e ./meds-evaluation
```

Additionally, make sure any model-related dependencies are installed.

### Extract a task from the MEDS dataset

This step prepares the MEDS dataset for a task by extracting a cohort using inclusion/exclusion criteria and
processing the data to create the label files.

### Find the task configuration file

Task-related information is stored in Hydra configuration files (in `.yaml` format) under
`MEDS-DEV/src/MEDS_DEV/tasks/criteria`.

Task names correspond to the path of their configuration file relative to the
`MEDS-DEV/src/MEDS_DEV/tasks/criteria` directory, without the `.yaml` extension.
For example, the file `MEDS-DEV/src/MEDS_DEV/tasks/criteria/mortality/in_icu/first_24h.yaml`
corresponds to a `$TASK_NAME` of `mortality/in_icu/first_24h`.
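
The path-to-name convention can be sketched in a few lines of Python. This is only an illustration of the mapping described above, not part of MEDS-DEV; the function name `task_name_from_config` is hypothetical.

```python
from pathlib import PurePosixPath

# Root directory under which task configuration files live (from the text above).
CRITERIA_ROOT = "MEDS-DEV/src/MEDS_DEV/tasks/criteria"


def task_name_from_config(config_path: str, criteria_root: str = CRITERIA_ROOT) -> str:
    """Derive a task name from the location of its YAML configuration file.

    Hypothetical helper: strips the criteria root and the file extension.
    """
    relative = PurePosixPath(config_path).relative_to(criteria_root)
    return str(relative.with_suffix(""))


print(task_name_from_config(f"{CRITERIA_ROOT}/mortality/in_icu/first_24h.yaml"))
```

The same relationship read in reverse gives the location where a new task's configuration file would need to be placed.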

**To add a task**

If your task is not yet supported, you will need to add a directory and define an appropriate configuration
file in the corresponding location under `MEDS-DEV/src/MEDS_DEV/tasks/criteria`.

### Dataset configuration file

Task configuration files are incomplete, because some concepts (predicates) have to be defined in a
dataset-specific way (e.g. `icu_admission` in `mortality/in_icu/first_24h`).

These dataset-specific predicate definitions are found in
`MEDS-DEV/src/MEDS_DEV/datasets/$DATASET_NAME/predicates.yaml` Hydra configuration files.

In addition to `$DATASET_NAME` (e.g. `MIMIC-IV`), you will also need to have your MEDS dataset directory
ready (i.e. `$MEDS_ROOT_DIR`).

**To add a dataset configuration file**

If your dataset is not yet supported, you will need to add a directory and define an appropriate
`predicates.yaml` configuration file in the corresponding location under `MEDS-DEV/src/MEDS_DEV/datasets`.

### Run the MEDS task extraction helper

From your project directory (`$MY_MEDS_PROJECT_ROOT`), where `MEDS-DEV` is located, run:

```bash
./MEDS-DEV/src/MEDS_DEV/helpers/extract_task.sh $MEDS_ROOT_DIR $DATASET_NAME $TASK_NAME
```

This will use information from task and dataset-specific predicate configs to extract cohorts and labels from
`$MEDS_ROOT_DIR/data`, and place them in `$MEDS_ROOT_DIR/task_labels/$TASK_NAME/` subdirectories, retaining
the same sharded structure as the `$MEDS_ROOT_DIR/data` directory.
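
As a rough illustration of the layout described above, the sketch below maps a data shard to the location where its labels would be expected. The helper name `label_path_for_shard` and the example shard `train/0.parquet` are assumptions made for illustration; the actual shard names depend on your dataset.

```python
from pathlib import PurePosixPath


def label_path_for_shard(meds_root: str, task_name: str, shard_path: str) -> PurePosixPath:
    """Map a shard under <meds_root>/data to its mirror under <meds_root>/task_labels/<task_name>.

    Hypothetical helper illustrating the directory convention only.
    """
    root = PurePosixPath(meds_root)
    # Keep the shard's position relative to the data directory, e.g. "train/0.parquet".
    relative_shard = PurePosixPath(shard_path).relative_to(root / "data")
    return root / "task_labels" / task_name / relative_shard


# Example with a hypothetical shard name.
print(label_path_for_shard("/meds", "mortality/in_icu/first_24h", "/meds/data/train/0.parquet"))
```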

### Train the model

This step depends on the API of your particular model.

For example, the command below calls a helper script that generates random outputs for binary
classification, conforming to the MEDS binary classification prediction schema:

```bash
./MEDS-DEV/src/MEDS_DEV/helpers/generate_predictions.sh $MEDS_ROOT_DIR $DATASET_NAME $TASK_NAME
```
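
Before evaluating your own model's outputs, it may help to check that they have the same columns as the random predictions. The sketch below assumes the column names and order used by the `generate_random_predictions.py` helper added in this change; the function `schema_problems` is hypothetical and is not part of MEDS-DEV or `meds-evaluation`.

```python
# Column names and order assumed from generate_random_predictions.py in this change.
EXPECTED_COLUMNS = [
    "subject_id",
    "prediction_time",
    "boolean_value",
    "predicted_boolean_value",
    "predicted_boolean_probability",
]


def schema_problems(columns: list[str]) -> list[str]:
    """Return human-readable problems; an empty list means the columns match."""
    problems = []
    missing = [name for name in EXPECTED_COLUMNS if name not in columns]
    if missing:
        problems.append(f"missing columns: {missing}")
    elif list(columns[: len(EXPECTED_COLUMNS)]) != EXPECTED_COLUMNS:
        # All columns are present, but not in the assumed order.
        problems.append(f"unexpected column order: {list(columns)}")
    return problems


print(schema_problems(["subject_id", "prediction_time", "boolean_value"]))
```

With a dataframe library, the column list of a predictions file can be passed in directly, for example `schema_problems(dataframe.columns)`.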

### Evaluate the model

You can use the `meds-evaluation` package by running `meds-evaluation-cli` and providing the path to the
predictions dataframe as well as the output directory. For example,

```bash
meds-evaluation-cli \
predictions_path="./<$MEDS_ROOT_DIR>/task_predictions/$TASK_NAME/<train|tuning|held_out>/*.parquet" \
output_dir="./<$MEDS_ROOT_DIR>/task_evaluation/$TASK_NAME/<train|tuning|held_out>/..."
```

This will create a JSON file with the results in the directory provided by the `output_dir` argument.

Note this package currently supports binary classification only.

## Contributing to MEDS-DEV

### To Add a Model
6 changes: 6 additions & 0 deletions src/MEDS_DEV/configs/predictions.yaml
@@ -0,0 +1,6 @@
defaults:
- _ACES_MD
- _self_
- override hydra/hydra_logging: disabled

cohort_predictions_dir: "${oc.env:MEDS_ROOT_DIR}/task_predictions"
16 changes: 16 additions & 0 deletions src/MEDS_DEV/helpers/generate_predictions.sh
@@ -0,0 +1,16 @@
#!/bin/bash

export MEDS_ROOT_DIR=$1
export MEDS_DATASET_NAME=$2
export MEDS_TASK_NAME=$3

shift 3

MEDS_DEV_REPO_DIR=$(python -c "from importlib.resources import files; print(files(\"MEDS_DEV\"))")
export MEDS_DEV_REPO_DIR

# TODO improve efficiency of prediction generator by using this
Owner

What is this TODO for? I'm not sure I understand.

Collaborator Author

At the moment I'm reimplementing recursive search through the directory to find shards for which to generate the predictions, but I think this could be improved with the expand_shards helper you have implemented.

# SHARDS=$(expand_shards "$MEDS_ROOT_DIR"/data)
mmcdermott marked this conversation as resolved.

python -m MEDS_DEV.helpers.generate_random_predictions --config-path="$MEDS_DEV_REPO_DIR"/configs \
--config-name="predictions" "hydra.searchpath=[pkg://aces.configs]" "$@"
68 changes: 68 additions & 0 deletions src/MEDS_DEV/helpers/generate_random_predictions.py
@@ -0,0 +1,68 @@
import os
from importlib.resources import files
from pathlib import Path

import hydra
import numpy as np
import polars as pl
from omegaconf import DictConfig

SUBJECT_ID = "subject_id"
PREDICTION_TIME = "prediction_time"

BOOLEAN_VALUE_COLUMN = "boolean_value"
PREDICTED_BOOLEAN_VALUE_COLUMN = "predicted_boolean_value"
PREDICTED_BOOLEAN_PROBABILITY_COLUMN = "predicted_boolean_probability"

CONFIG = files("MEDS_DEV").joinpath("configs/predictions.yaml")


@hydra.main(version_base=None, config_path=str(CONFIG.parent.resolve()), config_name=CONFIG.stem)
def generate_random_predictions(cfg: DictConfig) -> None:
    cohort_dir = cfg.cohort_dir  # cohort_dir: "${oc.env:MEDS_ROOT_DIR}/task_labels"
    cohort_name = cfg.cohort_name  # cohort_name: ${task_name}; task_name: ${oc.env:MEDS_TASK_NAME}

    cohort_dir = Path(cohort_dir) / cohort_name
    cohort_predictions_dir = (
        cfg.cohort_predictions_dir
    )  # cohort_predictions_dir: "${oc.env:MEDS_ROOT_DIR}/task_predictions"

    # TODO: use expand_shards helper from the script to access sharded dataframes directly
    for split in cohort_dir.iterdir():
        if split.is_dir() and split.name in {"train", "tuning", "held_out"}:  # train | tuning | held_out
            for file in split.iterdir():
                if file.is_file():
                    dataframe = pl.read_parquet(file)
                    predictions = _generate_random_predictions(dataframe)  # sharded dataframes
Comment on lines +35 to +36
Contributor

⚠️ Potential issue

Add error handling for file I/O operations.

Reading parquet files without error handling may cause the script to crash if a file is missing or corrupted. Incorporate try-except blocks around file I/O operations to handle exceptions gracefully and provide informative error messages.

Apply this diff to add error handling:

 for split in cohort_dir.iterdir():
     if split.is_dir() and split.name in {"train", "tuning", "held_out"}:
         for file in split.iterdir():
             if file.is_file():
-                dataframe = pl.read_parquet(file)
-                predictions = _generate_random_predictions(dataframe)
+                try:
+                    dataframe = pl.read_parquet(file)
+                    predictions = _generate_random_predictions(dataframe)
+                except Exception as e:
+                    print(f"Error processing {file}: {e}")
+                    continue
             # Rest of the code...

 elif split.is_file():
-    dataframe = pl.read_parquet(split)
-    predictions = _generate_random_predictions(dataframe)
+    try:
+        dataframe = pl.read_parquet(split)
+        predictions = _generate_random_predictions(dataframe)
+    except Exception as e:
+        print(f"Error processing {split}: {e}")
+        return

Also applies to: 44-45


                    # $MEDS_ROOT_DIR/task_predictions/$TASK_NAME/<split>/<file>.parquet
                    predictions_path = Path(cohort_predictions_dir) / cohort_name / split.name
                    os.makedirs(predictions_path, exist_ok=True)

                    predictions.write_parquet(predictions_path / file.name)
        elif split.is_file():
            dataframe = pl.read_parquet(split)
            predictions = _generate_random_predictions(dataframe)

            predictions_path = Path(cohort_predictions_dir) / cohort_name
            os.makedirs(predictions_path, exist_ok=True)

            predictions.write_parquet(predictions_path / split.name)

Comment on lines +31 to +51
Contributor

🛠️ Refactor suggestion

Refactor duplicated code to improve maintainability.

The code segments handling directory and file inputs contain duplicated logic, particularly in reading dataframes and writing predictions. Refactoring this section by extracting common functionality into helper functions will reduce code duplication and enhance readability.

Apply this refactor to consolidate duplicated code:

 def generate_random_predictions(cfg: DictConfig) -> None:
     # Existing code above...
 
+    def process_file(file, predictions_path):
+        dataframe = pl.read_parquet(file)
+        predictions = _generate_random_predictions(dataframe)
+        os.makedirs(predictions_path, exist_ok=True)
+        predictions.write_parquet(predictions_path / file.name)
+
     for split in cohort_dir.iterdir():
         if split.is_dir() and split.name in {"train", "tuning", "held_out"}:
             for file in split.iterdir():
                 if file.is_file():
-                    dataframe = pl.read_parquet(file)
-                    predictions = _generate_random_predictions(dataframe)
-
-                    predictions_path = Path(cohort_predictions_dir) / cohort_name / split.name
-                    os.makedirs(predictions_path, exist_ok=True)
-
-                    predictions.write_parquet(predictions_path / file.name)
+                    predictions_path = Path(cohort_predictions_dir) / cohort_name / split.name
+                    process_file(file, predictions_path)
         elif split.is_file():
-            dataframe = pl.read_parquet(split)
-            predictions = _generate_random_predictions(dataframe)
-
             predictions_path = Path(cohort_predictions_dir) / cohort_name
-            os.makedirs(predictions_path, exist_ok=True)
-
-            predictions.write_parquet(predictions_path / split.name)
+            process_file(split, predictions_path)


def _generate_random_predictions(dataframe: pl.DataFrame) -> pl.DataFrame:
    """Creates a new dataframe with the same subject_id and boolean_value columns as in the input dataframe,
    along with predictions."""

    output = dataframe.select([SUBJECT_ID, PREDICTION_TIME, BOOLEAN_VALUE_COLUMN])
    probabilities = np.random.uniform(0, 1, len(dataframe))
    # TODO: meds-evaluation currently cares about the order of columns and types, so the new columns have to
    # be inserted at the correct position and cast to the correct type
    output.insert_column(3, pl.Series(PREDICTED_BOOLEAN_VALUE_COLUMN, probabilities.round()).cast(pl.Boolean))
Contributor

🛠️ Refactor suggestion

Generate boolean predictions using a threshold comparison.

Rounding uniform probabilities may not produce an unbiased random boolean outcome. Using a threshold comparison ensures a fair distribution of True and False values.

Apply this diff to update the boolean value generation:

-    output.insert_column(3, pl.Series(PREDICTED_BOOLEAN_VALUE_COLUMN, probabilities.round()).cast(pl.Boolean))
+    predicted_values = probabilities > 0.5
+    output.insert_column(3, pl.Series(PREDICTED_BOOLEAN_VALUE_COLUMN, predicted_values))

Alternatively, use rng.choice for direct boolean generation:

-    probabilities = rng.uniform(0, 1, len(dataframe))
+    predicted_values = rng.choice([True, False], size=len(dataframe))

    output.insert_column(4, pl.Series(PREDICTED_BOOLEAN_PROBABILITY_COLUMN, probabilities))
mmcdermott marked this conversation as resolved.

    return output


if __name__ == "__main__":
    generate_random_predictions()