Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove OMOP <>_date columns #59

Merged
merged 1 commit into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
## Unreleased

### Fixed
- pyarrow fix did not work on spark executors
- Pyarrow fix now works on Spark executors.
- Fix OMOP _date columns issue

## v0.1.7 (2024-04-12)
### Changed
Expand Down
33 changes: 32 additions & 1 deletion eds_scikit/io/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@

import pandas as pd
import pyarrow.parquet as pq
import pyspark.sql.functions as F
import pyspark.sql.types as T
from databricks import koalas
from loguru import logger
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, StructField, StructType

from eds_scikit.utils.framework import cache

from . import settings
from .base import BaseData
from .data_quality import clean_dates
Expand All @@ -33,6 +37,8 @@ def __init__(
Union[Dict[str, Optional[List[str]]], List[str]]
] = None,
database_type: Optional[str] = "OMOP",
prune_omop_date_columns: bool = True,
cache: bool = True,
):
"""Spark interface for OMOP data stored in a Hive database.

Expand All @@ -54,6 +60,12 @@ def __init__(
*deprecated*
database_type: Optional[str] = 'OMOP'. Must be 'OMOP' or 'I2B2'
Whether to use the native OMOP schema or to convert I2B2 inputs to OMOP.
prune_omop_date_columns: bool, default=True
In OMOP, most date values are stored both in a `<str>_date` and `<str>_datetime` column
            Koalas has trouble handling the `date` type, so we only keep the `datetime` column
cache: bool, default=True
Whether to cache each table after preprocessing or not.
            Will speed up subsequent calculations, but can be slow/infeasible for very large tables

Attributes
----------
Expand Down Expand Up @@ -125,6 +137,8 @@ def __init__(
for omop_table, i2b2_table in self.omop_to_i2b2.items():
self.i2b2_to_omop[i2b2_table].append(omop_table)

self.prune_omop_date_columns = prune_omop_date_columns
self.cache = cache
self.user = os.environ["USER"]
self.person_ids, self.person_ids_df = self._prepare_person_ids(person_ids)
self.available_tables = self.list_available_tables()
Expand Down Expand Up @@ -224,10 +238,27 @@ def _read_table(
if "person_id" in df.columns and person_ids is not None:
df = df.join(person_ids, on="person_id", how="inner")

df = df.cache().to_koalas()
if self.prune_omop_date_columns:

# Keeping only _datetime column if corresponding _date exists
cols = [
c
for c in df.columns
if not ((c.endswith("_date") and (f"{c}time" in df.columns)))
]
df = df.select(cols)

# Casting the single _date columns to timestamp:
for col in df.schema:
if col.dataType == T.DateType():
df = df.withColumn(col.name, F.col(col.name).cast("timestamp"))
df = df.to_koalas()

df = clean_dates(df)

if self.cache:
df = cache(df)

return df

def persist_tables_to_folder(
Expand Down
Loading