Skip to content

Commit

Permalink
WIP: job_queue_node
Browse files Browse the repository at this point in the history
  • Loading branch information
xjules committed Nov 8, 2023
1 parent 58d1f5d commit 9f8159c
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 37 deletions.
105 changes: 70 additions & 35 deletions src/ert/job_queue/job_queue_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import random
import time
from pathlib import Path
from threading import Lock, Semaphore, Thread
from typing import TYPE_CHECKING, Callable, Optional

Expand Down Expand Up @@ -58,26 +59,26 @@ def __call__(
)


class JobQueueNode(BaseCClass): # type: ignore
TYPE_NAME = "job_queue_node"

_alloc = ResPrototype(
"void* job_queue_node_alloc(char*,"
"char*,"
"char*,"
"int, "
"char*,"
"char*"
")",
bind=False,
)
_free = ResPrototype("void job_queue_node_free(job_queue_node)")
_get_status = ResPrototype(
"job_status_type_enum job_queue_node_get_status(job_queue_node)"
)
_set_queue_status = ResPrototype(
"void job_queue_node_set_status(job_queue_node, job_status_type_enum)"
)
class JobQueueNode:
# TYPE_NAME = "job_queue_node"

# _alloc = ResPrototype(
# "void* job_queue_node_alloc(char*,"
# "char*,"
# "char*,"
# "int, "
# "char*,"
# "char*"
# ")",
# bind=False,
# )
# _free = ResPrototype("void job_queue_node_free(job_queue_node)")
# _get_status = ResPrototype(
# "job_status_type_enum job_queue_node_get_status(job_queue_node)"
# )
# _set_queue_status = ResPrototype(
# "void job_queue_node_set_status(job_queue_node, job_status_type_enum)"
# )

def __init__(
self,
Expand All @@ -101,23 +102,36 @@ def __init__(
self._start_time: Optional[float] = None
self._end_time: Optional[float] = None
self._timed_out = False
self._status: Optional[JobStatus] = None
self._status_msg = ""
c_ptr = self._alloc(
run_arg.job_name,
run_arg.runpath,
job_script,
num_cpu,
status_file,
exit_file,
)

if c_ptr is not None:
super().__init__(c_ptr)
else:
raise ValueError("Unable to create job node object")
# c-struct attributes
self._status_file: str = status_file
self._exit_file: str = exit_file
self._run_cmd: str = job_script
self._job_name: str = run_arg.job_name
self._run_path: str = run_arg.runpath
self._num_cpu: int = num_cpu
self._queue_index: int = 0
self._submit_attempt: int = 0
self._confirmed_running: bool = False
# c_ptr = self._alloc(
# run_arg.job_name,
# run_arg.runpath,
# job_script,
# num_cpu,
# status_file,
# exit_file,
# )

# if c_ptr is not None:
# super().__init__(c_ptr)
# else:
# raise ValueError("Unable to create job node object")

def free(self) -> None:
    """Release resources held by this node (currently a no-op).

    The node no longer allocates a C object, so there is nothing to
    release. The method is kept so that existing callers of ``free()``
    continue to work.
    """

def __str__(self) -> str:
return (
Expand All @@ -137,7 +151,27 @@ def timed_out(self) -> bool:

@property
def submit_attempt(self) -> int:
    """Return the submit-attempt counter stored on this node.

    The value is read from ``self._submit_attempt``, the Python-side
    attribute that replaces the former C-struct field.
    """
    return self._submit_attempt

def refresh_status(self, driver: "Driver") -> None:
    """Update ``self._status`` from the status file and the queue driver.

    While the node is RUNNING but not yet confirmed, the existence of
    the status file is used as confirmation that the job really started.
    If the file has not appeared within ``MAX_CONFIRMED_WAIT`` seconds of
    ``self._start_time``, the node is marked ``DO_KILL_NODE_FAILURE``.
    Otherwise, for nodes in a RUNNING, PENDING, SUBMITTED or UNKNOWN
    state, the driver is asked for the current status.

    Args:
        driver: Queue driver whose ``get_status(node)`` supplies the
            current status of this node.
    """
    # Seconds to wait for the status file before assuming the job is dead.
    MAX_CONFIRMED_WAIT = 10 * 60

    if self.queue_status == JobStatus.RUNNING and not self._confirmed_running:
        self._confirmed_running = Path(self._status_file).exists()
        # Without a recorded start time the elapsed time cannot be
        # measured, so the timeout check is skipped instead of raising
        # a TypeError on ``time.time() - None``.
        if not self._confirmed_running and self._start_time is not None:
            if (time.time() - self._start_time) > MAX_CONFIRMED_WAIT:
                logger.error(
                    "max_confirm_wait %s has passed since sim_start "
                    "without success; %s is assumed dead (attempt %s)",
                    MAX_CONFIRMED_WAIT,
                    self._job_name,
                    self._submit_attempt,
                )
                self._status = JobStatus.DO_KILL_NODE_FAILURE
                # DO_KILL_NODE_FAILURE is not a pollable state, so the
                # driver must not be consulted for this node any more.
                return

    pollable_states = (
        JobStatus.RUNNING,
        JobStatus.PENDING,
        JobStatus.SUBMITTED,
        JobStatus.UNKNOWN,
    )
    if self.queue_status in pollable_states:
        self._status = driver.get_status(self)

def _poll_queue_status(self, driver: "Driver") -> JobStatus:
result, msg = _refresh_status(self, driver)
Expand All @@ -147,7 +181,8 @@ def _poll_queue_status(self, driver: "Driver") -> JobStatus:

@property
def queue_status(self) -> JobStatus:
    """Return the queue status currently stored on this node.

    The value is read from ``self._status`` rather than from a C object.
    """
    # NOTE(review): ``_status`` is declared Optional[JobStatus] and starts
    # as None, so this can return None before a status is assigned --
    # confirm whether callers expect that.
    return self._status

@queue_status.setter
def queue_status(self, value: JobStatus) -> None:
Expand Down
5 changes: 3 additions & 2 deletions src/ert/job_queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,11 @@ def _queue_state_event_type(state: str) -> str:


class Driver:
    """Queue driver stub used to look up the status of a job node."""

    def get_status(self, node: JobQueueNode) -> JobStatus:
        """Return the queue status for *node*.

        Stub implementation: *node* is ignored and ``JobStatus.RUNNING``
        is always returned.
        """
        return JobStatus.RUNNING


class JobQueue: # type: ignore
class JobQueue:
# TYPE_NAME = "job_queue"
# _alloc = ResPrototype("void* job_queue_alloc(void*)", bind=False)
# _free = ResPrototype("void job_queue_free( job_queue )")
Expand Down

0 comments on commit 9f8159c

Please sign in to comment.