Skip to content

Excerpt of Produced Datasets

Nina Ihde edited this page Sep 23, 2021 · 4 revisions

Most data files are preprocessed and stored in Parquet format. The Parquet format offers compressed, efficient columnar data representation (learn more about Parquet).

To use them, simply read them in as Pandas DataFrame. For large data sets, it is recommended to read them in as Dask DataFrame, as this allows parallel computation. A Dask DataFrame is a large parallel DataFrame composed of many smaller Pandas DataFrames, split along the index (learn more about Dask).

Note: A peculiarity of saving Pandas DataFrames as Parquet is that "parquet must have string column names". If dataframes have e.g. integer column names, these must be converted to strings before saving. So don't be surprised if you find string column names in the data frames read from Parquet (it's probably rather rare to have non-string column names anyway).

You can read in Parquet files as Pandas or Dask DataFrame as follows:

import pandas as pd
import dask.dataframe as dd
import pyarrow as pa

# Specify the file path valid for your machine including the file name, e.g. './data/chartevents_subset.parquet'
filepath = './insert/your/path/filename.parquet'

# To read Parquet file as Dask DataFrame
ddf = dd.read_parquet(filepath, engine='pyarrow')

# To read Parquet file as Pandas DataFrame
pdf = pd.read_parquet(filepath, engine='pyarrow')

chartevents_subset.parquet

  • 9175037 rows × 15 columns, 109 MB
  • Subset of the MIMIC-III data set called CHARTEVENTS.csv (see also respective MIMIC schema website)
  • No change in columns, all have been kept.
  • The number of rows has been significantly reduced by filtering for specific ITEMIDs and removing rows without ICUSTAY_ID:
      import pandas as pd
      import dask.dataframe as dd
      import pyarrow as pa
      from dask.diagnostics import ProgressBar
      
      # Read CHARTEVENTS.csv as Dask DataFrame
      # Data types based on MIMIC schema specification https://mit-lcp.github.io/mimic-schema-spy/tables/chartevents.html
      # Problem: Complicated use of intger data types with NaNs in Pandas, see https://pandas.pydata.org/pandas-docs/stable/user_guide/gotchas.html#nan-integer-na-values-and-na-type-promotions
      # Decision: Floats and integers are read in as 'float64', strings as 'object', and timestamps via Dask's parse_dates provided for this purpose.
      chartevents = dd.read_csv('../mimic/CHARTEVENTS.csv', parse_dates=['CHARTTIME','STORETIME'], dtype={
          'ROW_ID': 'float64', # int4 according to specification
          'SUBJECT_ID': 'float64', # int4 according to specification
          'HADM_ID': 'float64', # int4 according to specification
          'ICUSTAY_ID': 'float64', # int4 according to specification
          'ITEMID': 'float64', # int4 according to specification
          'CGID': 'float64', # int4 according to specification
          'VALUE': 'object',
          'VALUENUM': 'float64', # float8 according to specification
          'VALUEUOM': 'object',
          'WARNING': 'float64', # int4 according to specification
          'ERROR': 'float64', # int4 according to specification
          'RESULTSTATUS': 'object',
          'STOPPED': 'object'})
      
      # Create set of relevant ITEMIDs to filter by
      itemid_filter = [220045, 220046, 220047, 220179, 223751, 223752, 220180, 220277, 223769, 223770]
      # 220045 Heart Rate
      # 220046 Heart rate Alarm - High
      # 220047 Heart Rate Alarm - Low
      # 220179 Non Invasive Blood Pressure systolic
      # 223751 Non-Invasive Blood Pressure Alarm - High
      # 223752 Non-Invasive Blood Pressure Alarm - Low
      # 220180 Non Invasive Blood Pressure diastolic
      # 220277 O2 saturation pulseoxymetry
      # 223769 O2 Saturation Pulseoxymetry Alarm - High
      # 223770 O2 Saturation Pulseoxymetry Alarm - Low
      
      with ProgressBar():
          # Filter by ITEMIDs
          chartevents_subset = chartevents[chartevents.ITEMID.isin(itemid_filter)]
          # Drop rows without ICUSTAY_ID (The ICUSTAY_ID is missing in 1811 rows, so these are removed.)
          chartevents_subset = chartevents_subset.dropna(how='any', subset=['ICUSTAY_ID'])
          # Keep only the rows for which no error occurred, which is coded by a 0. (5584 rows are dropped because the boolean ERROR column equals 1, indicating an error.)
          chartevents_subset = chartevents_subset[chartevents_subset.ERROR.isin([0])]
          # Apply the previously defined commands to the Dask DataFrame, resulting in the desired Pandas DataFrame.
          chartevents_subset = chartevents_subset.compute()
          # Computing duration on Marius' laptop (Intel i5-5200U CPU @ 2.20GHz): 21min
      
      # Sort the rows (not essential, but gives a better overview)
      chartevents_subset = chartevents_subset.sort_values(by=['ICUSTAY_ID', 'CHARTTIME','ITEMID'])
      
      # Rest index
      chartevents_subset = chartevents_subset.reset_index(drop=True)
      
      # Save as parquet file
      pd.DataFrame(chartevents_subset).to_parquet('../data/chartevents_subset.parquet', engine='pyarrow')

