feat(dagster-k8s): add job debug info - resolves #19381 (#19409)
## Summary & Motivation
We use Dagster in Kubernetes. For every run, the run launcher creates a new
Kubernetes job. In some cases the corresponding pod cannot be scheduled,
either because of restrictions enforced by policies or because of
insufficient resources. This information can be found in the Kubernetes
events but not in the Dagster UI.
The Dagster daemon only watches for the pod to start and reports only
pod debug information. If no pod can be found, no further information
is provided.

It would be very helpful if the Kubernetes job events were published when
the pod cannot be found.

Resolves #19381 

## How I Tested These Changes
Run Dagster in Kubernetes. Then start a job/asset/... from the launchpad
and request more resources than there is quota remaining in the
namespace:

`dagster-k8s/config`: `{"container_config": {"resources": {"requests": {"cpu": "1000m", "memory": "32Gi"}, "limits": {"cpu": "1000m", "memory": "64Gi"}}}}`
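
For reference, a minimal sketch of how such a tag can be attached directly to a job definition (`quota_buster_job` and `noop_op` are illustrative names, not part of this change):

```python
from dagster import job, op


@op
def noop_op():
    # The op body is irrelevant; the oversized resource request below is
    # what prevents the pod from being scheduled.
    pass


@job(
    tags={
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "requests": {"cpu": "1000m", "memory": "32Gi"},
                    "limits": {"cpu": "1000m", "memory": "64Gi"},
                }
            }
        }
    }
)
def quota_buster_job():
    noop_op()
```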

---------

Co-authored-by: Jens Blawatt <[email protected]>
jblawatt and Jens Blawatt authored Jan 31, 2024
1 parent d164bdb commit 1cf9e98
Showing 3 changed files with 153 additions and 3 deletions.
42 changes: 42 additions & 0 deletions python_modules/libraries/dagster-k8s/dagster_k8s/client.py
@@ -731,6 +731,48 @@ def _has_container_logs(self, container_status):

        return False

    def _get_job_status_str(self, job):
        if not job.status:
            return "Could not determine job status."

        job_status = (
            "Job status:"
            + f"\n - start_time: {job.status.start_time.isoformat()}"
            + f"\n - active={job.status.active or 'None'}"
            + f"\n - succeeded={job.status.succeeded or 'None'}"
            + f"\n - failed={job.status.failed or 'None'}"
        )

        return job_status

    def get_job_debug_info(
        self,
        job_name: str,
        namespace: str,
    ) -> str:
        jobs = self.batch_api.list_namespaced_job(
            namespace=namespace, field_selector=f"metadata.name={job_name}"
        ).items
        job = jobs[0] if jobs else None

        job_status_str = (
            self._get_job_status_str(job) if job else f"Could not find job {job_name}"
        )

        event_strs = []

        if job:
            events = self.core_api.list_namespaced_event(
                namespace=namespace,
                field_selector=f"involvedObject.name={job_name}",
            ).items
            for event in events:
                event_strs.append(f"{event.reason}: {event.message}")

        return (
            f"Debug information for job {job_name}:"
            + f"\n\n{job_status_str}"
            + "".join(["\n\n" + event_str for event_str in event_strs])
        )

    def get_pod_debug_info(
        self,
        pod_name,
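
For illustration, a minimal sketch of how the new method could be called (assuming the Kubernetes config has already been loaded via `kubernetes.config.load_kube_config()`; the job name and namespace are illustrative):

```python
import kubernetes
from dagster_k8s.client import DagsterKubernetesClient

kubernetes.config.load_kube_config()
client = DagsterKubernetesClient.production_client()

# Returns the job status plus any Kubernetes events recorded for the job,
# e.g. scheduling or quota failures emitted by the job controller.
print(client.get_job_debug_info("dagster-run-1234abcd", namespace="dagster"))
```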
9 changes: 7 additions & 2 deletions python_modules/libraries/dagster-k8s/dagster_k8s/launcher.py
@@ -59,6 +59,7 @@ def __init__(
        env_secrets=None,
        env_vars=None,
        k8s_client_batch_api=None,
        k8s_client_core_api=None,
        volume_mounts=None,
        volumes=None,
        labels=None,
@@ -86,7 +87,8 @@ def __init__(
            kubernetes.config.load_kube_config(kubeconfig_file)

        self._api_client = DagsterKubernetesClient.production_client(
            core_api_override=k8s_client_core_api,
            batch_api_override=k8s_client_batch_api,
        )

        self._job_config = None
@@ -400,9 +402,12 @@ def get_run_worker_debug_info(
            )

        else:
            job_debug_info = self._api_client.get_job_debug_info(job_name, namespace=namespace)
            full_msg = (
                full_msg
                + "\n\n"
                + job_debug_info
                + "\n\nFor more information about the failure, try running `kubectl describe job"
                f" {job_name}` in your cluster."
            )

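Assuming a quota failure like the one described above, the message assembled here would take roughly this shape (illustrative output; the event text comes from the Kubernetes job controller and will vary):

```text
Debug information for job dagster-run-1234abcd:

Job status:
 - start_time: 2024-01-31T10:15:00+00:00
 - active=None
 - succeeded=None
 - failed=None

FailedCreate: Error creating: pods "dagster-run-1234abcd-xxxxx" is forbidden: exceeded quota

For more information about the failure, try running `kubectl describe job dagster-run-1234abcd` in your cluster.
```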
105 changes: 104 additions & 1 deletion python_modules/libraries/dagster-k8s/dagster_k8s_tests/unit_tests/test_launcher.py
@@ -1,4 +1,5 @@
import json
from datetime import datetime
from unittest import mock

import pytest
@@ -17,9 +18,22 @@
from dagster._utils.hosted_user_process import external_job_from_recon_job
from dagster._utils.merger import merge_dicts
from dagster_k8s import K8sRunLauncher
from dagster_k8s.job import DAGSTER_PG_PASSWORD_ENV_VAR, UserDefinedDagsterK8sConfig
from dagster_k8s.job import (
    DAGSTER_PG_PASSWORD_ENV_VAR,
    UserDefinedDagsterK8sConfig,
    get_job_name_from_run_id,
)
from kubernetes import __version__ as kubernetes_version
from kubernetes.client.models.v1_job import V1Job
from kubernetes.client.models.v1_job_status import V1JobStatus
from kubernetes.client.models.v1_object_meta import V1ObjectMeta

if kubernetes_version >= "13":
    from kubernetes.client.models.core_v1_event import CoreV1Event
else:
    # Ignore the type error here; older kubernetes library versions have a
    # different module structure.
    from kubernetes.client.models.v1_event import V1Event as CoreV1Event  # type: ignore


def test_launcher_from_config(kubeconfig_file):
@@ -690,3 +704,92 @@ def test_check_run_health(kubeconfig_file):

    health = k8s_run_launcher.check_run_worker_health(finished_run)
    assert health.status == WorkerStatus.FAILED, health.msg


def test_get_run_worker_debug_info(kubeconfig_file):
    labels = {"foo_label_key": "bar_label_value"}

    mock_k8s_client_batch_api = mock.Mock(
        spec_set=["read_namespaced_job_status", "list_namespaced_job"]
    )
    mock_k8s_client_core_api = mock.Mock(spec_set=["list_namespaced_pod", "list_namespaced_event"])

    k8s_run_launcher = K8sRunLauncher(
        service_account_name="webserver-admin",
        instance_config_map="dagster-instance",
        postgres_password_secret="dagster-postgresql-secret",
        dagster_home="/opt/dagster/dagster_home",
        job_image="fake_job_image",
        load_incluster_config=False,
        kubeconfig_file=kubeconfig_file,
        k8s_client_batch_api=mock_k8s_client_batch_api,
        k8s_client_core_api=mock_k8s_client_core_api,
        labels=labels,
    )

    # Launch the run in a fake Dagster instance.
    job_name = "demo_job"

    # Create fake external job.
    recon_job = reconstructable(fake_job)
    recon_repo = recon_job.repository
    repo_def = recon_repo.get_definition()
    loadable_target_origin = LoadableTargetOrigin(python_file=__file__)

    list_namespaced_pod_response = mock.Mock(spec_set=["items"])
    list_namespaced_pod_response.items = []
    mock_k8s_client_core_api.list_namespaced_pod.return_value = list_namespaced_pod_response

    list_namespaced_job_response = mock.Mock(spec_set=["items"])
    list_namespaced_job_response.items = [
        V1Job(
            metadata=V1ObjectMeta(name="hello-world"),
            status=V1JobStatus(
                failed=None,
                succeeded=None,
                active=None,
                start_time=datetime.now(),
            ),
        ),
    ]
    mock_k8s_client_batch_api.list_namespaced_job.return_value = list_namespaced_job_response

    list_namespaced_job_event_response = mock.Mock(spec_set=["items"])
    list_namespaced_job_event_response.items = [
        CoreV1Event(
            metadata=V1ObjectMeta(name="event/demo_job"),
            reason="Testing",
            message="test message",
            involved_object=job_name,
        )
    ]
    mock_k8s_client_core_api.list_namespaced_event.return_value = list_namespaced_job_event_response

    with instance_for_test() as instance:
        k8s_run_launcher.register_instance(instance)

        with in_process_test_workspace(instance, loadable_target_origin) as workspace:
            location = workspace.get_code_location(workspace.code_location_names[0])
            repo_handle = RepositoryHandle(
                repository_name=repo_def.name,
                code_location=location,
            )
            fake_external_job = external_job_from_recon_job(
                recon_job,
                op_selection=None,
                repository_handle=repo_handle,
            )

            started_run = create_run_for_test(
                instance,
                job_name=job_name,
                external_job_origin=fake_external_job.get_external_origin(),
                job_code_origin=fake_external_job.get_python_origin(),
                status=DagsterRunStatus.STARTING,
            )

            debug_info = k8s_run_launcher.get_run_worker_debug_info(started_run)
            running_job_name = get_job_name_from_run_id(started_run.run_id)
            assert f"Debug information for job {running_job_name}" in debug_info
            assert "Job status:" in debug_info
            assert "Testing: test message" in debug_info
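To run just this test locally, something like the following should work (the file path is inferred from the dagster-k8s test layout and may differ):

```bash
pytest python_modules/libraries/dagster-k8s/dagster_k8s_tests/unit_tests/test_launcher.py \
  -k test_get_run_worker_debug_info
```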