Skip to content

Commit

Permalink
Add support for specifying k8s tolerations (#192)
Browse files Browse the repository at this point in the history
People may want or need more control over which nodes their jobs run on. Kubernetes tolerations are a mechanism many people use to achieve this. If we don't support them, people may not be able to run their Sematic funcs on the nodes they intend. This adds support for tolerations.

It also adds support for enums, because the natural way to support tolerations includes some enum types that are currently serialized/deserialized with Sematic's type stuff. It fails without actual enum support. I've wanted enum support a few times anyway, so I took this opportunity to "bite the bullet."
  • Loading branch information
augray authored Oct 7, 2022
1 parent 41d0f75 commit 29f69ea
Show file tree
Hide file tree
Showing 17 changed files with 460 additions and 5 deletions.
3 changes: 3 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ Lines for version numbers should always be formatted as `* MAJOR.MINOR.PATCH`
with nothing else on the line.
-->
* HEAD
* [feature] Support Enums
* [feature] Allow specifying Kubernetes tolerations for cloud jobs
* [feature] Redesigned log UI
* 0.15.1
* [bugfix] Ensure log ingestion happens when using bazel cloud jobs and `dev=False`
* [bugfix] Avoid spamming the server with requests for logs for incomplete runs
Expand Down
3 changes: 3 additions & 0 deletions sematic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
from sematic.resolvers.resource_requirements import ( # noqa: F401,E402
KubernetesResourceRequirements,
KubernetesSecretMount,
KubernetesToleration,
KubernetesTolerationEffect,
KubernetesTolerationOperator,
ResourceRequirements,
)
from sematic.versions import CURRENT_VERSION_STR as __version__ # noqa: F401,E402
153 changes: 152 additions & 1 deletion sematic/resolvers/resource_requirements.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Standard Library
from dataclasses import dataclass, field
from typing import Dict
from enum import Enum, unique
from typing import Dict, List, Optional, Union

KUBERNETES_SECRET_NAME = "sematic-func-secrets"

Expand Down Expand Up @@ -49,6 +50,150 @@ class KubernetesSecretMount:
file_secret_root_path: str = "/secrets"


@unique
class KubernetesTolerationOperator(Enum):
    """How a toleration is compared against a node taint to decide if it applies.

    See the Kubernetes documentation for more:
    https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/

    Options
    -------
    Equal:
        A value must be specified, and the toleration only applies when that
        value is equal for the toleration and the taint. The "effect" must
        also be equal for the toleration and the taint.
    Exists:
        No value is required. The toleration applies when a taint with the
        given key exists on the node. The "effect" must also be equal for
        the toleration and the taint.
    """

    # Member values are the exact strings handed to the Kubernetes API.
    Equal = "Equal"
    Exists = "Exists"


@unique
class KubernetesTolerationEffect(Enum):
    """The taint effect that a toleration is meant to tolerate.

    See the Kubernetes documentation for more:
    https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/

    Options
    -------
    NoSchedule:
        The pod can run on the node even if the node has specified a
        NoSchedule taint, assuming the rest of the toleration matches
        the taint.
    PreferNoSchedule:
        The pod can run on the node even if the node has specified a
        PreferNoSchedule taint, assuming the rest of the toleration matches
        the taint.
    NoExecute:
        The pod will not be evicted from the node even if the node has
        specified a NoExecute taint, assuming the rest of the toleration
        matches the taint.
    All:
        The toleration applies to a taint with any kind of effect, assuming
        the rest of the toleration matches the taint.
    """

    NoSchedule = "NoSchedule"
    PreferNoSchedule = "PreferNoSchedule"
    NoExecute = "NoExecute"
    All = "All"


@dataclass
class KubernetesToleration:
    """Toleration for a node taint, letting the function's pod run on the node.

    See the Kubernetes documentation for more:
    https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/

    Attributes
    ----------
    key:
        The key for the node taint intended to be tolerated. If empty, means
        to match all keys AND all values.
        NOTE(review): Kubernetes documents this wildcard behavior for the
        Exists operator; confirm that an empty key combined with the default
        Equal operator is accepted by the API.
    operator:
        The way to compare the key/value pair to the node taint's key/value
        pair to see if the toleration applies.
    effect:
        The effect of the node taint the toleration is intended to tolerate.
        The default, All, means to tolerate all effects.
    value:
        If the operator is Equal, this value will be compared to the value
        on the node taint to see if this toleration applies.
    toleration_seconds:
        Only specified when effect is NoExecute (otherwise is an error). It
        specifies the amount of time the pod can continue executing on a node
        with a NoExecute taint.
    """

    key: Optional[str] = None
    operator: KubernetesTolerationOperator = KubernetesTolerationOperator.Equal
    effect: KubernetesTolerationEffect = KubernetesTolerationEffect.All
    value: Optional[str] = None
    toleration_seconds: Optional[int] = None

    def to_api_keyword_args(self) -> Dict[str, Optional[Union[str, int]]]:
        """Convert to the keyword arguments the Kubernetes Python client expects.

        Returns
        -------
        A dict with the keys "effect", "key", "operator", "toleration_seconds"
        and "value". Enum members are replaced by their string values, and the
        All effect is represented as None.
        """
        effect: Optional[str] = self.effect.value
        if self.effect == KubernetesTolerationEffect.All:
            # the actual API makes "all" the default behavior with no other
            # way to specify it, so it is expressed by omitting the effect
            effect = None
        return dict(
            effect=effect,
            key=self.key,
            operator=self.operator.value,
            toleration_seconds=self.toleration_seconds,
            value=self.value,
        )

    def __post_init__(self):
        """Ensure that the values in the toleration are valid; raise otherwise.

        Raises
        ------
        ValueError:
            If any field has the wrong type, or if toleration_seconds is
            given with an effect other than NoExecute.
        """
        if not (self.key is None or isinstance(self.key, str)):
            raise ValueError(f"key must be None or a string, got: {self.key}")
        if not isinstance(self.operator, KubernetesTolerationOperator):
            raise ValueError(
                f"operator must be a {KubernetesTolerationOperator}, "
                f"got {self.operator}"
            )
        if not isinstance(self.effect, KubernetesTolerationEffect):
            raise ValueError(
                f"effect must be a {KubernetesTolerationEffect}, got {self.effect}"
            )
        if not (self.value is None or isinstance(self.value, str)):
            raise ValueError(f"value must be None or a string, got: {self.value}")

        seconds = self.toleration_seconds
        # bool is a subclass of int, so it has to be excluded explicitly;
        # otherwise True/False would pass validation and reach the API.
        if seconds is not None and (
            isinstance(seconds, bool) or not isinstance(seconds, int)
        ):
            raise ValueError(
                "toleration_seconds must be None or an "
                f"int, got: {self.toleration_seconds}"
            )
        if seconds is not None and self.effect != KubernetesTolerationEffect.NoExecute:
            raise ValueError(
                "toleration_seconds should only be specified when the effect "
                "is NoExecute."
            )


@dataclass
class KubernetesResourceRequirements:
"""Information on the Kubernetes resources required.
Expand All @@ -69,11 +214,17 @@ class KubernetesResourceRequirements:
secret_mounts:
Requests to take the contents of Kubernetes secrets and expose them as
environment variables or files on disk when running in the cloud.
tolerations:
If your Kubernetes configuration uses node taints to control which workloads
get scheduled on which nodes, this enables control over how your workload
interacts with these node taints. More information can be found here:
https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/
"""

node_selector: Dict[str, str] = field(default_factory=dict)
requests: Dict[str, str] = field(default_factory=dict)
secret_mounts: KubernetesSecretMount = field(default_factory=KubernetesSecretMount)
tolerations: List[KubernetesToleration] = field(default_factory=list)


@dataclass
Expand Down
10 changes: 10 additions & 0 deletions sematic/resolvers/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ pytest_test(
],
)