unique_icustays_in_chartevents_subset.parquet

  • 23446 rows × 1 columns, 138 KB
  • This DataFrame contains only the ICUSTAY_ID column, which contains all unique ICUSTAY_IDs contained in chartevents_subset.parquet.
      import pandas as pd
      import pyarrow as pa
      
      # Read chartevents_subset from parquet file to pandas data frame
      chartevents_subset = pd.read_parquet('../data/chartevents_subset.parquet', engine='pyarrow')
      
      # Compute unqiue ICU stays in chartevents_subset 
      unique_icustays_in_chartevents_subset = pd.Series(chartevents_subset.ICUSTAY_ID.unique()).rename('ICUSTAY_ID')
      
      # Save as parquet file (To do this, the Pandas Series must be converted to a Pandas DataFrame.)
      pd.DataFrame(unique_icustays_in_chartevents_subset).to_parquet('../data/unique_icustays_in_chartevents_subset.parquet', engine='pyarrow')

unique_icustays_in_chartevents.parquet

  • 60841 rows × 1 columns, 410 KB
  • Unique ICU stays in MIMIC-III data set called CHARTEVENTS.csv (see also respective MIMIC schema website)
  • Not available on server yet
  • Only column 'ICUSTAY_ID' has been kept, on which uniqueness has been applied.

prescriptions_based_medications.parquet

  • 1287829 rows × 10 columns, 9 MB
  • Subset of the MIMIC-III data set called PRESCRIPTIONS.csv containing the medication information for each relevant ICUSTAY_ID with its date period, drug type, drug name and dosis
  • Relevant ICUSTAY_IDs are the ones that can be found in unique_icustays_in_chartevents_subset.parquet
  • Additionally contains integer column DATE_DIF with difference betweeen STARTDATE and ENDDATE in days (negative differences removed)
  • Freed from rows with NaN entries
      import numpy as np
      import pandas as pd
      import dask.dataframe as dd
      from dask.diagnostics import ProgressBar
    
      # Data types based on MIMIC schema specification https://mit-lcp.github.io/mimic-schema-spy/tables/prescriptions.html
      # Problem: Complicated use of integer data types with NaNs in Pandas, see https://pandas.pydata.org/pandas-docs/stable/user_guide/gotchas.html#nan-integer-na-values-and-na-type-promotions
      # Decision: Integers are read in as 'float64', strings as 'object', and timestamps via Dask's parse_dates provided for this purpose
      prescriptions = dd.read_csv('../data/mimic-iii-clinical-database-1.4/PRESCRIPTIONS.csv', parse_dates=['STARTDATE', 'ENDDATE'], dtype={
          'ROW_ID': 'float64', # int4 according to specification
          'SUBJECT_ID': 'float64', # int4 according to specification
          'HADM_ID': 'float64', # int4 according to specification
          'ICUSTAY_ID': 'float64', # int4 according to specification
          'DRUG_TYPE': 'object', # varchar according to specification
          'DRUG': 'object', # varchar according to specification
          'DRUG_NAME_POE': 'object', # varchar according to specification
          'DRUG_NAME_GENERIC': 'object', # varchar according to specification
          'FORMULARY_DRUG_CD': 'object', # varchar according to specification
          'GSN': 'object', # varchar according to specification
          'NDC': 'object', # varchar according to specification
          'PROD_STRENGTH': 'object', # varchar according to specification
          'DOSE_VAL_RX': 'object', # varchar according to specification
          'DOSE_UNIT_RX': 'object', # varchar according to specification
          'FORM_VAL_DISP': 'object', # varchar according to specification
          'FORM_UNIT_DISP': 'object', # varchar according to specification
          'ROUTE': 'object' # varchar according to specification
      })
    
      unique_ICU_stays = pd.read_parquet('../data/unique_icustays_in_chartevents_subset.parquet', engine='pyarrow')
    
      with ProgressBar():
          # Extract relevant columns (ICUSTAY_ID, date period, drug type, drug name and its dosis)
          medication_subset = prescriptions[['ICUSTAY_ID', 'STARTDATE', 'ENDDATE', 'DRUG_TYPE', 'DRUG', 'DOSE_VAL_RX', 'DOSE_UNIT_RX', 'FORM_VAL_DISP', 'FORM_UNIT_DISP']]
    
          # Filter by ICUSTAY_IDs
          medication_subset = medication_subset[medication_subset.ICUSTAY_ID.isin(unique_ICU_stays.ICUSTAY_ID)]
          # Convert ICUSTAY_ID to integer (aka remove ".0")
          medication_subset['ICUSTAY_ID'] = medication_subset['ICUSTAY_ID'].astype(int)
    
          # Drop 783 rows with ENDDATE = NaT
          medication_subset = medication_subset[medication_subset.ENDDATE.notnull()]
          # Drop 634 rows with FORM_UNIT_DISP = NaN (also removes rows with NAN entries in DOSE_VAL_RX, DOSE_UNIT_RX and FORM_VAL_DISP columns)
          medication_subset = medication_subset[medication_subset.FORM_UNIT_DISP.notnull()]
    
          # Apply the previously defined commands to the Dask DataFrame, resulting in the desired Pandas DataFrame
          medication_subset = medication_subset.compute()
    
      # Calculate difference betweeen STARTDATE and ENDDATE
      medication_subset['DATE_DIF'] = pd.to_datetime(medication_subset['ENDDATE']) - pd.to_datetime(medication_subset['STARTDATE'])
      # Extract integer values (aka remove " days")
      medication_subset['DATE_DIF'] = (medication_subset['DATE_DIF'] / np.timedelta64(1,'D')).astype(int)
      # Remove 5,546 negative date differences (STARTDATE after ENDDATE)
      medication_subset = medication_subset[medication_subset['DATE_DIF'] >= 0]
    
      # Sort rows for better overview
      medication_subset = medication_subset.sort_values(by=['ICUSTAY_ID', 'STARTDATE','ENDDATE'])
    
      # Reset index
      medication_subset = medication_subset.reset_index(drop=True)
    
      # Save as parquet file
      pd.DataFrame(medication_subset).to_parquet('../data/prescriptions_based_medications.parquet', engine='pyarrow')

