Skip to content

Commit

Permalink
WIP: job_queue_node
Browse files Browse the repository at this point in the history
  • Loading branch information
xjules committed Nov 8, 2023
1 parent 58d1f5d commit 72929d0
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 52 deletions.
99 changes: 49 additions & 50 deletions src/ert/job_queue/job_queue_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import random
import time
from pathlib import Path
from threading import Lock, Semaphore, Thread
from typing import TYPE_CHECKING, Callable, Optional

Expand All @@ -25,7 +26,7 @@

if TYPE_CHECKING:
from ..run_arg import RunArg
from .driver import Driver
from .queue import Driver

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -58,27 +59,7 @@ def __call__(
)


class JobQueueNode(BaseCClass): # type: ignore
TYPE_NAME = "job_queue_node"

_alloc = ResPrototype(
"void* job_queue_node_alloc(char*,"
"char*,"
"char*,"
"int, "
"char*,"
"char*"
")",
bind=False,
)
_free = ResPrototype("void job_queue_node_free(job_queue_node)")
_get_status = ResPrototype(
"job_status_type_enum job_queue_node_get_status(job_queue_node)"
)
_set_queue_status = ResPrototype(
"void job_queue_node_set_status(job_queue_node, job_status_type_enum)"
)

class JobQueueNode:
def __init__(
self,
job_script: str,
Expand All @@ -101,23 +82,21 @@ def __init__(
self._start_time: Optional[float] = None
self._end_time: Optional[float] = None
self._timed_out = False
self._status: JobStatus = JobStatus.UNKNOWN
self._status_msg = ""
c_ptr = self._alloc(
run_arg.job_name,
run_arg.runpath,
job_script,
num_cpu,
status_file,
exit_file,
)

if c_ptr is not None:
super().__init__(c_ptr)
else:
raise ValueError("Unable to create job node object")

def free(self) -> None:
self._free()
# c-struct attributes
self._status_file: str = status_file
self._exit_file: str = exit_file
self._run_cmd: str = job_script
self._job_name: str = run_arg.job_name
self._run_path: str = run_arg.runpath
self._num_cpu: int = num_cpu
self._queue_index: int = 0
self._submit_attempt: int = 0
self._confirmed_running: bool = False
self._fail_message: Optional[str] = None
self._error_message: Optional[str] = None

def __str__(self) -> str:
return (
Expand All @@ -128,7 +107,7 @@ def __str__(self) -> str:

@property
def run_path(self) -> str:
return self.run_arg.runpath
return self._run_path

@property
def timed_out(self) -> bool:
Expand All @@ -137,24 +116,44 @@ def timed_out(self) -> bool:

@property
def submit_attempt(self) -> int:
return _get_submit_attempt(self)

def _poll_queue_status(self, driver: "Driver") -> JobStatus:
result, msg = _refresh_status(self, driver)
if msg is not None:
self._status_msg = msg
return JobStatus(result)
# return _get_submit_attempt(self)
return self._submit_attempt

def refresh_status(self, driver: "Driver") -> JobStatus:
    """Refresh this node's queue status and return it.

    The refresh runs three steps in order:

    1. While the status is ``RUNNING`` and the job is not yet confirmed,
       check whether the status file exists. If it is still missing and
       ``self.runtime`` exceeds ``MAX_CONFIRMED_WAIT`` (presumably
       seconds -- confirm against ``runtime``), the job is assumed dead
       and the status is set to ``DO_KILL_NODE_FAILURE``.
    2. If ``self.is_running()`` holds, ask ``driver`` for the status. Any
       exception raised by the driver is logged and the status is set to
       ``STATUS_FAILURE``.
    3. If the status is ``EXIT``, take the first line of the exit file as
       the fail message and, when no error message has been recorded yet,
       use it as the error message too.

    Args:
        driver: Queue driver whose ``get_status`` is queried for this node.

    Returns:
        The queue status after the refresh.
    """
    MAX_CONFIRMED_WAIT = 10 * 60

    if self.queue_status == JobStatus.RUNNING and not self._confirmed_running:
        # The job counts as confirmed once its status file shows up on disk.
        self._confirmed_running = Path(self._status_file).exists()
        if not self._confirmed_running and self.runtime > MAX_CONFIRMED_WAIT:
            logger.error(
                f"max_confirm_wait {MAX_CONFIRMED_WAIT} has passed since sim_start "
                f"without success; {self._job_name} is assumed dead "
                f"(attempt {self._submit_attempt})"
            )
            self.queue_status = JobStatus.DO_KILL_NODE_FAILURE

    if self.is_running():
        try:
            self._status = driver.get_status(self)
        except Exception:
            # Keep the best-effort behaviour, but leave a trace of what failed.
            logger.exception("Failed to get driver status for %s", self._job_name)
            self.queue_status = JobStatus.STATUS_FAILURE

    if self.queue_status == JobStatus.EXIT:
        try:
            with open(self._exit_file, "r") as exit_file:
                # needs to be parsed properly
                first_line = exit_file.readline()
        except OSError:
            # A missing or unreadable exit file must not crash the refresh.
            logger.warning(
                "Could not read exit file %s for %s",
                self._exit_file,
                self._job_name,
            )
        else:
            # An empty exit file carries no message; keep whatever was set before.
            if first_line:
                self._fail_message = first_line
        if self._fail_message and not self._error_message:
            self._error_message = self._fail_message

    return self.queue_status

@property
def queue_status(self) -> JobStatus:
return self._get_status()
# return self._get_status()
return self._status

@queue_status.setter
def queue_status(self, value: JobStatus) -> None:
return self._set_queue_status(value)
self._status = value

def submit(self, driver: "Driver") -> SubmitStatus:
return SubmitStatus(_submit(self, driver))
return driver.submit(self)

def run_done_callback(self) -> Optional[LoadStatus]:
callback_status, status_msg = forward_model_ok(self.run_arg)
Expand Down Expand Up @@ -208,7 +207,7 @@ def _job_monitor(
end_status = self._poll_until_done(driver)
self._handle_end_status(driver, pool_sema, end_status, max_submit)

def _poll_until_done(self, driver: Driver) -> JobStatus:
def _poll_until_done(self, driver: "Driver") -> JobStatus:
current_status = self._poll_queue_status(driver)
backoff = _BackoffFunction()
# in the following loop, we increase the sleep time between loop iterations as
Expand Down Expand Up @@ -268,7 +267,7 @@ def _log_kill_thread_stopping_status(self) -> None:

def _handle_end_status(
self,
driver: Driver,
driver: "Driver",
pool_sema: Semaphore,
end_status: JobStatus,
max_submit: int,
Expand Down
9 changes: 7 additions & 2 deletions src/ert/job_queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from ert.job_queue.thread_status import ThreadStatus

from . import ResPrototype
from .submit_status import SubmitStatus

if TYPE_CHECKING:
from ert.ensemble_evaluator import Realization
Expand Down Expand Up @@ -86,10 +87,14 @@ def _queue_state_event_type(state: str) -> str:


class Driver:
    """Stub queue driver whose methods ignore the node and return fixed values."""

    pass

    def get_status(self, node: JobQueueNode) -> JobStatus:
        """Return ``JobStatus.RUNNING`` regardless of ``node``."""
        return JobStatus.RUNNING

    def submit(self, node: JobQueueNode) -> SubmitStatus:
        """Return ``SubmitStatus.OK`` regardless of ``node``."""
        return SubmitStatus.OK

class JobQueue: # type: ignore

class JobQueue:
# TYPE_NAME = "job_queue"
# _alloc = ResPrototype("void* job_queue_alloc(void*)", bind=False)
# _free = ResPrototype("void job_queue_free( job_queue )")
Expand Down

0 comments on commit 72929d0

Please sign in to comment.