Skip to content

Commit

Permalink
Support defining the default config by env vars and add some tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hussein-awala committed Jan 26, 2024
1 parent e7daa99 commit 6f6f02e
Show file tree
Hide file tree
Showing 4 changed files with 308 additions and 17 deletions.
92 changes: 76 additions & 16 deletions spark_on_k8s/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

from spark_on_k8s.k8s.sync_client import KubernetesClientManager
from spark_on_k8s.utils.app_manager import SparkAppManager
from spark_on_k8s.utils.configuration import Configuration
from spark_on_k8s.utils.logging_mixin import LoggingMixin
from spark_on_k8s.utils.types import NOTSET, ArgNotSet

# dataclass(kw_only=...) only exists on Python >= 3.10; expose the flag as a
# splattable dict so 3.8/3.9 can pass **KW_ONLY_DATACLASS and get nothing.
if "kw_only" in dataclass.__kwdefaults__:
    KW_ONLY_DATACLASS = {"kw_only": True}
else:
    KW_ONLY_DATACLASS = {}
Expand Down Expand Up @@ -100,22 +102,22 @@ def __init__(
def submit_app(
self,
*,
image: str,
app_path: str,
namespace: str = "default",
service_account: str = "spark",
app_name: str | None = None,
spark_conf: dict[str, str] | None = None,
class_name: str | None = None,
app_arguments: list[str] | None = None,
app_id_suffix: Callable[[], str] = default_app_id_suffix,
app_waiter: Literal["no_wait", "wait", "log"] = SparkAppWait.NO_WAIT,
image_pull_policy: Literal["Always", "Never", "IfNotPresent"] = "IfNotPresent",
ui_reverse_proxy: bool = False,
driver_resources: PodResources | None = None,
executor_resources: PodResources | None = None,
executor_instances: ExecutorInstances | None = None,
should_print: bool = False,
image: str | ArgNotSet = NOTSET,
app_path: str | ArgNotSet = NOTSET,
namespace: str | ArgNotSet = NOTSET,
service_account: str | ArgNotSet = NOTSET,
app_name: str | ArgNotSet = NOTSET,
spark_conf: dict[str, str] | ArgNotSet = NOTSET,
class_name: str | ArgNotSet = NOTSET,
app_arguments: list[str] | ArgNotSet = NOTSET,
app_id_suffix: Callable[[], str] | ArgNotSet = NOTSET,
app_waiter: Literal["no_wait", "wait", "log"] | ArgNotSet = NOTSET,
image_pull_policy: Literal["Always", "Never", "IfNotPresent"] | ArgNotSet = NOTSET,
ui_reverse_proxy: bool | ArgNotSet = NOTSET,
driver_resources: PodResources | ArgNotSet = NOTSET,
executor_resources: PodResources | ArgNotSet = NOTSET,
executor_instances: ExecutorInstances | ArgNotSet = NOTSET,
should_print: bool | ArgNotSet = NOTSET,
) -> str:
"""Submit a Spark app to Kubernetes
Expand Down Expand Up @@ -149,6 +151,64 @@ def submit_app(
Returns:
Name of the Spark application pod
"""
if image is NOTSET:
if Configuration.SPARK_ON_K8S_DOCKER_IMAGE is None:
raise ValueError(
"Docker image is not set."
"Please set the image argument or the environment variable SPARK_ON_K8S_DOCKER_IMAGE"
)
image = Configuration.SPARK_ON_K8S_DOCKER_IMAGE
if app_path is NOTSET:
if Configuration.SPARK_ON_K8S_APP_PATH is None:
raise ValueError(
"Application path is not set."
"Please set the app_path argument or the environment variable SPARK_ON_K8S_APP_PATH"
)
app_path = Configuration.SPARK_ON_K8S_APP_PATH
if namespace is NOTSET:
namespace = Configuration.SPARK_ON_K8S_NAMESPACE
if service_account is NOTSET:
service_account = Configuration.SPARK_ON_K8S_SERVICE_ACCOUNT
if app_name is NOTSET:
app_name = Configuration.SPARK_ON_K8S_APP_NAME
if spark_conf is NOTSET:
spark_conf = Configuration.SPARK_ON_K8S_SPARK_CONF
if class_name is NOTSET:
class_name = Configuration.SPARK_ON_K8S_CLASS_NAME
if app_arguments is NOTSET:
app_arguments = Configuration.SPARK_ON_K8S_APP_ARGUMENTS
if app_id_suffix is NOTSET:
app_id_suffix = default_app_id_suffix
if app_waiter is NOTSET:
app_waiter = Configuration.SPARK_ON_K8S_APP_WAITER
if image_pull_policy is NOTSET:
image_pull_policy = Configuration.SPARK_ON_K8S_IMAGE_PULL_POLICY
if ui_reverse_proxy is NOTSET:
ui_reverse_proxy = Configuration.SPARK_ON_K8S_UI_REVERSE_PROXY
if driver_resources is NOTSET:
driver_resources = PodResources(
cpu=Configuration.SPARK_ON_K8S_DRIVER_CPU,
memory=Configuration.SPARK_ON_K8S_DRIVER_MEMORY,
memory_overhead=Configuration.SPARK_ON_K8S_DRIVER_MEMORY_OVERHEAD,
)
if executor_resources is NOTSET:
executor_resources = PodResources(
cpu=Configuration.SPARK_ON_K8S_EXECUTOR_CPU,
memory=Configuration.SPARK_ON_K8S_EXECUTOR_MEMORY,
memory_overhead=Configuration.SPARK_ON_K8S_EXECUTOR_MEMORY_OVERHEAD,
)
if executor_instances is NOTSET:
executor_instances = ExecutorInstances(
min=Configuration.SPARK_ON_K8S_EXECUTOR_MIN_INSTANCES,
max=Configuration.SPARK_ON_K8S_EXECUTOR_MAX_INSTANCES,
initial=Configuration.SPARK_ON_K8S_EXECUTOR_INITIAL_INSTANCES,
)
if (
executor_instances.min is None
and executor_instances.max is None
and executor_instances.initial is None
):
executor_instances.initial = 2
app_name, app_id = self._parse_app_name_and_id(
app_name=app_name, app_id_suffix=app_id_suffix, should_print=should_print
)
Expand Down
41 changes: 41 additions & 0 deletions spark_on_k8s/utils/configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from __future__ import annotations

import json
from os import getenv


def _optional_int(name: str) -> int | None:
    """Return env var *name* parsed as an int, or None when unset or empty."""
    raw = getenv(name)
    # Empty string is treated the same as unset, matching `if getenv(...)` truthiness.
    return int(raw) if raw else None


class Configuration:
    """Spark on Kubernetes configuration.

    Default values for the client, resolved from environment variables once at
    import time; reload this module to pick up environment changes.
    """

    # Required settings with no usable default: callers must provide them as
    # arguments or via these env vars (the client raises otherwise).
    SPARK_ON_K8S_DOCKER_IMAGE = getenv("SPARK_ON_K8S_DOCKER_IMAGE")
    SPARK_ON_K8S_APP_PATH = getenv("SPARK_ON_K8S_APP_PATH")
    # Kubernetes target defaults.
    SPARK_ON_K8S_NAMESPACE = getenv("SPARK_ON_K8S_NAMESPACE", "default")
    SPARK_ON_K8S_SERVICE_ACCOUNT = getenv("SPARK_ON_K8S_SERVICE_ACCOUNT", "spark")
    SPARK_ON_K8S_APP_NAME = getenv("SPARK_ON_K8S_APP_NAME")
    # JSON-encoded values: a dict of spark conf entries and a list of app arguments.
    SPARK_ON_K8S_SPARK_CONF = json.loads(getenv("SPARK_ON_K8S_SPARK_CONF", "{}"))
    SPARK_ON_K8S_CLASS_NAME = getenv("SPARK_ON_K8S_CLASS_NAME")
    SPARK_ON_K8S_APP_ARGUMENTS = json.loads(getenv("SPARK_ON_K8S_APP_ARGUMENTS", "[]"))
    SPARK_ON_K8S_APP_WAITER = getenv("SPARK_ON_K8S_APP_WAITER", "no_wait")
    SPARK_ON_K8S_IMAGE_PULL_POLICY = getenv("SPARK_ON_K8S_IMAGE_PULL_POLICY", "IfNotPresent")
    SPARK_ON_K8S_UI_REVERSE_PROXY = getenv("SPARK_ON_K8S_UI_REVERSE_PROXY", "false").lower() == "true"
    # Driver/executor resources. Defaults are passed to getenv as strings (not
    # ints) so int() always receives the same type either way.
    SPARK_ON_K8S_DRIVER_CPU = int(getenv("SPARK_ON_K8S_DRIVER_CPU", "1"))
    SPARK_ON_K8S_DRIVER_MEMORY = int(getenv("SPARK_ON_K8S_DRIVER_MEMORY", "1024"))
    SPARK_ON_K8S_DRIVER_MEMORY_OVERHEAD = int(getenv("SPARK_ON_K8S_DRIVER_MEMORY_OVERHEAD", "512"))
    SPARK_ON_K8S_EXECUTOR_CPU = int(getenv("SPARK_ON_K8S_EXECUTOR_CPU", "1"))
    SPARK_ON_K8S_EXECUTOR_MEMORY = int(getenv("SPARK_ON_K8S_EXECUTOR_MEMORY", "1024"))
    SPARK_ON_K8S_EXECUTOR_MEMORY_OVERHEAD = int(getenv("SPARK_ON_K8S_EXECUTOR_MEMORY_OVERHEAD", "512"))
    # Executor instance counts: None means "not configured" (the client then
    # decides a sensible fallback). Helper avoids the double getenv lookup.
    SPARK_ON_K8S_EXECUTOR_MIN_INSTANCES = _optional_int("SPARK_ON_K8S_EXECUTOR_MIN_INSTANCES")
    SPARK_ON_K8S_EXECUTOR_MAX_INSTANCES = _optional_int("SPARK_ON_K8S_EXECUTOR_MAX_INSTANCES")
    SPARK_ON_K8S_EXECUTOR_INITIAL_INSTANCES = _optional_int("SPARK_ON_K8S_EXECUTOR_INITIAL_INSTANCES")
8 changes: 8 additions & 0 deletions spark_on_k8s/utils/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from __future__ import annotations


class ArgNotSet:
    """Sentinel type marking arguments the caller did not provide.

    Used instead of None so that None remains a valid explicit value.
    """


# Singleton sentinel instance: compare with ``arg is NOTSET``.
NOTSET = ArgNotSet()
184 changes: 183 additions & 1 deletion tests/test_spark_client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
from __future__ import annotations

import datetime
import importlib
import json
import os
from typing import Callable
from unittest import mock

import pytest
from freezegun import freeze_time
from spark_on_k8s.client import SparkOnK8S, default_app_id_suffix
from spark_on_k8s import client as client_module
from spark_on_k8s.client import ExecutorInstances, PodResources, SparkOnK8S, default_app_id_suffix
from spark_on_k8s.utils import configuration as configuration_module

FAKE_TIME = datetime.datetime(2024, 1, 14, 12, 12, 31)

Expand Down Expand Up @@ -159,3 +165,179 @@ def test_parse_app_name_and_id(
)
assert actual_app_name == expected_app_name, "The app name is not as expected"
assert actual_app_id == expected_app_id, "The app id is not as expected"

@mock.patch("kubernetes.client.api.core_v1_api.CoreV1Api.create_namespaced_pod")
@mock.patch("kubernetes.client.api.core_v1_api.CoreV1Api.create_namespaced_service")
@freeze_time(FAKE_TIME)
def test_submit_app(self, mock_create_namespaced_service, mock_create_namespaced_pod):
    """Submitting an app with explicit arguments builds the expected driver pod."""
    spark_client = SparkOnK8S()
    spark_client.submit_app(
        image="pyspark-job",
        app_path="local:///opt/spark/work-dir/job.py",
        namespace="spark",
        service_account="spark",
        app_name="pyspark-job-example",
        app_arguments=["100000"],
        app_waiter="no_wait",
        image_pull_policy="Never",
        ui_reverse_proxy=True,
        driver_resources=PodResources(cpu=1, memory=2048, memory_overhead=1024),
        executor_instances=ExecutorInstances(min=2, max=5, initial=5),
    )

    expected_app_name = "pyspark-job-example"
    expected_app_id = f"{expected_app_name}-20240114121231"

    # The driver args are "--conf key=value" pairs in a fixed order; build the
    # expectation from that structure instead of one long literal list.
    expected_confs = [
        ("spark.app.name", expected_app_name),
        ("spark.app.id", expected_app_id),
        ("spark.kubernetes.namespace", "spark"),
        ("spark.kubernetes.authenticate.driver.serviceAccountName", "spark"),
        ("spark.kubernetes.container.image", "pyspark-job"),
        ("spark.driver.host", expected_app_id),
        ("spark.driver.port", "7077"),
        ("spark.kubernetes.driver.pod.name", f"{expected_app_id}-driver"),
        ("spark.kubernetes.executor.podNamePrefix", expected_app_id),
        ("spark.kubernetes.container.image.pullPolicy", "Never"),
        ("spark.driver.memory", "2048m"),
        ("spark.executor.cores", "1"),
        ("spark.executor.memory", "1024m"),
        ("spark.executor.memoryOverhead", "512m"),
        ("spark.ui.proxyBase", f"/webserver/ui/spark/{expected_app_id}"),
        ("spark.ui.proxyRedirectUri", "/"),
        ("spark.dynamicAllocation.enabled", "true"),
        ("spark.dynamicAllocation.shuffleTracking.enabled", "true"),
        ("spark.dynamicAllocation.minExecutors", "2"),
        ("spark.dynamicAllocation.maxExecutors", "5"),
        ("spark.dynamicAllocation.initialExecutors", "5"),
    ]
    expected_args = [
        "driver",
        "--master",
        "k8s://https://kubernetes.default.svc.cluster.local:443",
    ]
    for conf_key, conf_value in expected_confs:
        expected_args += ["--conf", f"{conf_key}={conf_value}"]
    expected_args += ["local:///opt/spark/work-dir/job.py", "100000"]

    created_pod = mock_create_namespaced_pod.call_args[1]["body"]
    assert created_pod.metadata.name == f"{expected_app_id}-driver"
    assert created_pod.metadata.labels["spark-app-name"] == expected_app_name
    assert created_pod.metadata.labels["spark-app-id"] == expected_app_id
    assert created_pod.metadata.labels["spark-role"] == "driver"
    assert created_pod.spec.containers[0].image == "pyspark-job"
    assert created_pod.spec.containers[0].args == expected_args

@mock.patch("spark_on_k8s.utils.app_manager.SparkAppManager.stream_logs")
@mock.patch("kubernetes.client.api.core_v1_api.CoreV1Api.create_namespaced_pod")
@mock.patch("kubernetes.client.api.core_v1_api.CoreV1Api.create_namespaced_service")
@freeze_time(FAKE_TIME)
def test_submit_app_with_env_configurations(
    self, mock_create_namespaced_service, mock_create_namespaced_pod, mock_stream_logs
):
    """submit_app falls back to env-var defaults when called without arguments.

    Fix over the previous version: the env vars were written straight into
    os.environ with no cleanup, and the modules were reloaded permanently, so
    the overrides leaked into every test that ran afterwards. mock.patch.dict
    restores the environment, and the finally-block reloads the modules in the
    clean environment to restore the default configuration.
    """
    env_overrides = {
        "SPARK_ON_K8S_DOCKER_IMAGE": "test-spark-on-k8s-docker-image",
        "SPARK_ON_K8S_APP_PATH": "/path/to/app.py",
        "SPARK_ON_K8S_NAMESPACE": "test-namespace",
        "SPARK_ON_K8S_SERVICE_ACCOUNT": "test-service-account",
        "SPARK_ON_K8S_APP_NAME": "test-spark-app",
        "SPARK_ON_K8S_APP_ARGUMENTS": '["arg1","arg2"]',
        "SPARK_ON_K8S_APP_WAITER": "log",
        "SPARK_ON_K8S_IMAGE_PULL_POLICY": "Always",
        "SPARK_ON_K8S_UI_REVERSE_PROXY": "true",
        "SPARK_ON_K8S_DRIVER_CPU": "1",
        "SPARK_ON_K8S_DRIVER_MEMORY": "1024",
        "SPARK_ON_K8S_DRIVER_MEMORY_OVERHEAD": "512",
        "SPARK_ON_K8S_EXECUTOR_CPU": "1",
        "SPARK_ON_K8S_EXECUTOR_MEMORY": "718",
        "SPARK_ON_K8S_EXECUTOR_MEMORY_OVERHEAD": "512",
        "SPARK_ON_K8S_EXECUTOR_MIN_INSTANCES": "2",
        "SPARK_ON_K8S_EXECUTOR_MAX_INSTANCES": "5",
        "SPARK_ON_K8S_EXECUTOR_INITIAL_INSTANCES": "5",
        "SPARK_ON_K8S_SPARK_CONF": json.dumps(
            {"spark.conf1.key": "spark.conf1.value", "spark.conf2.key": "spark.conf2.value"}
        ),
    }
    try:
        with mock.patch.dict(os.environ, env_overrides):
            # Configuration reads env vars at import time, so reload it (and
            # the client module that consumed it) to pick up the overrides.
            importlib.reload(configuration_module)
            importlib.reload(client_module)
            spark_client = SparkOnK8S()
            spark_client.submit_app()
    finally:
        # Reload again outside the patched environment so subsequent tests
        # see the default configuration, not this test's overrides.
        importlib.reload(configuration_module)
        importlib.reload(client_module)

    expected_app_name = "test-spark-app"
    expected_app_id = f"{expected_app_name}-20240114121231"

    created_pod = mock_create_namespaced_pod.call_args[1]["body"]
    assert created_pod.spec.containers[0].image == "test-spark-on-k8s-docker-image"
    assert created_pod.spec.containers[0].args == [
        "driver",
        "--master",
        "k8s://https://kubernetes.default.svc.cluster.local:443",
        "--conf",
        f"spark.app.name={expected_app_name}",
        "--conf",
        f"spark.app.id={expected_app_id}",
        "--conf",
        "spark.kubernetes.namespace=test-namespace",
        "--conf",
        "spark.kubernetes.authenticate.driver.serviceAccountName=test-service-account",
        "--conf",
        "spark.kubernetes.container.image=test-spark-on-k8s-docker-image",
        "--conf",
        f"spark.driver.host={expected_app_id}",
        "--conf",
        "spark.driver.port=7077",
        "--conf",
        f"spark.kubernetes.driver.pod.name={expected_app_id}-driver",
        "--conf",
        f"spark.kubernetes.executor.podNamePrefix={expected_app_id}",
        "--conf",
        "spark.kubernetes.container.image.pullPolicy=Always",
        "--conf",
        "spark.driver.memory=1024m",
        "--conf",
        "spark.executor.cores=1",
        "--conf",
        "spark.executor.memory=718m",
        "--conf",
        "spark.executor.memoryOverhead=512m",
        "--conf",
        f"spark.ui.proxyBase=/webserver/ui/test-namespace/{expected_app_id}",
        "--conf",
        "spark.ui.proxyRedirectUri=/",
        "--conf",
        "spark.dynamicAllocation.enabled=true",
        "--conf",
        "spark.dynamicAllocation.shuffleTracking.enabled=true",
        "--conf",
        "spark.dynamicAllocation.minExecutors=2",
        "--conf",
        "spark.dynamicAllocation.maxExecutors=5",
        "--conf",
        "spark.dynamicAllocation.initialExecutors=5",
        "--conf",
        "spark.conf1.key=spark.conf1.value",
        "--conf",
        "spark.conf2.key=spark.conf2.value",
        "/path/to/app.py",
        "arg1",
        "arg2",
    ]

0 comments on commit 6f6f02e

Please sign in to comment.