Skip to content

Commit

Permalink
[jobsets] Fix killing jobsets using deletion (#2149)
Browse files Browse the repository at this point in the history
  • Loading branch information
valayDave authored Nov 21, 2024
1 parent 05f9756 commit d8c7cc5
Showing 1 changed file with 43 additions and 28 deletions.
71 changes: 43 additions & 28 deletions metaflow/plugins/kubernetes/kubernetes_jobsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import random
import time
from collections import namedtuple

from metaflow.exception import MetaflowException
from metaflow.metaflow_config import KUBERNETES_JOBSET_GROUP, KUBERNETES_JOBSET_VERSION
from metaflow.tracing import inject_tracing_vars
Expand Down Expand Up @@ -320,33 +319,49 @@ def _fetch_pod(self):
def kill(self):
plural = "jobsets"
client = self._client.get()
# Get the jobset
with client.ApiClient() as api_client:
api_instance = client.CustomObjectsApi(api_client)
try:
obj = api_instance.get_namespaced_custom_object(
group=self._group,
version=self._version,
namespace=self._namespace,
plural=plural,
name=self._name,
)

# Suspend the jobset
obj["spec"]["suspend"] = True

api_instance.replace_namespaced_custom_object(
group=self._group,
version=self._version,
namespace=self._namespace,
plural=plural,
name=obj["metadata"]["name"],
body=obj,
)
except Exception as e:
raise KubernetesJobsetException(
"Exception when suspending existing jobset: %s\n" % e
)
try:
# Killing the control pod will trigger the jobset to mark everything as failed.
# Since jobsets have a successPolicy set to `All` which ensures that everything has
# to succeed for the jobset to succeed.
from kubernetes.stream import stream

control_pod = self._fetch_pod()
stream(
client.CoreV1Api().connect_get_namespaced_pod_exec,
name=control_pod["metadata"]["name"],
namespace=control_pod["metadata"]["namespace"],
command=[
"/bin/sh",
"-c",
"/sbin/killall5",
],
stderr=True,
stdin=False,
stdout=True,
tty=False,
)
except Exception as e:
with client.ApiClient() as api_client:
# If we are unable to kill the control pod then
# Delete the jobset to kill the subsequent pods.
# There are a few reasons for deleting a jobset to kill it :
# 1. Jobset has a `suspend` attribute to suspend it's execution, but this
# doesn't play nicely when jobsets are deployed with other components like kueue.
# 2. Jobset doesn't play nicely when we mutate status
# 3. Deletion is a gaurenteed way of removing any pods.
api_instance = client.CustomObjectsApi(api_client)
try:
api_instance.delete_namespaced_custom_object(
group=self._group,
version=self._version,
namespace=self._namespace,
plural=plural,
name=self._name,
)
except Exception as e:
raise KubernetesJobsetException(
"Exception when deleting existing jobset: %s\n" % e
)

@property
def id(self):
Expand Down

0 comments on commit d8c7cc5

Please sign in to comment.