Skip to content

Commit

Permalink
Expose job_queue as property in simulation_context
Browse files Browse the repository at this point in the history
  • Loading branch information
xjules committed Nov 21, 2023
1 parent 01d351f commit c3dea7b
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 20 deletions.
30 changes: 16 additions & 14 deletions src/ert/simulator/simulation_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def __init__(

# Wait until the queue is active before we finish the creation
# to ensure sane job status while running
while self.isRunning() and not self._job_queue.is_active():
while self.isRunning() and not self.job_queue.is_active():
sleep(0.1)

def get_run_args(self, iens: int) -> "RunArg":
Expand All @@ -139,7 +139,7 @@ def get_run_args(self, iens: int) -> "RunArg":
def _run_simulations_simple_step(self) -> Thread:
sim_thread = Thread(
target=lambda: _run_forward_model(
self._ert, self._job_queue, self._run_context
self._ert, self.job_queue, self._run_context
)
)
sim_thread.start()
Expand All @@ -150,15 +150,13 @@ def __len__(self) -> int:

def isRunning(self) -> bool:
# TODO: Should separate between running jobs and having loaded all data
return self._sim_thread.is_alive() or self._job_queue.is_active()
return self._sim_thread.is_alive() or self.job_queue.is_active()

def didRealizationSucceed(self, iens: int) -> bool:
queue_index = self.get_run_args(iens).queue_index
if queue_index is None:
raise ValueError("Queue index not set")
return (
self._job_queue.realization_state(queue_index) == RealizationState.SUCCESS
)
return self.job_queue.realization_state(queue_index) == RealizationState.SUCCESS

def didRealizationFail(self, iens: int) -> bool:
# For the purposes of this class, a failure should be anything (killed
Expand All @@ -171,19 +169,23 @@ def isRealizationFinished(self, iens: int) -> bool:
queue_index = run_arg.queue_index
if queue_index is not None:
return not (
self._job_queue.realization_state(queue_index)
self.job_queue.realization_state(queue_index)
in [RealizationState.SUCCESS, RealizationState.WAITING]
)
else:
# job was not submitted
return False

@property
def job_queue(self) -> JobQueue:
return self._job_queue

def __repr__(self) -> str:
running = "running" if self.isRunning() else "not running"
numRunn = self._job_queue.count_status(RealizationState.RUNNING)
numSucc = self._job_queue.count_status(RealizationState.SUCCESS)
numFail = self._job_queue.count_status(RealizationState.FAILED)
numWait = self._job_queue.count_status(RealizationState.WAITING)
numRunn = self.job_queue.count_status(RealizationState.RUNNING)
numSucc = self.job_queue.count_status(RealizationState.SUCCESS)
numFail = self.job_queue.count_status(RealizationState.FAILED)
numWait = self.job_queue.count_status(RealizationState.WAITING)
return (
f"SimulationContext({running}, #running = {numRunn}, "
f"#success = {numSucc}, #failed = {numFail}, #waiting = {numWait})"
Expand All @@ -193,7 +195,7 @@ def get_sim_fs(self) -> EnsembleAccessor:
return self._run_context.sim_fs

def stop(self) -> None:
self._job_queue.kill_all_jobs()
self.job_queue.kill_all_jobs()
self._sim_thread.join()

def job_progress(self, iens: int) -> Optional[ForwardModelStatus]:
Expand Down Expand Up @@ -224,7 +226,7 @@ def job_progress(self, iens: int) -> Optional[ForwardModelStatus]:
if queue_index is None:
# job was not submitted
return None
if self._job_queue.realization_state(queue_index) == RealizationState.WAITING:
if self.job_queue.realization_state(queue_index) == RealizationState.WAITING:
return None

return ForwardModelStatus.load(run_arg.runpath)
Expand All @@ -242,4 +244,4 @@ def job_status(self, iens: int) -> Optional[RealizationState]:
if queue_index is None:
# job was not submitted
return None
return self._job_queue.realization_state(queue_index)
return self.job_queue.realization_state(queue_index)
12 changes: 6 additions & 6 deletions tests/unit_tests/simulator/test_simulation_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ def test_simulation_context(setup_case, storage):
f"runpath/realization-{iens}-{iens}/iter-0"
)

assert even_ctx.getNumFailed() == 0
assert even_ctx.getNumRunning() == 0
assert even_ctx.getNumSuccess() == size / 2
assert even_ctx.job_queue.count_status(RealizationState.FAILED) == 0
assert even_ctx.job_queue.count_status(RealizationState.RUNNING) == 0
assert even_ctx.job_queue.count_status(RealizationState.SUCCESS) == size / 2

assert odd_ctx.getNumFailed() == 0
assert odd_ctx.getNumRunning() == 0
assert odd_ctx.getNumSuccess() == size / 2
assert odd_ctx.job_queue.count_status(RealizationState.FAILED) == 0
assert odd_ctx.job_queue.count_status(RealizationState.RUNNING) == 0
assert odd_ctx.job_queue.count_status(RealizationState.SUCCESS) == size / 2

for iens in range(size):
if iens % 2 == 0:
Expand Down

0 comments on commit c3dea7b

Please sign in to comment.