Skip to content

Commit

Permalink
Merge branch 'main' into table-viz
Browse files Browse the repository at this point in the history
  • Loading branch information
svittoz committed Jun 7, 2024
2 parents 30861df + da3ed10 commit 84d5e9e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 2 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
[![PyPI](https://img.shields.io/pypi/v/eds-scikit?color=blue&style=flat-square)](https://pypi.org/project/eds-scikit/)
[![Supported Python](https://img.shields.io/badge/python-%3E%3D%203.7.1%20%7C%20%3C%203.8-brightgreen?style=flat-square)](https://www.python.org/)
[![Black](https://img.shields.io/badge/code%20style-black-black?style=flat-square)]([https://www.python.org/](https://github.com/psf/black))
[![Coverage](https://codecov.io/github/aphp/eds-scikit/coverage.svg)](https://raw.githubusercontent.com/aphp/eds-scikit/coverage/coverage.txt)
[![Coverage](https://raw.githubusercontent.com/aphp/eds-scikit/coverage/coverage.svg)](https://raw.githubusercontent.com/aphp/eds-scikit/coverage/coverage.txt)
[![DOI](https://zenodo.org/badge/571584236.svg)](https://zenodo.org/badge/latestdoi/571584236&style=flat-square)

</p>
Expand Down
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
### Added
- omop teva module

### Fixed
- PyArrow fix now works on Spark executors.
- Fix OMOP _date columns issue

## v0.1.7 (2024-04-12)
### Changed
- Support for pyarrow > 0.17.0
Expand Down
33 changes: 32 additions & 1 deletion eds_scikit/io/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@

import pandas as pd
import pyarrow.parquet as pq
import pyspark.sql.functions as F
import pyspark.sql.types as T
from databricks import koalas
from loguru import logger
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, StructField, StructType

from eds_scikit.utils.framework import cache

from . import settings
from .base import BaseData
from .data_quality import clean_dates
Expand All @@ -33,6 +37,8 @@ def __init__(
Union[Dict[str, Optional[List[str]]], List[str]]
] = None,
database_type: Optional[str] = "OMOP",
prune_omop_date_columns: bool = True,
cache: bool = True,
):
"""Spark interface for OMOP data stored in a Hive database.
Expand All @@ -54,6 +60,12 @@ def __init__(
*deprecated*
database_type: Optional[str] = 'OMOP'. Must be 'OMOP' or 'I2B2'
Whether to use the native OMOP schema or to convert I2B2 inputs to OMOP.
prune_omop_date_columns: bool, default=True
In OMOP, most date values are stored both in a `<str>_date` and `<str>_datetime` column
Koalas has trouble handling the `date` time, so we only keep the `datetime` column
cache: bool, default=True
Whether to cache each table after preprocessing or not.
Will speed-up subsequent calculations, but can be long/infeasable for very large tables
Attributes
----------
Expand Down Expand Up @@ -125,6 +137,8 @@ def __init__(
for omop_table, i2b2_table in self.omop_to_i2b2.items():
self.i2b2_to_omop[i2b2_table].append(omop_table)

self.prune_omop_date_columns = prune_omop_date_columns
self.cache = cache
self.user = os.environ["USER"]
self.person_ids, self.person_ids_df = self._prepare_person_ids(person_ids)
self.available_tables = self.list_available_tables()
Expand Down Expand Up @@ -224,10 +238,27 @@ def _read_table(
if "person_id" in df.columns and person_ids is not None:
df = df.join(person_ids, on="person_id", how="inner")

df = df.cache().to_koalas()
if self.prune_omop_date_columns:

# Keeping only _datetime column if corresponding _date exists
cols = [
c
for c in df.columns
if not ((c.endswith("_date") and (f"{c}time" in df.columns)))
]
df = df.select(cols)

# Casting the single _date columns to timestamp:
for col in df.schema:
if col.dataType == T.DateType():
df = df.withColumn(col.name, F.col(col.name).cast("timestamp"))
df = df.to_koalas()

df = clean_dates(df)

if self.cache:
df = cache(df)

return df

def persist_tables_to_folder(
Expand Down

0 comments on commit 84d5e9e

Please sign in to comment.