inputevents_based_medications.parquet

  • 1282499 rows × 21 columns, 60 MB
  • Subset of the MIMIC-III data set called INPUTEVENTS_MV.csv
  • Filtered by ITEMIDs clearly related to medications and by ICUSTAY_IDs of interest from CHARTEVENTS.csv
  • Additionally contains column DURATION_IN_MIN with duration from STARTTIME to ENDTIME in minutes (negative durations removed)
      import dask.dataframe as dd
      from dask.diagnostics import ProgressBar
      import pandas as pd
    
      # Data types based on MIMIC schema specification https://mit-lcp.github.io/mimic-schema-spy/tables/inputevents_mv.html
      # Problem: Complicated use of integer data types with NaNs in Pandas, see https://pandas.pydata.org/pandas-docs/stable/user_guide/gotchas.html#nan-integer-na-values-and-na-type-promotions
      # Decision: Decision: Integers are read in as 'float64' and strings as 'object'
      d_items = dd.read_csv('../data/mimic-iii-clinical-database-1.4/D_ITEMS.csv', dtype={
          'ROW_ID': 'float64', # int according to specification
          'ITEMID': 'float64', # int according to specification
          'LABEL': 'object', # varchar according to specification
          'ABBREVIATION': 'object', # varchar according to specification
          'DBSOURCE': 'object', # varchar according to specification
          'LINKSTO': 'object', # varchar according to specification
          'CATEGORY': 'object', # varchar according to specification
          'UNITNAME': 'object', # varchar according to specification
          'PARAM_TYPE': 'object', # varchar according to specification
          'CONCEPTID': 'float64' # int according to specification
      })
    
      with ProgressBar():
          # Filter for ITEMIDs from INPUTEVENTS_MV
          medication_items = d_items[d_items['LINKSTO'] == 'inputevents_mv']
    
          # Filter by categories clearly related to medications (disregard "Fluids - Other (Not In Use)" because it also includes liquid and special nutrition)
          medication_items = medication_items[medication_items['CATEGORY'].isin(['Medications', 'Blood Products/Colloids', 'Antibiotics'])]
    
          # Apply the previously defined commands to the Dask DataFrame, resulting in the desired Pandas DataFrame
          medication_items = medication_items.compute()
    
    
      # Data types based on MIMIC schema specification https://mit-lcp.github.io/mimic-schema-spy/tables/inputevents_mv.html
      # Problem: Complicated use of integer data types with NaNs in Pandas, see https://pandas.pydata.org/pandas-docs/stable/user_guide/gotchas.html#nan-integer-na-values-and-na-type-promotions
      # Decision: Decision: Floats and integers are read in as 'float64', strings as 'object', and timestamps via Dask's parse_dates provided for this purpose
      inputevents = dd.read_csv('../data/mimic-iii-clinical-database-1.4/INPUTEVENTS_MV.csv', parse_dates=['STARTTIME', 'ENDTIME', 'STORETIME', 'COMMENTS_DATE'], dtype={
          'ROW_ID': 'float64', # int4 according to specification
          'SUBJECT_ID': 'float64', # int4 according to specification
          'HADM_ID': 'float64', # int4 according to specification
          'ICUSTAY_ID': 'float64', # int4 according to specification
          'ITEMID': 'float64', # int4 according to specification
          'AMOUNT': 'float64', # float8 according to specification
          'AMOUNTUOM': 'object', # varchar according to specification
          'RATE': 'float64', # float8 according to specification
          'RATEUOM': 'object', # varchar according to specification
          'CGID': 'float64', # int4 according to specification
          'ORDERID': 'float64', # int4 according to specification
          'LINKORDERID': 'float64', # int4 according to specification
          'ORDERCATEGORYNAME': 'object', # varchar according to specification
          'SECONDARYORDERCATEGORYNAME': 'object', # varchar according to specification
          'ORDERCOMPONENTTYPEDESCRIPTION': 'object', # varchar according to specification
          'ORDERCATEGORYDESCRIPTION': 'object', # varchar according to specification
          'PATIENTWEIGHT': 'float64', # float8 according to specification
          'TOTALAMOUNT': 'float64', # float8 according to specification
          'TOTALAMOUNTUOM': 'object', # varchar according to specification
          'ISOPENBAG': 'float64', # int2 according to specification
          'CONTINUEINNEXTDEPT': 'float64', # int2 according to specification
          'CANCELREASON': 'float64', # int2 according to specification
          'STATUSDESCRIPTION': 'object', # varchar according to specification
          'COMMENTS_STATUS': 'object', # varchar according to specification
          'COMMENTS_TITLE': 'object', # varchar according to specification
          'ORIGINALAMOUNT': 'float64', # float8 according to specification
          'ORIGINALRATE': 'float64' # float8 according to specification
      })
    
      # Get all relevant ICU stays
      unique_ICU_stays = pd.read_parquet('../data/unique_icustays_in_chartevents_subset.parquet', engine='pyarrow')
    
      with ProgressBar():
          # Extract relevant columns
          inputevents_subset = inputevents[['ICUSTAY_ID', 'STARTTIME', 'ENDTIME', 'ITEMID', 'AMOUNT', 'AMOUNTUOM', 'RATE', 'RATEUOM', 'STORETIME', 'ORDERID', 'LINKORDERID', 'ORDERCATEGORYNAME', 'SECONDARYORDERCATEGORYNAME', 'ORDERCOMPONENTTYPEDESCRIPTION', 'ORDERCATEGORYDESCRIPTION' , 'TOTALAMOUNT', 'TOTALAMOUNTUOM', 'STATUSDESCRIPTION', 'ORIGINALAMOUNT', 'ORIGINALRATE']]
    
          # Filter by ICUSTAY_IDs
          inputevents_subset = inputevents_subset[inputevents_subset.ICUSTAY_ID.isin(unique_ICU_stays.ICUSTAY_ID)]
    
          # Drop rows without ICUSTAY_ID
          inputevents_subset = inputevents_subset.dropna(how='any', subset=['ICUSTAY_ID'])
    
          # Reduce ITEMIDs to the ones whose categories are clearly related to medications
          inputevents_subset = inputevents_subset[inputevents_subset['ITEMID'].isin(medication_items.ITEMID.unique())]
    
          # Drop rows with TOTALAMOUNT = NaN because it should always be set (also removes all NaN values in TOTALAMOUNTUOM)
          inputevents_subset = inputevents_subset[inputevents_subset.TOTALAMOUNT.notnull()]
    
          # Apply the previously defined commands to the Dask DataFrame, resulting in the desired Pandas DataFrame
          inputevents_subset = inputevents_subset.compute()
    
      # Calculate difference betweeen STARTTIME and ENDTIME
      inputevents_subset['DURATION_IN_MIN'] = (pd.to_datetime(inputevents_subset['ENDTIME']) - pd.to_datetime(inputevents_subset['STARTTIME'])) / pd.Timedelta(minutes=1)
    
      # Remove negative durations (STARTTIME after ENDTIME)
      inputevents_subset = inputevents_subset[inputevents_subset['DURATION_IN_MIN'] >= 0]
    
      # Sort rows for better overview
      inputevents_subset = inputevents_subset.sort_values(by=['ICUSTAY_ID', 'STARTTIME','ENDTIME', 'ITEMID'])
    
      # Reset index
      inputevents_subset = inputevents_subset.reset_index(drop=True)
    
      # Save as parquet file
      pd.DataFrame(inputevents_subset).to_parquet('../data/inputevents_based_medications.parquet', engine='pyarrow')