From b35aea03b58469f24f30d5ba7592c73ec8a2f8b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lukas=20L=C3=B6sche?= Date: Mon, 13 Nov 2023 22:08:09 +0100 Subject: [PATCH] [resotoworker][feat] Allow defining a startup idle timeout (#1823) * [resotoworker][feat] Allow defining a startup idle timeout * Better help text --- resotoworker/resotoworker/__main__.py | 32 +++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/resotoworker/resotoworker/__main__.py b/resotoworker/resotoworker/__main__.py index 848ff261b4..6ac583fd85 100644 --- a/resotoworker/resotoworker/__main__.py +++ b/resotoworker/resotoworker/__main__.py @@ -22,6 +22,7 @@ add_event_listener, Event, EventType, + dispatch_event, ) from resotolib.jwt import add_args as jwt_add_args from resotolib.logger import log, setup_logger, add_args as logging_add_args @@ -42,6 +43,7 @@ # This will be used in main() and shutdown() shutdown_event = threading.Event() collect_event = threading.Event() +exit_code = 0 def main() -> None: @@ -205,6 +207,7 @@ def send_request(request: requests.Request) -> requests.Response: except Exception as e: log.exception(f"Caught unhandled persistent Plugin exception {e}") + wait_for_idle_timeout(ArgumentParser.args.idle_timeout) # We wait for the shutdown Event to be set() and then end the program # While doing so we print the list of active threads once per 15 minutes shutdown_event.wait() @@ -213,7 +216,7 @@ def send_request(request: requests.Request) -> requests.Response: mp_manager.shutdown() resotolib.proc.kill_children(SIGTERM, ensure_death=True) log.info("Shutdown complete") - os._exit(0) + os._exit(exit_code) def core_actions_processor( @@ -238,6 +241,7 @@ def core_actions_processor( if kind == "action": try: if message_type == "collect": + collect_event.set() start_time = time.time() collector.collect_and_send(collectors, task_data=data) run_time = int(time.time() - start_time) @@ -301,7 +305,25 @@ def force_shutdown(delay: int = 10) -> None: time.sleep(delay) log_stats() log.error(("Some child process or thread timed out during shutdown" " - forcing shutdown completion")) - os._exit(0) + os._exit(exit_code) + + +def wait_for_idle_timeout(idle_timeout: Optional[int]) -> None: + def check_idle_timeout() -> None: + global exit_code + if not collect_event.is_set(): + log.warning("Idle timeout reached - shutting down") + exit_code = 1 + dispatch_event( + Event(EventType.SHUTDOWN, {"reason": "idle timeout reached", "emergency": False}), + blocking=False, + ) + + if idle_timeout is None or idle_timeout <= 0: + return + + log.debug(f"Running idle timeout check in {idle_timeout} seconds") + threading.Timer(idle_timeout, check_idle_timeout).start() def add_args(arg_parser: ArgumentParser) -> None: @@ -312,6 +334,12 @@ def add_args(arg_parser: ArgumentParser) -> None: dest="subscriber_id", type=str, ) + arg_parser.add_argument( + "--idle-timeout", + help="Time limit in seconds to wait for a collect task after startup (default: no limit)", + dest="idle_timeout", + type=int, + ) class WorkerWebApp(WebApp):