diff --git a/src/ert/job_queue/job_queue_node.py b/src/ert/job_queue/job_queue_node.py index 497e6c277bf..f2a87a07ae2 100644 --- a/src/ert/job_queue/job_queue_node.py +++ b/src/ert/job_queue/job_queue_node.py @@ -3,6 +3,7 @@ import logging import random import time +from pathlib import Path from threading import Lock, Semaphore, Thread from typing import TYPE_CHECKING, Callable, Optional @@ -25,7 +26,7 @@ if TYPE_CHECKING: from ..run_arg import RunArg - from .driver import Driver + from .queue import Driver logger = logging.getLogger(__name__) @@ -58,27 +59,7 @@ def __call__( ) -class JobQueueNode(BaseCClass): # type: ignore - TYPE_NAME = "job_queue_node" - - _alloc = ResPrototype( - "void* job_queue_node_alloc(char*," - "char*," - "char*," - "int, " - "char*," - "char*" - ")", - bind=False, - ) - _free = ResPrototype("void job_queue_node_free(job_queue_node)") - _get_status = ResPrototype( - "job_status_type_enum job_queue_node_get_status(job_queue_node)" - ) - _set_queue_status = ResPrototype( - "void job_queue_node_set_status(job_queue_node, job_status_type_enum)" - ) - +class JobQueueNode: def __init__( self, job_script: str, @@ -101,23 +82,21 @@ def __init__( self._start_time: Optional[float] = None self._end_time: Optional[float] = None self._timed_out = False + self._status: JobStatus = JobStatus.UNKNOWN self._status_msg = "" - c_ptr = self._alloc( - run_arg.job_name, - run_arg.runpath, - job_script, - num_cpu, - status_file, - exit_file, - ) - - if c_ptr is not None: - super().__init__(c_ptr) - else: - raise ValueError("Unable to create job node object") - def free(self) -> None: - self._free() + # c-struct attributes + self._status_file: str = status_file + self._exit_file: str = exit_file + self._run_cmd: str = job_script + self._job_name: str = run_arg.job_name + self._run_path: str = run_arg.runpath + self._num_cpu: int = num_cpu + self._queue_index: int = 0 + self._submit_attempt: int = 0 + self._confirmed_running: bool = False + self._fail_message: Optional[str] = None + self._error_message: Optional[str] = None def __str__(self) -> str: return ( @@ -128,7 +107,7 @@ def __str__(self) -> str: @property def run_path(self) -> str: - return self.run_arg.runpath + return self._run_path @property def timed_out(self) -> bool: @@ -137,24 +116,44 @@ def timed_out(self) -> bool: @property def submit_attempt(self) -> int: - return _get_submit_attempt(self) - - def _poll_queue_status(self, driver: "Driver") -> JobStatus: - result, msg = _refresh_status(self, driver) - if msg is not None: - self._status_msg = msg - return JobStatus(result) + # return _get_submit_attempt(self) + return self._submit_attempt + + def refresh_status(self, driver: "Driver") -> JobStatus: + if self.queue_status == JobStatus.RUNNING and not self._confirmed_running: + self._confirmed_running = Path(self._status_file).exists() + if not self._confirmed_running: + MAX_CONFIRMED_WAIT = 10 * 60 + if self.runtime > MAX_CONFIRMED_WAIT: + logger.error( + f"max_confirm_wait {MAX_CONFIRMED_WAIT} has passed since sim_start" + f"without success; {self._job_name} is assumed dead (attempt {self._submit_attempt})" + ) + self.queue_status = JobStatus.DO_KILL_NODE_FAILURE + if self.is_running(): + try: + self._status = driver.get_status(self) + except Exception: + self.queue_status = JobStatus.STATUS_FAILURE + if self.queue_status == JobStatus.EXIT: + with open(self._exit_file, "r") as exit_file: + # needs to be parsed properly + self._fail_message = exit_file.readlines()[0] + if self._fail_message and not self._error_message: + self._error_message = self._fail_message + return self.queue_status @property def queue_status(self) -> JobStatus: - return self._get_status() + # return self._get_status() + return self._status @queue_status.setter def queue_status(self, value: JobStatus) -> None: - return self._set_queue_status(value) + self._status = value def submit(self, driver: "Driver") -> SubmitStatus: - return SubmitStatus(_submit(self, driver)) + return driver.submit(self) def run_done_callback(self) -> Optional[LoadStatus]: callback_status, status_msg = forward_model_ok(self.run_arg) @@ -208,7 +207,7 @@ def _job_monitor( end_status = self._poll_until_done(driver) self._handle_end_status(driver, pool_sema, end_status, max_submit) - def _poll_until_done(self, driver: Driver) -> JobStatus: + def _poll_until_done(self, driver: "Driver") -> JobStatus: current_status = self._poll_queue_status(driver) backoff = _BackoffFunction() # in the following loop, we increase the sleep time between loop iterations as @@ -268,7 +267,7 @@ def _log_kill_thread_stopping_status(self) -> None: def _handle_end_status( self, - driver: Driver, + driver: "Driver", pool_sema: Semaphore, end_status: JobStatus, max_submit: int, diff --git a/src/ert/job_queue/queue.py b/src/ert/job_queue/queue.py index d2557fff1a3..87473a89bd5 100644 --- a/src/ert/job_queue/queue.py +++ b/src/ert/job_queue/queue.py @@ -38,6 +38,7 @@ from ert.job_queue.thread_status import ThreadStatus from . import ResPrototype +from .submit_status import SubmitStatus if TYPE_CHECKING: from ert.ensemble_evaluator import Realization @@ -86,10 +87,14 @@ def _queue_state_event_type(state: str) -> str: class Driver: - pass + def get_status(self, node: JobQueueNode) -> JobStatus: + return JobStatus.RUNNING + def submit(self, node: JobQueueNode) -> SubmitStatus: + return SubmitStatus.OK -class JobQueue: # type: ignore + +class JobQueue: # TYPE_NAME = "job_queue" # _alloc = ResPrototype("void* job_queue_alloc(void*)", bind=False) # _free = ResPrototype("void job_queue_free( job_queue )")