allows naming - move improve performance file #54

Merged · 13 commits · Feb 26, 2024
130 changes: 2 additions & 128 deletions eds_scikit/__init__.py
@@ -9,25 +9,10 @@
action="ignore", category=FutureWarning
) # Remove pyarrow DeprecatedWarning

import importlib
import os
import sys
import time
from packaging import version
from typing import List, Tuple
from pathlib import Path

import pandas as pd
import pyarrow
import pyspark
from loguru import logger
from pyspark import SparkContext
from pyspark.sql import SparkSession

import eds_scikit.biology # noqa: F401 --> To register functions

import eds_scikit.utils.logging

from eds_scikit.io import koalas_options, improve_performances
import eds_scikit.biology

# Remove SettingWithCopyWarning
pd.options.mode.chained_assignment = None
@@ -38,114 +23,3 @@
`spark, sc, sql = eds_scikit.improve_performances()`
The function returns, respectively, a SparkSession, a SparkContext and an sql method"""
)

BASE_DIR = Path(__file__).parent


def load_koalas():

    ks = sys.modules.get("databricks.koalas", None)

    if ks is not None:
        importlib.reload(ks)

    else:
        import databricks.koalas as ks

    return ks


def koalas_options() -> None:
    """
    Set necessary options to optimise Koalas
    """

    # Reloading Koalas to use the new configuration
    ks = load_koalas()

    ks.set_option("compute.default_index_type", "distributed")
    ks.set_option("compute.ops_on_diff_frames", True)
    ks.set_option("display.max_rows", 50)


def set_env_variables() -> None:
    # From https://github.com/databricks/koalas/blob/master/databricks/koalas/__init__.py
    if version.parse(pyspark.__version__) < version.parse("3.0"):
        if version.parse(pyarrow.__version__) >= version.parse("0.15"):
            os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

    if version.parse(pyarrow.__version__) >= version.parse("2.0.0"):
        os.environ["PYARROW_IGNORE_TIMEZONE"] = "0"


def improve_performances(
    to_add_conf: List[Tuple[str, str]] = [],
    quiet_spark: bool = True,
) -> Tuple[SparkSession, SparkContext, SparkSession.sql]:
    """
    (Re)defines various Spark variables with some configuration changes
    to improve performance by enabling Arrow.
    This has to be done
    - before launching a SparkContext
    - before importing Koalas
    Both points are taken care of by this function.
    If a SparkSession already exists, its configuration is copied before
    creating a new one.

    Returns
    -------
    Tuple of
    - A SparkSession
    - The associated SparkContext
    - The associated ``sql`` object to run SQL queries
    """

    # Check if a Spark session is up
    global spark, sc, sql

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    if quiet_spark:
        sc.setLogLevel("ERROR")

    conf = sc.getConf()

    # Synchronizing TimeZone
    tz = os.environ.get("TZ", "UTC")
    os.environ["TZ"] = tz
    time.tzset()

    to_add_conf.extend(
        [
            ("spark.app.name", f"{os.environ.get('USER')}_scikit"),
            ("spark.sql.session.timeZone", tz),
            ("spark.sql.execution.arrow.enabled", "true"),
            ("spark.sql.execution.arrow.pyspark.enabled", "true"),
        ]
    )

    for key, value in to_add_conf:
        conf.set(key, value)

    # Stopping the context to add the necessary env variables
    sc.stop()
    spark.stop()

    set_env_variables()

    spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()

    sc = spark.sparkContext

    if quiet_spark:
        sc.setLogLevel("ERROR")

    sql = spark.sql

    koalas_options()

    return spark, sc, sql


koalas_options()
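
For context, the public entry point is unchanged by this move: the warning message above still applies. A minimal usage sketch of that call (the query is illustrative, not part of this diff):

import eds_scikit

# Re-create the Spark session with Arrow enabled, as the startup warning suggests
spark, sc, sql = eds_scikit.improve_performances()

# sql is spark.sql, so plain Spark SQL can be run directly
databases = sql("SHOW DATABASES")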
1 change: 1 addition & 0 deletions eds_scikit/io/__init__.py
@@ -2,6 +2,7 @@
from .files import PandasData
from .hive import HiveData
from .postgres import PostgresData
from .improve_performance import improve_performances, koalas_options, load_koalas

__all__ = [
"BaseData",
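
With this re-export in place, both import paths below should resolve to the same function after the move; a small sketch (nothing here beyond what the diff adds):

from eds_scikit.io import improve_performances, koalas_options, load_koalas  # new location
from eds_scikit import improve_performances  # still available at the package root via eds_scikit/__init__.py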
118 changes: 118 additions & 0 deletions eds_scikit/io/improve_performance.py
@@ -0,0 +1,118 @@
import importlib
import os
import sys
import time
from packaging import version
from typing import List, Tuple
from pathlib import Path

import pyarrow
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession

BASE_DIR = Path(__file__).parent

def load_koalas():

    ks = sys.modules.get("databricks.koalas", None)

    if ks is not None:
        importlib.reload(ks)

    else:
        import databricks.koalas as ks

[Codecov / codecov/patch check warning: added line eds_scikit/io/improve_performance.py#L24 was not covered by tests]

    return ks


def koalas_options() -> None:
    """
    Set necessary options to optimise Koalas
    """

    # Reloading Koalas to use the new configuration
    ks = load_koalas()

    ks.set_option("compute.default_index_type", "distributed")
    ks.set_option("compute.ops_on_diff_frames", True)
    ks.set_option("display.max_rows", 50)

def set_env_variables() -> None:
    # From https://github.com/databricks/koalas/blob/master/databricks/koalas/__init__.py
    if version.parse(pyspark.__version__) < version.parse("3.0"):
        if version.parse(pyarrow.__version__) >= version.parse("0.15"):
            os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

    if version.parse(pyarrow.__version__) >= version.parse("2.0.0"):
        os.environ["PYARROW_IGNORE_TIMEZONE"] = "0"

[Codecov / codecov/patch check warning: added line eds_scikit/io/improve_performance.py#L48 was not covered by tests]

def improve_performances(
    to_add_conf: List[Tuple[str, str]] = [],
    quiet_spark: bool = True,
    app_name: str = "",
) -> Tuple[SparkSession, SparkContext, SparkSession.sql]:
    """
    (Re)defines various Spark variables with some configuration changes
    to improve performance by enabling Arrow.
    This has to be done
    - before launching a SparkContext
    - before importing Koalas
    Both points are taken care of by this function.
    If a SparkSession already exists, its configuration is copied before
    creating a new one.
    The ``app_name`` argument is included in the Spark application name,
    which is set to ``{USER}_{app_name}_scikit``.

    Returns
    -------
    Tuple of
    - A SparkSession
    - The associated SparkContext
    - The associated ``sql`` object to run SQL queries
    """

    # Check if a Spark session is up
    global spark, sc, sql

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    if quiet_spark:
        sc.setLogLevel("ERROR")

    conf = sc.getConf()

    # Synchronizing TimeZone
    tz = os.environ.get("TZ", "UTC")
    os.environ["TZ"] = tz
    time.tzset()

    to_add_conf.extend(
        [
            ("spark.app.name", f"{os.environ.get('USER')}_{app_name}_scikit"),
            ("spark.sql.session.timeZone", tz),
            ("spark.sql.execution.arrow.enabled", "true"),
            ("spark.sql.execution.arrow.pyspark.enabled", "true"),
        ]
    )

    for key, value in to_add_conf:
        conf.set(key, value)

    # Stopping the context to add the necessary env variables
    sc.stop()
    spark.stop()

    set_env_variables()

    spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()

    sc = spark.sparkContext

    if quiet_spark:
        sc.setLogLevel("ERROR")

    sql = spark.sql

    koalas_options()

    return spark, sc, sql
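
To illustrate the new `app_name` parameter this PR introduces, a hedged usage sketch follows. The conf entry and the name `cohort_study` are illustrative, not part of the diff; with `USER=jdoe`, the resulting application name would be `jdoe_cohort_study_scikit`.

from eds_scikit.io import improve_performances

spark, sc, sql = improve_performances(
    to_add_conf=[("spark.executor.memory", "4g")],  # extra Spark conf entries (illustrative value)
    quiet_spark=True,
    app_name="cohort_study",  # hypothetical name, ends up in spark.app.name as f"{USER}_cohort_study_scikit"
)

assert spark.conf.get("spark.app.name").endswith("_cohort_study_scikit")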