Skip to content

Commit

Permalink
[resotoworker][feat] Allow defining a startup idle timeout (#1823)
Browse files Browse the repository at this point in the history
* [resotoworker][feat] Allow defining a startup idle timeout

* Better help text
  • Loading branch information
lloesche authored Nov 13, 2023
1 parent 85f23ec commit b35aea0
Showing 1 changed file with 30 additions and 2 deletions.
32 changes: 30 additions & 2 deletions resotoworker/resotoworker/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
add_event_listener,
Event,
EventType,
dispatch_event,
)
from resotolib.jwt import add_args as jwt_add_args
from resotolib.logger import log, setup_logger, add_args as logging_add_args
Expand All @@ -42,6 +43,7 @@
# This will be used in main() and shutdown()
shutdown_event = threading.Event()
collect_event = threading.Event()
exit_code = 0


def main() -> None:
Expand Down Expand Up @@ -205,6 +207,7 @@ def send_request(request: requests.Request) -> requests.Response:
except Exception as e:
log.exception(f"Caught unhandled persistent Plugin exception {e}")

wait_for_idle_timeout(ArgumentParser.args.idle_timeout)
# We wait for the shutdown Event to be set() and then end the program
# While doing so we print the list of active threads once per 15 minutes
shutdown_event.wait()
Expand All @@ -213,7 +216,7 @@ def send_request(request: requests.Request) -> requests.Response:
mp_manager.shutdown()
resotolib.proc.kill_children(SIGTERM, ensure_death=True)
log.info("Shutdown complete")
os._exit(0)
os._exit(exit_code)


def core_actions_processor(
Expand All @@ -238,6 +241,7 @@ def core_actions_processor(
if kind == "action":
try:
if message_type == "collect":
collect_event.set()
start_time = time.time()
collector.collect_and_send(collectors, task_data=data)
run_time = int(time.time() - start_time)
Expand Down Expand Up @@ -301,7 +305,25 @@ def force_shutdown(delay: int = 10) -> None:
time.sleep(delay)
log_stats()
log.error(("Some child process or thread timed out during shutdown" " - forcing shutdown completion"))
os._exit(0)
os._exit(exit_code)


def wait_for_idle_timeout(idle_timeout: Optional[int]) -> None:
    """Arm a one-shot startup watchdog that shuts the worker down if it stays idle.

    The call itself returns immediately. A background timer fires once after
    ``idle_timeout`` seconds; if no collect task has been started by then
    (``collect_event`` is still unset), the module-level ``exit_code`` is set
    to 1 and a non-emergency ``SHUTDOWN`` event is dispatched.

    Args:
        idle_timeout: Seconds to wait for the first collect task. ``None`` or
            a value <= 0 disables the watchdog entirely.
    """

    def check_idle_timeout() -> None:
        global exit_code
        # A collect task was started before the deadline - nothing to do.
        if collect_event.is_set():
            return
        # main() blocks on shutdown_event before exiting; if it is already set
        # a shutdown is in progress for some other reason. Do not overwrite its
        # exit code or dispatch a second SHUTDOWN event.
        # NOTE(review): presumably set by the SHUTDOWN event handler - confirm.
        if shutdown_event.is_set():
            return
        log.warning("Idle timeout reached - shutting down")
        # Non-zero exit status so the idle shutdown is distinguishable from a
        # regular one; the value is handed to os._exit() at shutdown.
        exit_code = 1
        dispatch_event(
            Event(EventType.SHUTDOWN, {"reason": "idle timeout reached", "emergency": False}),
            blocking=False,
        )

    if idle_timeout is None or idle_timeout <= 0:
        return

    log.debug(f"Running idle timeout check in {idle_timeout} seconds")
    timer = threading.Timer(idle_timeout, check_idle_timeout)
    # Daemonize so a pending watchdog can never keep the interpreter alive on
    # an exit path that does not go through os._exit().
    timer.daemon = True
    timer.start()


def add_args(arg_parser: ArgumentParser) -> None:
Expand All @@ -312,6 +334,12 @@ def add_args(arg_parser: ArgumentParser) -> None:
dest="subscriber_id",
type=str,
)
arg_parser.add_argument(
"--idle-timeout",
help="Time limit in seconds to wait for a collect task after startup (default: no limit)",
dest="idle_timeout",
type=int,
)


class WorkerWebApp(WebApp):
Expand Down

0 comments on commit b35aea0

Please sign in to comment.