allows naming - move improve performance file #54

Merged

13 commits merged on Feb 26, 2024
6 changes: 6 additions & 0 deletions changelog.md
@@ -1,7 +1,13 @@
# Changelog

## Unreleased
### Added
- load_koalas() is no longer called by default in __init__.py; it is now called inside the improve_performances function
- Added an app_name argument to improve_performances to facilitate app monitoring

### Fixed
- Generation of an inclusion/exclusion flowchart in plotting
- improve_performances moved from __init__.py to the io/improve_performance.py file
- Caching in Spark instead of Koalas to improve speed

## v0.1.6 (2023-09-27)
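Taken together, the two headline changes read as follows in user code. A minimal sketch of the new call pattern, assuming a Spark environment with databricks.koalas installed ("my_project" is a hypothetical monitoring label, not from the PR):

```python
# Minimal sketch of the post-PR call pattern; "my_project" is hypothetical.
from eds_scikit.io import improve_performances

spark, sc, sql = improve_performances(app_name="my_project")
# spark.app.name is now "<USER>_my_project_scikit", making the job easy to
# spot when monitoring the cluster.
```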
130 changes: 2 additions & 128 deletions eds_scikit/__init__.py
@@ -9,25 +9,10 @@
action="ignore", category=FutureWarning
) # Remove pyarrow DeprecatedWarning

import importlib
import os
import sys
import time
from packaging import version
from typing import List, Tuple
from pathlib import Path

import pandas as pd
import pyarrow
import pyspark
from loguru import logger
from pyspark import SparkContext
from pyspark.sql import SparkSession

import eds_scikit.biology # noqa: F401 --> To register functions

import eds_scikit.utils.logging

from eds_scikit.io import koalas_options, improve_performances
import eds_scikit.biology

# Remove SettingWithCopyWarning
pd.options.mode.chained_assignment = None
@@ -38,114 +23,3 @@
`spark, sc, sql = eds_scikit.improve_performances()`
The function returns, respectively, a SparkSession, a SparkContext and an sql method"""
)

BASE_DIR = Path(__file__).parent


def load_koalas():

    ks = sys.modules.get("databricks.koalas", None)

    if ks is not None:
        importlib.reload(ks)

    else:
        import databricks.koalas as ks

    return ks


def koalas_options() -> None:
    """
    Set necessary options to optimise Koalas
    """

    # Reloading Koalas to use the new configuration
    ks = load_koalas()

    ks.set_option("compute.default_index_type", "distributed")
    ks.set_option("compute.ops_on_diff_frames", True)
    ks.set_option("display.max_rows", 50)


def set_env_variables() -> None:
    # From https://github.com/databricks/koalas/blob/master/databricks/koalas/__init__.py
    if version.parse(pyspark.__version__) < version.parse("3.0"):
        if version.parse(pyarrow.__version__) >= version.parse("0.15"):
            os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

    if version.parse(pyarrow.__version__) >= version.parse("2.0.0"):
        os.environ["PYARROW_IGNORE_TIMEZONE"] = "0"


def improve_performances(
    to_add_conf: List[Tuple[str, str]] = [],
    quiet_spark: bool = True,
) -> Tuple[SparkSession, SparkContext, SparkSession.sql]:
    """
    (Re)defines various Spark variables with some configuration changes
    to improve performance by enabling Arrow.
    This has to be done
    - Before launching a SparkContext
    - Before importing Koalas
    Both points are taken care of in this function.
    If a SparkSession already exists, its configuration is copied before
    creating a new one.

    Returns
    -------
    Tuple of
    - A SparkSession
    - The associated SparkContext
    - The associated ``sql`` object to run SQL queries
    """

    # Check if a Spark session is up
    global spark, sc, sql

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    if quiet_spark:
        sc.setLogLevel("ERROR")

    conf = sc.getConf()

    # Synchronizing TimeZone
    tz = os.environ.get("TZ", "UTC")
    os.environ["TZ"] = tz
    time.tzset()

    to_add_conf.extend(
        [
            ("spark.app.name", f"{os.environ.get('USER')}_scikit"),
            ("spark.sql.session.timeZone", tz),
            ("spark.sql.execution.arrow.enabled", "true"),
            ("spark.sql.execution.arrow.pyspark.enabled", "true"),
        ]
    )

    for key, value in to_add_conf:
        conf.set(key, value)

    # Stopping the context to add necessary env variables
    sc.stop()
    spark.stop()

    set_env_variables()

    spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()

    sc = spark.sparkContext

    if quiet_spark:
        sc.setLogLevel("ERROR")

    sql = spark.sql

    koalas_options()

    return spark, sc, sql


koalas_options()
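The warning kept in __init__.py still points users at `eds_scikit.improve_performances()`: since __init__.py now imports the function from eds_scikit.io, the top-level spelling keeps working after the move. A minimal sketch, assuming Spark and Koalas are available in the environment:

```python
# Both import paths resolve to the same function after this PR.
import eds_scikit

spark, sc, sql = eds_scikit.improve_performances()
sql("SELECT 1").show()  # `sql` is simply spark.sql
```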
1 change: 1 addition & 0 deletions eds_scikit/io/__init__.py
@@ -2,6 +2,7 @@
from .files import PandasData
from .hive import HiveData
from .postgres import PostgresData
from .improve_performance import improve_performances, koalas_options, load_koalas

__all__ = [
"BaseData",
121 changes: 121 additions & 0 deletions eds_scikit/io/improve_performance.py
@@ -0,0 +1,121 @@
import importlib
import os
import sys
import time
from pathlib import Path
from typing import List, Tuple

import pyarrow
import pyspark
from packaging import version
from pyspark import SparkContext
from pyspark.sql import SparkSession

BASE_DIR = Path(__file__).parent


def load_koalas():

    ks = sys.modules.get("databricks.koalas", None)

    if ks is not None:
        importlib.reload(ks)

    else:
        import databricks.koalas as ks

    return ks


def koalas_options() -> None:
    """
    Set necessary options to optimise Koalas
    """

    # Reloading Koalas to use the new configuration
    ks = load_koalas()

    ks.set_option("compute.default_index_type", "distributed")
    ks.set_option("compute.ops_on_diff_frames", True)
    ks.set_option("display.max_rows", 50)


def set_env_variables() -> None:
    # From https://github.com/databricks/koalas/blob/master/databricks/koalas/__init__.py
    if version.parse(pyspark.__version__) < version.parse("3.0"):
        if version.parse(pyarrow.__version__) >= version.parse("0.15"):
            os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

    if version.parse(pyarrow.__version__) >= version.parse("2.0.0"):
        os.environ["PYARROW_IGNORE_TIMEZONE"] = "0"


def improve_performances(
    to_add_conf: List[Tuple[str, str]] = [],
    quiet_spark: bool = True,
    app_name: str = "",
) -> Tuple[SparkSession, SparkContext, SparkSession.sql]:
    """
    (Re)defines various Spark variables with some configuration changes
    to improve performance by enabling Arrow.
    This has to be done
    - Before launching a SparkContext
    - Before importing Koalas
    Both points are taken care of in this function.
    If a SparkSession already exists, its configuration is copied before
    creating a new one.

    Parameters
    ----------
    to_add_conf : List[Tuple[str, str]]
        Additional (key, value) pairs to set on the Spark configuration
    quiet_spark : bool
        If True, sets the Spark log level to "ERROR"
    app_name : str
        Label embedded in ``spark.app.name`` to facilitate app monitoring

    Returns
    -------
    Tuple of
    - A SparkSession
    - The associated SparkContext
    - The associated ``sql`` object to run SQL queries
    """

    # Check if a Spark session is up
    global spark, sc, sql

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    if quiet_spark:
        sc.setLogLevel("ERROR")

    conf = sc.getConf()

    # Synchronizing TimeZone
    tz = os.environ.get("TZ", "UTC")
    os.environ["TZ"] = tz
    time.tzset()

    to_add_conf.extend(
        [
            ("spark.app.name", f"{os.environ.get('USER')}_{app_name}_scikit"),
            ("spark.sql.session.timeZone", tz),
            ("spark.sql.execution.arrow.enabled", "true"),
            ("spark.sql.execution.arrow.pyspark.enabled", "true"),
        ]
    )

    for key, value in to_add_conf:
        conf.set(key, value)

    # Stopping the context to add necessary env variables
    sc.stop()
    spark.stop()

    set_env_variables()

    spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()

    sc = spark.sparkContext

    if quiet_spark:
        sc.setLogLevel("ERROR")

    sql = spark.sql

    koalas_options()

    return spark, sc, sql
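Because improve_performances stops the current session and rebuilds it from the copied configuration, any extra Spark settings can be threaded through to_add_conf as (key, value) pairs. A hedged sketch (the particular values and the "cohort_study" label are illustrative, not recommendations):

```python
from eds_scikit.io.improve_performance import improve_performances

# Each (key, value) pair is applied to the copied SparkConf before the new
# session is created; the values below are illustrative only.
spark, sc, sql = improve_performances(
    to_add_conf=[
        ("spark.executor.memory", "4g"),
        ("spark.sql.shuffle.partitions", "200"),
    ],
    app_name="cohort_study",  # hypothetical label, ends up in spark.app.name
)
```

One design caveat worth knowing: the default `to_add_conf=[]` is a shared mutable default that the function extends in place, so repeated bare calls accumulate duplicate entries; passing a fresh list each time sidesteps this.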
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -7,7 +7,7 @@
from databricks import koalas as ks
from loguru import logger

from eds_scikit import improve_performances
from eds_scikit.io.improve_performance import improve_performances

from . import test_registry # noqa: F401 --> To register functions

14 changes: 14 additions & 0 deletions tests/test_improve_performance.py
@@ -0,0 +1,14 @@
import sys
from unittest.mock import patch

import pyarrow

from eds_scikit.io.improve_performance import load_koalas, set_env_variables


def test_improve_performances():
    del sys.modules["databricks.koalas"]
    load_koalas()

    with patch.object(pyarrow, "__version__", "2.1.0"):
        set_env_variables()
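The test removes the cached databricks.koalas module to exercise the fresh-import branch of load_koalas, and patches pyarrow.__version__ to hit the >= 2.0.0 branch of set_env_variables. A slightly stricter variant, sketched here rather than taken from the PR, would assert on the observable effects:

```python
import os
import sys
from unittest.mock import patch

import pyarrow

from eds_scikit.io.improve_performance import load_koalas, set_env_variables


def test_improve_performances_effects():
    # pop() avoids a KeyError when koalas was never imported
    sys.modules.pop("databricks.koalas", None)
    ks = load_koalas()
    assert ks is sys.modules["databricks.koalas"]  # fresh import is cached

    with patch.object(pyarrow, "__version__", "2.1.0"):
        set_env_variables()
    assert os.environ["PYARROW_IGNORE_TIMEZONE"] == "0"
```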