-
Notifications
You must be signed in to change notification settings - Fork 107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make forwardmodelrunner async #9198
base: main
Are you sure you want to change the base?
Conversation
098b7d4
to
784237d
Compare
3be3a96
to
4de22d3
Compare
@@ -71,26 +79,26 @@ def _setup_logging(directory: str = "logs"): | |||
JOBS_JSON_RETRY_TIME = 30 | |||
|
|||
|
|||
def _wait_for_retry(): | |||
time.sleep(JOBS_JSON_RETRY_TIME) | |||
async def _wait_for_retry(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering if we need this helper function at all?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we need it for one of the tests. test_job_dispatch.py::test_retry_of_jobs_json_file_read
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, this usage of that function is a bit strange though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We mock it to lock.acquire
in a test, so that it will stop here
src/_ert/forward_model_runner/cli.py
Outdated
message_queue: asyncio.Queue[Message], | ||
done: asyncio.Event, | ||
): | ||
while not done.is_set() or not message_queue.empty(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not message_queue.empty()
looks dangerous. I think that this get handled in here:
job_status = await asyncio.wait_for(message_queue.get(), timeout=2)
or ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted it to process all the events in the queue before exiting, but I can rewrite it to be clearer ✏️
This should help the forward model runner shutting down more gracefully, and removing some of the errors we are seeing in the logs.
nonlocal reporters, forward_model_runner_task | ||
forward_model_runner_task.cancel() | ||
for reporter in reporters: | ||
reporter.cancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
try:
await reporter
except asyncio.CancelledError:
pass
or maybe just asyncio.gather(*reporters, return ....)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The signal handler has to be synced, but we await the task anyways so it should be fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To shutdown gracefully, this is what chatgpt suggests:
def setup_signal_handlers(loop):
"""
Setup signal handlers for graceful shutdown.
"""
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown(loop, signal=sig)))
wherein shutdown is an async function.
await self._dump_event(fm_checksum) | ||
|
||
def cancel(self) -> None: | ||
self._event_publishing_task.cancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this suppose to be a "blocking" operation then we should await to task to finish afterwards; ie.
self._event_publishing_task.cancel()
try:
await self._event_publishing_task
except asyncio.CancelledError:
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This also has to be synced as it is used in the eventloop's signal_handler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right, I need to look deeper in the loop signal_handler 👍
url=self._evaluator_url, | ||
token=self._token, | ||
cert=self._cert, | ||
) as client: | ||
event = None | ||
while True: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was combined with the timeout_timestamp
Issue
Resolves #9041
Approach
We will get a lot of those errors from the forward model runner (compute cluster) when we forcefully signal kill, and dont let it shut down gracefully (a lot of those errors are due to websocket connection being suddenly dropped, and not closed). Making it async would allow us to early return from the runner's run_method by calling Task.cancel on it, and we could do some cleanup on asyncio.CancellationError.
(Screenshot of new behavior in GUI if applicable)
git rebase -i main --exec 'pytest tests/ert/unit_tests -n logical -m "not integration_test"'
)When applicable