pytest_test(
name = "test_resource_requirements",
srcs = ["test_resource_requirements.py"],
deps = [
"//sematic/resolvers:resource_requirements",
"//sematic/types:serialization",
],
)

pytest_test(
name = "test_silent_resolver",
srcs = ["test_silent_resolver.py"],
Expand Down
55 changes: 55 additions & 0 deletions sematic/resolvers/tests/test_resource_requirements.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import pytest

# Sematic
from sematic.resolvers.resource_requirements import (
KubernetesResourceRequirements,
KubernetesSecretMount,
KubernetesToleration,
KubernetesTolerationEffect,
KubernetesTolerationOperator,
ResourceRequirements,
)
from sematic.types.serialization import (
value_from_json_encodable,
value_to_json_encodable,
)


def test_is_serializable():
    """Fully populated ResourceRequirements survive an encode/decode round trip."""
    toleration = KubernetesToleration(
        key="k",
        value="v",
        effect=KubernetesTolerationEffect.NoExecute,
        operator=KubernetesTolerationOperator.Equal,
        toleration_seconds=42,
    )
    secret_mounts = KubernetesSecretMount(
        environment_secrets={"a": "b"},
        file_secret_root_path="/foo/bar",
        file_secrets={"c": "d"},
    )
    original = ResourceRequirements(
        kubernetes=KubernetesResourceRequirements(
            node_selector={"foo": "bar"},
            requests={"cpu": "500m", "memory": "100Gi"},
            secret_mounts=secret_mounts,
            tolerations=[toleration],
        )
    )

    # Encode to the JSON-encodable form, then decode straight back.
    round_tripped = value_from_json_encodable(
        value_to_json_encodable(original, ResourceRequirements),
        ResourceRequirements,
    )

    assert round_tripped == original


def test_validation():
    """toleration_seconds with an effect other than NoExecute raises ValueError."""
    expected_message = (
        "toleration_seconds should only be specified when the effect is NoExecute."
    )
    invalid_kwargs = dict(
        key="k",
        value="v",
        effect=KubernetesTolerationEffect.PreferNoSchedule,
        operator=KubernetesTolerationOperator.Equal,
        toleration_seconds=42,
    )

    with pytest.raises(ValueError, match=expected_message):
        KubernetesToleration(**invalid_kwargs)
12 changes: 11 additions & 1 deletion sematic/scheduling/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ def _schedule_kubernetes_job(
volumes = []
volume_mounts = []
secret_env_vars = []
tolerations = []
if resource_requirements is not None:
node_selector = resource_requirements.kubernetes.node_selector
resource_requests = resource_requirements.kubernetes.requests
Expand All @@ -214,10 +215,17 @@ def _schedule_kubernetes_job(
secret_env_vars.extend(
_environment_secrets(resource_requirements.kubernetes.secret_mounts)
)
tolerations = [
kubernetes.client.V1Toleration( # type: ignore
**toleration.to_api_keyword_args() # type: ignore
)
for toleration in resource_requirements.kubernetes.tolerations
]
logger.debug("kubernetes node_selector %s", node_selector)
logger.debug("kubernetes resource requests %s", resource_requests)
logger.debug("kubernetes volumes and mounts: %s, %s", volumes, volume_mounts)
logger.debug("kubernetes environment secrets: %s", secret_env_vars)
logger.debug("kubernetes tolerations: %s", tolerations)

pod_name_env_var = kubernetes.client.V1EnvVar( # type: ignore
name=KUBERNETES_POD_NAME_ENV_VAR,
Expand All @@ -228,6 +236,8 @@ def _schedule_kubernetes_job(
),
)

# See client documentation here:
# https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Job.md
job = kubernetes.client.V1Job( # type: ignore
api_version="batch/v1",
kind="Job",
Expand Down Expand Up @@ -282,7 +292,7 @@ def _schedule_kubernetes_job(
)
],
volumes=volumes,
tolerations=[],
tolerations=tolerations,
restart_policy="Never",
),
),
Expand Down
27 changes: 27 additions & 0 deletions sematic/scheduling/tests/test_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from sematic.resolvers.resource_requirements import (
KubernetesResourceRequirements,
KubernetesSecretMount,
KubernetesToleration,
KubernetesTolerationEffect,
KubernetesTolerationOperator,
ResourceRequirements,
)
from sematic.scheduling.kubernetes import (
Expand Down Expand Up @@ -47,6 +50,16 @@ def test_schedule_kubernetes_job(
file_secrets=file_secrets,
file_secret_root_path=secret_root,
),
tolerations=[
KubernetesToleration(
key="foo",
operator=KubernetesTolerationOperator.Equal,
effect=KubernetesTolerationEffect.NoExecute,
value="bar",
toleration_seconds=42,
),
KubernetesToleration(),
],
)
)
mock_user_settings.return_value = {"KUBERNETES_NAMESPACE": namespace}
Expand Down Expand Up @@ -86,6 +99,20 @@ def test_schedule_kubernetes_job(
assert container.resources.limits == requests
assert container.resources.requests == requests

tolerations = job.spec.template.spec.tolerations
assert len(tolerations) == 2
assert tolerations[0].key == "foo"
assert tolerations[0].value == "bar"
assert tolerations[0].effect == "NoExecute"
assert tolerations[0].operator == "Equal"
assert tolerations[0].toleration_seconds == 42

assert tolerations[1].key is None
assert tolerations[1].value is None
assert tolerations[1].effect is None
assert tolerations[1].operator == "Equal"
assert tolerations[1].toleration_seconds is None


IS_ACTIVE_CASES = [
(
Expand Down
1 change: 1 addition & 0 deletions sematic/types/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ sematic_py_lib(
":type",
"//sematic/types/types:bool",
"//sematic/types/types:dataclass",
"//sematic/types/types:enum",
"//sematic/types/types:float",
"//sematic/types/types:integer",
"//sematic/types/types:none",
Expand Down
1 change: 1 addition & 0 deletions sematic/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import sematic.types.types.bool # noqa: F401
import sematic.types.types.dataclass # noqa: F401
import sematic.types.types.dict # noqa: F401
import sematic.types.types.enum # noqa: F401
import sematic.types.types.float # noqa: F401
import sematic.types.types.integer # noqa: F401
import sematic.types.types.list # noqa: F401
Expand Down
Loading

0 comments on commit 29f69ea

Please sign in to comment.