Skip to content

Commit

Permalink
Fix callsite in k8s run launcher termination where we were making an …
Browse files Browse the repository at this point in the history
…unneeded and potentially expensive event log fetch (#26288)

Summary:
This callsite was correctly gated in the other places it is called, but
we missed the one in terminate. Consolidate calsites so that anything
that might need to fetch this information checks whether the
experimental run resume feature is enabled or not.

[dagster-k8s] Fixed an issue where run termination sometimes timed out
when terminating runs with many events.

## Summary & Motivation

## How I Tested These Changes

## Changelog

> Insert changelog entry or delete this section.
  • Loading branch information
gibsondan authored Dec 5, 2024
1 parent 1a2a067 commit c60f5d4
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions python_modules/libraries/dagster-k8s/dagster_k8s/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ def resume_run(self, context: ResumeRunContext) -> None:

self._launch_k8s_job_with_args(job_name, args, run)

def _get_resume_attempt_number(self, run: DagsterRun) -> Optional[int]:
if not self.supports_run_worker_crash_recovery:
return None
return self._instance.count_resume_run_attempts(run.run_id)

def terminate(self, run_id):
check.str_param(run_id, "run_id")
run = self._instance.get_run_by_id(run_id)
Expand All @@ -325,7 +330,7 @@ def terminate(self, run_id):
container_context = self.get_container_context_for_run(run)

job_name = get_job_name_from_run_id(
run_id, resume_attempt_number=self._instance.count_resume_run_attempts(run.run_id)
run_id, resume_attempt_number=self._get_resume_attempt_number(run)
)

try:
Expand Down Expand Up @@ -367,12 +372,10 @@ def get_run_worker_debug_info(
self, run: DagsterRun, include_container_logs: Optional[bool] = True
) -> Optional[str]:
container_context = self.get_container_context_for_run(run)
if self.supports_run_worker_crash_recovery:
resume_attempt_number = self._instance.count_resume_run_attempts(run.run_id)
else:
resume_attempt_number = None

job_name = get_job_name_from_run_id(run.run_id, resume_attempt_number=resume_attempt_number)
job_name = get_job_name_from_run_id(
run.run_id, resume_attempt_number=self._get_resume_attempt_number(run)
)
namespace = container_context.namespace
pod_names = self._api_client.get_pod_names_in_job(job_name, namespace=namespace)
full_msg = ""
Expand Down Expand Up @@ -411,12 +414,9 @@ def get_run_worker_debug_info(
def check_run_worker_health(self, run: DagsterRun):
container_context = self.get_container_context_for_run(run)

if self.supports_run_worker_crash_recovery:
resume_attempt_number = self._instance.count_resume_run_attempts(run.run_id)
else:
resume_attempt_number = None

job_name = get_job_name_from_run_id(run.run_id, resume_attempt_number=resume_attempt_number)
job_name = get_job_name_from_run_id(
run.run_id, resume_attempt_number=self._get_resume_attempt_number(run)
)
try:
status = self._api_client.get_job_status(
namespace=container_context.namespace,
Expand Down

0 comments on commit c60f5d4

Please sign in to comment.