Support Pyspark 3 #53

Open · wants to merge 5 commits into base: main
30 changes: 23 additions & 7 deletions .github/workflows/testing.yml
@@ -49,6 +49,20 @@ jobs:
needs: check_skip
if: ${{ needs.check_skip.outputs.skip == 'false' }}
runs-on: "ubuntu-latest"
strategy:
fail-fast: true
matrix:
include:
- python-version: "3.7"
spark: "spark2"
- python-version: "3.7"
spark: "spark3"
- python-version: "3.8"
spark: "spark3"
- python-version: "3.9"
spark: "spark3"
- python-version: "3.10"
spark: "spark3"
name: 'Testing on ubuntu'
defaults:
run:
@@ -61,13 +75,15 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.7'
- name: Install eds-scikit
shell: bash {0}
run: ./build_tools/github/install.sh
- name: Run tests
shell: bash {0}
run: ./build_tools/github/test.sh
python-version: ${{ matrix.python-version }}
cache: 'pip'
- name: Install dependencies
run: |
pip install -U pip wheel
pip install --progress-bar off ".[${{ matrix.spark }}, dev, doc]"
- name: Run pytest
run: |
python -m pytest --pyargs tests -m "" --cov=eds_scikit
- name: Upload coverage to CodeCov
uses: codecov/codecov-action@v3
if: success()
4 changes: 0 additions & 4 deletions build_tools/github/install.sh

This file was deleted.

4 changes: 0 additions & 4 deletions build_tools/github/test.sh

This file was deleted.

8 changes: 8 additions & 0 deletions changelog.md
@@ -1,10 +1,18 @@
# Changelog

## Unreleased

### Added

- Support for pyarrow > 0.17.0
- Support for Python 3.7 to 3.10 (3.11 or higher is not tested)
- Support for pyspark 3 (to force pyspark 2, use `pip install eds-scikit[spark2]`)

### Fixed
- Caching is now done in Spark instead of Koalas, to improve speed

## v0.1.6 (2023-09-27)

### Added
- Module ``event_sequences`` to visualize individual sequences of events.
- Module ``age_pyramid`` to quickly visualize the age and gender distributions in a cohort.
2 changes: 1 addition & 1 deletion docs/generate_reference.py
@@ -8,7 +8,7 @@

for path in sorted(Path("eds_scikit").rglob("*.py")):
print(path)
if ".ipynb_checkpoints" in path.parts:
if ".ipynb_checkpoints" in path.parts or "package-override" in path.parts:
continue
module_path = path.relative_to(".").with_suffix("")
doc_path = path.relative_to("eds_scikit").with_suffix(".md")
1 change: 1 addition & 0 deletions docs/project_description.md
@@ -124,6 +124,7 @@ The goal of **Koalas** is precisely to avoid this issue. It aims at allowing cod

```python
from databricks import koalas as ks
# or from pyspark import pandas as ks, if you have spark 3

# Converting the Spark DataFrame into a Koalas DataFrame
visit_occurrence_koalas = visit_occurrence_spark.to_koalas()
10 changes: 8 additions & 2 deletions eds_scikit/__init__.py
@@ -11,6 +11,7 @@

import importlib
import os
import pathlib
import sys
import time
from packaging import version
@@ -19,15 +20,20 @@

import pandas as pd
import pyarrow
import pyarrow.ipc
import pyspark
from loguru import logger
from pyspark import SparkContext
from pyspark.sql import SparkSession

import eds_scikit.biology # noqa: F401 --> To register functions
pyarrow.open_stream = pyarrow.ipc.open_stream

import eds_scikit.utils.logging
sys.path.insert(
0, (pathlib.Path(__file__).parent / "package-override").absolute().as_posix()
)
os.environ["PYTHONPATH"] = ":".join(sys.path)

import eds_scikit.biology # noqa: F401 --> To register functions

# Remove SettingWithCopyWarning
pd.options.mode.chained_assignment = None
2 changes: 1 addition & 1 deletion eds_scikit/biology/utils/config.py
@@ -1,9 +1,9 @@
import glob
import os
from pathlib import Path
from typing import List

import pandas as pd
from importlib_metadata import os
from loguru import logger

from eds_scikit.biology.utils.process_concepts import ConceptsSet
Empty file.
17 changes: 17 additions & 0 deletions eds_scikit/package-override/databricks/koalas/__init__.py
@@ -0,0 +1,17 @@
# This file is used to override the databricks.koalas package with the pyspark.pandas
# package, if the databricks.koalas package is not available (python >= 3.8)
import sys
import pyarrow # noqa: E402, F401

old_sys_path = sys.path.copy()
sys.path.remove(next((p for p in sys.path if "package-override" in p), None))
databricks = sys.modules.pop("databricks")
sys.modules.pop("databricks.koalas")
try:
from databricks.koalas import * # noqa: E402, F401, F403
except ImportError:
from pyspark.pandas import * # noqa: E402, F401, F403

sys.modules["databricks"] = databricks
sys.modules["databricks.koalas"] = sys.modules["pyspark.pandas"]
sys.path[:] = old_sys_path
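
The override above lets downstream code keep the historical `from databricks import koalas` import regardless of the installed Spark version. A minimal sketch of the intended behaviour, assuming `import eds_scikit` has already placed the `package-override` directory on the lookup path (the example DataFrame is purely illustrative):

```python
# Hedged sketch: under Python >= 3.8 (no koalas wheel available), the import below
# is expected to resolve transparently to pyspark.pandas thanks to the override.
import eds_scikit  # noqa: F401  (sets up the package-override path on import)
from databricks import koalas as ks

kdf = ks.DataFrame({"person_id": [1, 2, 3]})  # illustrative data
# On Spark 3 this prints a pyspark.pandas DataFrame type; on Spark 2, a koalas one.
print(type(kdf))
```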
37 changes: 37 additions & 0 deletions eds_scikit/package-override/pyarrow/__init__.py
@@ -0,0 +1,37 @@
"""
PySpark 2 needs pyarrow.open_stream, which was deprecated in 0.17.0 in favor of
pyarrow.ipc.open_stream. Here is the explanation of how we monkey-patch pyarrow
to add back pyarrow.open_stream for versions > 0.17 and how we make this work with
pyspark distributed computing :
1. We add this fake eds_scikit/package-override/pyarrow package to python lookup list
(the PYTHONPATH env var) in eds_scikit/__init__.py : this env variable will be shared
with the executors
2. When an executor starts and import packages, it looks in the packages by inspecting
the paths in PYTHONPATH. It finds our fake pyarrow package first an executes the
current eds_scikit/package-override/pyarrow/__init__.py file
3. In this file, we remove the fake pyarrow package path from the lookup list, unload
the current module from python modules cache (sys.modules) and re-import pyarrow
=> the executor's python will this time load the true pyarrow and store it in
sys.modules. Subsequent "import pyarrow" calls will return the sys.modules["pyarrow"]
value, which is the true pyarrow module.
4. We are not finished: we add back the deprecated "open_stream" function that was
removed in pyarrow 0.17.0 (the reason for all this hacking) by setting it
on the true pyarrow module
5. We still export the pyarrow module content (*) such that the first import, which
is the only one that resolves to this very module, still gets what it asked for:
the pyarrow module's content.
"""
import sys

old_sys_path = sys.path.copy()
sys.path.remove(next((p for p in sys.path if "package-override" in p), None))
del sys.modules["pyarrow"]

import pyarrow # noqa: E402, F401
from pyarrow.ipc import open_stream # noqa: E402, F401

pyarrow.open_stream = open_stream

from pyarrow import * # noqa: F401, F403, E402

sys.path[:] = old_sys_path
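
Once this shim has run — for instance after `import eds_scikit`, or on an executor whose PYTHONPATH contains the `package-override` directory — the real pyarrow is loaded with the deprecated `open_stream` helper restored. A hedged verification sketch of that behaviour (the small table is an illustrative assumption):

```python
# Hedged check (assumes the package-override shim has already executed):
# pyarrow.open_stream should exist again even on pyarrow > 0.17 and behave
# like pyarrow.ipc.open_stream.
import io

import pyarrow
import pyarrow.ipc

assert hasattr(pyarrow, "open_stream")

# Write a small record-batch stream, then read it back through the restored helper.
table = pyarrow.table({"a": [1, 2, 3]})
sink = io.BytesIO()
with pyarrow.ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table)
assert pyarrow.open_stream(io.BytesIO(sink.getvalue())).read_all().equals(table)
```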
19 changes: 9 additions & 10 deletions eds_scikit/utils/test_utils.py
@@ -96,7 +96,7 @@ def assert_equal(
return output


def assert_images_equal(image_1: str, image_2: str):
def image_diff(image_1: str, image_2: str):
img1 = Image.open(image_1)
img2 = Image.open(image_2)

@@ -105,12 +105,11 @@ def assert_images_equal(image_1: str, image_2: str):
img2 = img2.resize(img1.size)

sum_sq_diff = np.sum(
(np.asarray(img1).astype("float") - np.asarray(img2).astype("float")) ** 2
)

if sum_sq_diff == 0:
# Images are exactly the same
pass
else:
normalized_sum_sq_diff = sum_sq_diff / np.sqrt(sum_sq_diff)
assert normalized_sum_sq_diff < 0.001
(
np.asarray(img1).astype("float") / 255
- np.asarray(img2).astype("float") / 255
)
** 2
) / np.prod(img1.size)

return sum_sq_diff
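
The renamed helper now returns a score instead of asserting: squared differences of pixel values scaled to [0, 1] are summed and divided by the pixel count, and callers compare the result against their own tolerance. A self-contained sketch of how it could be used (the temporary files, single-pixel perturbation and 0.05 tolerance are assumptions for the example):

```python
# Illustrative use of the new image_diff helper: two nearly identical images
# should yield a score well below a loose tolerance.
import tempfile
from pathlib import Path

import numpy as np
from PIL import Image

from eds_scikit.utils.test_utils import image_diff

tmp = Path(tempfile.mkdtemp())
base = np.zeros((32, 32, 3), dtype="uint8")
noisy = base.copy()
noisy[0, 0] = 10  # a single slightly different pixel

Image.fromarray(base).save(tmp / "expected.png")
Image.fromarray(noisy).save(tmp / "result.png")

assert image_diff(str(tmp / "expected.png"), str(tmp / "result.png")) < 0.05
```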
24 changes: 13 additions & 11 deletions pyproject.toml
@@ -30,24 +30,22 @@ classifiers = [
"License :: OSI Approved :: BSD License",
"Operating System :: Unix",
]
requires-python = ">=3.7.1,<3.8"
requires-python = ">=3.7.1"
dependencies = [
"pgpasslib>=1.1.0, <2.0.0",
"psycopg2-binary>=2.9.0, <3.0.0",
"psycopg2-binary>=2.9.0",
"pandas>=1.3.0, <2.0.0",
"numpy>=1.0.0, <1.20",
"koalas>=1.8.1, <2.0.0",
"altair>=5.0.0, <6.0.0",
"numpy>=1.0.0",
"altair>=5.0.0",
"loguru==0.7.0",
"pypandoc==1.7.5",
"pyspark==2.4.3",
"pyarrow==0.17.0", #"pyarrow>=0.10, <0.17.0",
"pyspark",
"pyarrow>=0.10.0",
"pretty-html-table>=0.9.15, <0.10.0",
"catalogue",
"schemdraw>=0.15.0, <1.0.0",
"ipython>=7.32.0, <8.0.0",
"packaging==21.3",
"tomli==2.0.1",
"ipython>=7.32.0",
"packaging>=21.3",
"tomli>=2.0.1",
]
dynamic = ['version']

@@ -66,6 +64,10 @@ Documentation = "https://aphp.github.io/eds-scikit"
"Bug Tracker" = "https://github.com/aphp/eds-scikit/issues"

[project.optional-dependencies]
spark2 = [
"pyspark==2.4.3",
"koalas>=1.8.1,<2.0.0",
]
dev = [
"black>=22.3.0, <23.0.0",
"flake8==3.9.2",
9 changes: 4 additions & 5 deletions tests/conftest.py
@@ -1,12 +1,15 @@
# isort: skip_file
import logging
import os

import eds_scikit
import pandas as pd
import pytest
from _pytest.logging import caplog as _caplog # noqa F401
from databricks import koalas as ks
from loguru import logger

import eds_scikit.utils.logging # noqa: F401
from eds_scikit import improve_performances

from . import test_registry # noqa: F401 --> To register functions
@@ -83,11 +86,6 @@ def spark_session(pytestconfig, tmpdir_factory):
SparkConf()
.setMaster("local")
.setAppName("testing")
# used to overwrite hive tables
.set(
"spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation",
"true",
)
# Path to data and metastore
# Note: the option "hive.metastore.warehouse.dir" is deprecated
# But javax.jdo.option.ConnectionURL can be used for the path of 'metastore_db'
@@ -100,6 +98,7 @@
"javax.jdo.option.ConnectionURL",
f"jdbc:derby:;databaseName={temp_warehouse_dir}/metastore_db;create=true",
)
.set("spark.executor.cores", 1)
)

session, _, _ = improve_performances(to_add_conf=list(conf.getAll()))
Binary file added tests/flowchart/expected_flowchart_bis.png
11 changes: 6 additions & 5 deletions tests/flowchart/test_flowchart.py
@@ -5,7 +5,7 @@
import pytest

from eds_scikit.utils.flowchart import Flowchart
from eds_scikit.utils.test_utils import assert_images_equal
from eds_scikit.utils.test_utils import image_diff

data_as_df = pd.DataFrame(
dict(
@@ -63,12 +63,13 @@ def test_flowchart(data, tmpdir_factory):

_ = F.generate_flowchart(alternate=True, fontsize=10)

result_path = tmp_dir / "flowchart.png"
F.save(result_path, dpi=72)
out_path = str(tmp_dir / "flowchart.png")
F.save(out_path, dpi=72)

expected = Path(__file__).parent / "expected_flowchart.png"
tgt_1 = str(Path(__file__).parent / "expected_flowchart.png")
tgt_2 = str(Path(__file__).parent / "expected_flowchart_bis.png")

assert_images_equal(result_path, expected)
assert image_diff(out_path, tgt_1) < 0.05 or image_diff(out_path, tgt_2) < 0.05


def test_incorrect_data():
6 changes: 5 additions & 1 deletion tests/test_biology.py
@@ -14,7 +14,10 @@ def tmp_biology_dir(tmp_path_factory):

@pytest.fixture
def data():
return load_biology_data(seed=42)
return load_biology_data(
seed=42,
mean_measurement=500,
)


@pytest.fixture
@@ -73,6 +76,7 @@ def test_biology_summary(data, concepts_sets, module, tmp_biology_dir):
limit_count=("AnaBio", 500),
stats_only=True,
save_folder_path=tmp_biology_dir,
pd_limit_size=0,
)


Expand Down
4 changes: 2 additions & 2 deletions tests/test_convert.py
@@ -51,9 +51,9 @@ def test_framework_koalas(example_objects):
def test_unconvertible_objects():
objects = [1, "coucou", {"a": [1, 2]}, [1, 2, 3], 2.5, ks, pd]
for obj in objects:
with pytest.raises(ValueError):
with pytest.raises((ValueError, TypeError)):
framework.pandas(obj)

for obj in objects:
with pytest.raises(ValueError):
with pytest.raises((ValueError, TypeError)):
framework.koalas(obj)