From 09b7f50f70f8b0634a5a686af46217b13312dcb9 Mon Sep 17 00:00:00 2001
From: svittoz <137794505+svittoz@users.noreply.github.com>
Date: Tue, 28 May 2024 13:57:58 +0200
Subject: [PATCH 1/3] pyarrow fix (#65)

pyarrow path fix for executors
---
 changelog.md                         |  3 +++
 eds_scikit/__init__.py               | 14 +++----------
 eds_scikit/io/__init__.py            |  8 ++++++-
 eds_scikit/io/improve_performance.py | 31 ++++++++++++++++++++++++++++
 4 files changed, 44 insertions(+), 12 deletions(-)

diff --git a/changelog.md b/changelog.md
index e1534b5b..613ad5b3 100644
--- a/changelog.md
+++ b/changelog.md
@@ -2,6 +2,9 @@

 ## Unreleased

+### Fixed
+- pyarrow fix did not work on spark executors
+
 ## v0.1.7 (2024-04-12)
 ### Changed
 - Support for pyarrow > 0.17.0
diff --git a/eds_scikit/__init__.py b/eds_scikit/__init__.py
index 6da07cb7..b7b168af 100644
--- a/eds_scikit/__init__.py
+++ b/eds_scikit/__init__.py
@@ -12,33 +12,25 @@
 import importlib
 import os
 import pathlib
-import sys
 import time
 from packaging import version
 from typing import List, Tuple
 from pathlib import Path

 import pandas as pd
-import pyarrow
-import pyarrow.ipc
 import pyspark
 from loguru import logger
 from pyspark import SparkContext
 from pyspark.sql import SparkSession

 import eds_scikit.biology  # noqa: F401 --> To register functions
-from eds_scikit.io import improve_performances
-
-pyarrow.open_stream = pyarrow.ipc.open_stream
-
-sys.path.insert(
-    0, (pathlib.Path(__file__).parent / "package-override").absolute().as_posix()
-)
-os.environ["PYTHONPATH"] = ":".join(sys.path)
+from eds_scikit.io import improve_performances, pyarrow_fix

 # Remove SettingWithCopyWarning
 pd.options.mode.chained_assignment = None

+pyarrow_fix()
+
 logger.warning(
     """To improve performances when using Spark and Koalas, please call `eds_scikit.improve_performances()`
 This function optimally configures Spark. Use it as:
diff --git a/eds_scikit/io/__init__.py b/eds_scikit/io/__init__.py
index bc01ea06..7fb4e35e 100644
--- a/eds_scikit/io/__init__.py
+++ b/eds_scikit/io/__init__.py
@@ -2,7 +2,13 @@
 from .files import PandasData
 from .hive import HiveData
 from .postgres import PostgresData
-from .improve_performance import improve_performances, koalas_options, load_koalas
+from .improve_performance import (
+    improve_performances,
+    koalas_options,
+    load_koalas,
+    pyarrow_fix,
+)
+

 __all__ = [
     "BaseData",
diff --git a/eds_scikit/io/improve_performance.py b/eds_scikit/io/improve_performance.py
index 42b6f19a..afed9f27 100644
--- a/eds_scikit/io/improve_performance.py
+++ b/eds_scikit/io/improve_performance.py
@@ -6,6 +6,7 @@
 from typing import List, Tuple

 import pyarrow
+import pyarrow.ipc
 import pyspark
 from packaging import version
 from pyspark import SparkContext
@@ -50,6 +51,36 @@ def set_env_variables() -> None:
     os.environ["PYARROW_IGNORE_TIMEZONE"] = "0"


+def pyarrow_fix():
+    """
+    Fix the error 'pyarrow has no attribute open_stream' caused by the PySpark 2 incompatibility with pyarrow > 0.17
+    """
+
+    # Setting the path to our patched pyarrow module
+    pyarrow.open_stream = pyarrow.ipc.open_stream
+
+    sys.path.insert(
+        0, (Path(__file__).parent / "package-override").absolute().as_posix()
+    )
+    os.environ["PYTHONPATH"] = ":".join(sys.path)
+
+    # Setting this path for PySpark executors
+    global spark, sc, sql
+
+    spark = SparkSession.builder.getOrCreate()
+
+    conf = spark.sparkContext.getConf()
+    conf.set(
+        "spark.executorEnv.PYTHONPATH",
+        f"{Path(__file__).parent.parent}/package-override:{conf.get('spark.executorEnv.PYTHONPATH')}",
+    )
+    spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()
+
+    sc = spark.sparkContext
+
+    sql = spark.sql
+
+
 def improve_performances(
     to_add_conf: List[Tuple[str, str]] = [],
     quiet_spark: bool = True,

From dd12804490f1e8c6208b2dace0407f960bc43c8e Mon Sep 17 00:00:00 2001
From: Thomas Petit-Jean <30775613+Thomzoy@users.noreply.github.com>
Date: Thu, 30 May 2024 17:46:17 +0200
Subject: [PATCH 2/3] fix: cast single date columns to datetime (#59)

---
 changelog.md          |  3 ++-
 eds_scikit/io/hive.py | 33 ++++++++++++++++++++++++++++++-
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/changelog.md b/changelog.md
index 613ad5b3..03b591c1 100644
--- a/changelog.md
+++ b/changelog.md
@@ -3,7 +3,8 @@
 ## Unreleased

 ### Fixed
-- pyarrow fix did not work on spark executors
+- Pyarrow fix now works on Spark executors.
+- Fix OMOP `_date` columns issue.

 ## v0.1.7 (2024-04-12)
 ### Changed
diff --git a/eds_scikit/io/hive.py b/eds_scikit/io/hive.py
index f04dd378..54c42106 100644
--- a/eds_scikit/io/hive.py
+++ b/eds_scikit/io/hive.py
@@ -6,12 +6,16 @@
 import pandas as pd
 import pyarrow.parquet as pq
+import pyspark.sql.functions as F
+import pyspark.sql.types as T
 from databricks import koalas
 from loguru import logger
 from pyspark.sql import DataFrame as SparkDataFrame
 from pyspark.sql import SparkSession
 from pyspark.sql.types import LongType, StructField, StructType

+from eds_scikit.utils.framework import cache
+
 from . import settings
 from .base import BaseData
 from .data_quality import clean_dates
@@ -33,6 +37,8 @@ def __init__(
             Union[Dict[str, Optional[List[str]]], List[str]]
         ] = None,
         database_type: Optional[str] = "OMOP",
+        prune_omop_date_columns: bool = True,
+        cache: bool = True,
     ):
         """Spark interface for OMOP data stored in a Hive database.

@@ -54,6 +60,12 @@
             *deprecated*
         database_type: Optional[str] = 'OMOP'. Must be 'OMOP' or 'I2B2'
             Whether to use the native OMOP schema or to convert I2B2 inputs to OMOP.
+        prune_omop_date_columns: bool, default=True
+            In OMOP, most date values are stored both in a `_date` and a `_datetime` column.
+            Koalas has trouble handling the `date` type, so we only keep the `_datetime` column.
+        cache: bool, default=True
+            Whether to cache each table after preprocessing or not.
+            Will speed up subsequent calculations, but can be slow or infeasible for very large tables.

         Attributes
         ----------
@@ -125,6 +137,8 @@
         for omop_table, i2b2_table in self.omop_to_i2b2.items():
             self.i2b2_to_omop[i2b2_table].append(omop_table)

+        self.prune_omop_date_columns = prune_omop_date_columns
+        self.cache = cache
         self.user = os.environ["USER"]
         self.person_ids, self.person_ids_df = self._prepare_person_ids(person_ids)
         self.available_tables = self.list_available_tables()
@@ -224,10 +238,27 @@ def _read_table(
         if "person_id" in df.columns and person_ids is not None:
             df = df.join(person_ids, on="person_id", how="inner")

-        df = df.cache().to_koalas()
+        if self.prune_omop_date_columns:
+
+            # Keeping only the _datetime column when the corresponding _date column exists
+            cols = [
+                c
+                for c in df.columns
+                if not (c.endswith("_date") and f"{c}time" in df.columns)
+            ]
+            df = df.select(cols)
+
+            # Casting the remaining standalone _date columns to timestamp
+            for col in df.schema:
+                if col.dataType == T.DateType():
+                    df = df.withColumn(col.name, F.col(col.name).cast("timestamp"))
+        df = df.to_koalas()

         df = clean_dates(df)

+        if self.cache:
+            df = cache(df)
+
         return df

     def persist_tables_to_folder(

From da3ed10ff7ff35bd994e83f229f7d1e19d308d1f Mon Sep 17 00:00:00 2001
From: svittoz <137794505+svittoz@users.noreply.github.com>
Date: Fri, 31 May 2024 11:40:08 +0200
Subject: [PATCH 3/3] fix coverage badge (#67)

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 13f79df7..2ddc938e 100644
--- a/README.md
+++ b/README.md
@@ -15,7 +15,7 @@
 [![PyPI](https://img.shields.io/pypi/v/eds-scikit?color=blue&style=flat-square)](https://pypi.org/project/eds-scikit/)
 [![Supported Python](https://img.shields.io/badge/python-%3E%3D%203.7.1%20%7C%20%3C%203.8-brightgreen?style=flat-square)](https://www.python.org/)
 [![Black](https://img.shields.io/badge/code%20style-black-black?style=flat-square)](https://github.com/psf/black)
-[![Coverage](https://codecov.io/github/aphp/eds-scikit/coverage.svg)](https://raw.githubusercontent.com/aphp/eds-scikit/coverage/coverage.txt)
+[![Coverage](https://raw.githubusercontent.com/aphp/eds-scikit/coverage/coverage.svg)](https://raw.githubusercontent.com/aphp/eds-scikit/coverage/coverage.txt)
 [![DOI](https://zenodo.org/badge/571584236.svg)](https://zenodo.org/badge/latestdoi/571584236&style=flat-square)
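
A note on how the executor fix in PATCH 1/3 works: the previous fix patched pyarrow.open_stream and prepended package-override to sys.path on the driver only, so executors kept failing; pyarrow_fix() now also pushes that directory through spark.executorEnv.PYTHONPATH and rebuilds the session with the updated conf. Below is a minimal sketch of that propagation, assuming a local package-override directory; the executor-side check at the end is illustrative only and is not part of the patch:

import os
from pathlib import Path

from pyspark.sql import SparkSession

# Assumed location of the patched pyarrow shim (mirrors the patch's layout)
override = (Path("eds_scikit") / "package-override").absolute().as_posix()

spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()

# Prepend the override so executors resolve the patched pyarrow first
conf.set(
    "spark.executorEnv.PYTHONPATH",
    f"{override}:{conf.get('spark.executorEnv.PYTHONPATH') or ''}",
)
spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()

# Illustrative check: ask the executors which PYTHONPATH they actually see
seen = (
    spark.sparkContext.parallelize(range(2))
    .map(lambda _: os.environ.get("PYTHONPATH", ""))
    .collect()
)
print(all(override in path for path in seen))

Since spark.executorEnv.* is only read when executors launch, the rebuilt session has to exist before any work is submitted, which is presumably why pyarrow_fix() is called at the top of eds_scikit/__init__.py.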
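
PATCH 2/3 is easiest to see on a toy frame: _read_table now drops every _date column whose _datetime sibling exists (the f"{c}time" lookup turns e.g. visit_start_date into visit_start_datetime), then casts any remaining DateType column to timestamp so Koalas never receives a raw date. A self-contained sketch of that logic follows; the column names are illustrative, not a real OMOP schema:

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy OMOP-like frame: one _date/_datetime pair, plus a lone _date column
df = spark.createDataFrame(
    [("2024-05-28", "2024-05-28 13:57:58", "2024-06-01")],
    ["visit_start_date", "visit_start_datetime", "death_date"],
)
df = df.withColumn("visit_start_date", F.col("visit_start_date").cast("date"))
df = df.withColumn("visit_start_datetime", F.col("visit_start_datetime").cast("timestamp"))
df = df.withColumn("death_date", F.col("death_date").cast("date"))

# Keep a _date column only when no _datetime sibling exists (same test as the patch)
cols = [c for c in df.columns if not (c.endswith("_date") and f"{c}time" in df.columns)]
df = df.select(cols)

# Cast the remaining standalone _date columns to timestamp
for field in df.schema:
    if field.dataType == T.DateType():
        df = df.withColumn(field.name, F.col(field.name).cast("timestamp"))

df.printSchema()  # visit_start_datetime and death_date both end up as timestamps

Once the patch lands, both behaviours can be toggled per instance, e.g. something like HiveData("my_db", prune_omop_date_columns=False, cache=False) — the database name and argument values here are only an example.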