From 5b5ecf5c1550da94ca52e873fb1a484cd913dac5 Mon Sep 17 00:00:00 2001 From: Sander Van Balen Date: Fri, 3 May 2024 16:18:10 +0200 Subject: [PATCH 01/21] ensure_agents improvements: first draft --- src/inmanta/agent/agent.py | 30 ++-- src/inmanta/data/__init__.py | 2 +- src/inmanta/protocol/endpoints.py | 5 +- src/inmanta/server/agentmanager.py | 224 +++++++++++++++-------------- 4 files changed, 142 insertions(+), 119 deletions(-) diff --git a/src/inmanta/agent/agent.py b/src/inmanta/agent/agent.py index 6c9ef95f9b..9373025df7 100644 --- a/src/inmanta/agent/agent.py +++ b/src/inmanta/agent/agent.py @@ -406,6 +406,7 @@ def reload( gid = uuid.uuid4() self.logger.info("Running %s for reason: %s", gid, self.running.reason) + # TODO: how does this deal with undeployable? Are they correctly marked as done? # re-generate generation self.generation = {} @@ -666,6 +667,7 @@ def periodic_schedule( ) ) ) + # TODO: what happens when agent unpauses? Nothing, except if deploy interval is set self.ensure_deploy_on_start = False periodic_schedule("deploy", deploy_action, self._deploy_interval, self._deploy_splay_value, now) periodic_schedule("repair", repair_action, self._repair_interval, self._repair_splay_value, now) @@ -945,18 +947,23 @@ async def _init_agent_map(self) -> None: self.agent_map = cfg.agent_map.get() async def _init_endpoint_names(self) -> None: - if self.hostname is not None: - await self.add_end_point_name(self.hostname) - else: - # load agent names from the config file - agent_names = cfg.agent_names.get() - if agent_names is not None: - for name in agent_names: - if "$" in name: - name = name.replace("$node-name", self.node_name) - await self.add_end_point_name(name) + assert self.agent_map is not None + endpoints: Iterable[str] = ( + [self.hostname] + if self.hostname is not None + else self.agent_map.keys() + if cfg.use_autostart_agent_map.get() + else ( + name if "$" not in name else name.replace("$node-name", self.node_name) + for name in cfg.agent_names.get() + ) + ) + for endpoint in endpoints: + await self.add_end_point_name(endpoint) async def stop(self) -> None: + # TODO: does this remove active session from agent manager? + # I don't think so. Only closes the transport await super().stop() self.executor_manager.stop() @@ -1037,6 +1044,7 @@ async def update_agent_map(self, agent_map: dict[str, str]) -> None: await self._update_agent_map(agent_map) async def _update_agent_map(self, agent_map: dict[str, str]) -> None: + # TODO: call validate method? i.e. inmanta.data.convert_agent_map async with self._instances_lock: self.agent_map = agent_map # Add missing agents @@ -1057,6 +1065,7 @@ async def _update_agent_map(self, agent_map: dict[str, str]) -> None: agent_name for agent_name in update_uri_agents if self._instances[agent_name].is_enabled() ] + # TODO: doesn't this only set ensure_deploy_on_start for new agents? to_be_gathered = [self._add_end_point_name(agent_name, ensure_deploy_on_start=True) for agent_name in agents_to_add] to_be_gathered += [self._remove_end_point_name(agent_name) for agent_name in agents_to_remove + update_uri_agents] await asyncio.gather(*to_be_gathered) @@ -1105,6 +1114,7 @@ async def on_reconnect(self) -> None: if result.code == 200 and result.result is not None: state = result.result if "enabled" in state and isinstance(state["enabled"], bool): + # TODO: when agent starts, this unpauses agent instances await self.set_state(name, state["enabled"]) else: LOGGER.warning("Server reported invalid state %s" % (repr(state))) diff --git a/src/inmanta/data/__init__.py b/src/inmanta/data/__init__.py index 11e5d3f252..f5680c05cc 100644 --- a/src/inmanta/data/__init__.py +++ b/src/inmanta/data/__init__.py @@ -1219,7 +1219,7 @@ def get_connection( """ if connection is not None: return util.nullcontext(connection) - # Make pypi happy + # Make mypy happy assert cls._connection_pool is not None return cls._connection_pool.acquire() diff --git a/src/inmanta/protocol/endpoints.py b/src/inmanta/protocol/endpoints.py index 95aa4c2b70..e0aa095d3c 100644 --- a/src/inmanta/protocol/endpoints.py +++ b/src/inmanta/protocol/endpoints.py @@ -170,14 +170,15 @@ class SessionEndpoint(Endpoint, CallTarget): _client: "SessionClient" _heartbeat_client: "SessionClient" - def __init__(self, name: str, timeout: int = 120, reconnect_delay: int = 5): + def __init__(self, name: str, timeout: int = 120, reconnect_delay: int = 5, *, session_id: Optional[uuid.UUID]): + # TODO: docstring session_id super().__init__(name) self._transport = client.RESTClient self._sched = util.Scheduler("session endpoint") self._env_id: Optional[uuid.UUID] = None - self.sessionid: uuid.UUID = uuid.uuid1() + self.sessionid: uuid.UUID = session_id if session_id is not None else uuid.uuid4() self.running: bool = True self.server_timeout = timeout self.reconnect_delay = reconnect_delay diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index a4cb2ac5ea..7b27a491a1 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -17,6 +17,7 @@ """ import asyncio +import contextlib import logging import os import shutil @@ -264,6 +265,7 @@ async def _pause_agent( live_session = self.tid_endpoint_to_session.get(key) if live_session: # The agent has an active agent instance that has to be paused + # TODO: race condition? Agent.pause executes in transaction, new _seen_session could re-add it? del self.tid_endpoint_to_session[key] await live_session.get_client().set_state(agent_name, enabled=False) endpoints_with_new_primary.append((agent_name, None)) @@ -634,6 +636,7 @@ def get_agent_client(self, tid: uuid.UUID, endpoint: str, live_agent_only: bool else: return None + # TODO: is this method still used? Same for the methods it calls async def are_agents_active(self, tid: uuid.UUID, endpoints: list[str]) -> bool: """ Return true iff all the given agents are in the up or the paused state. @@ -645,6 +648,7 @@ async def get_agent_active_status(self, tid: uuid.UUID, endpoints: list[str]) -> Return a list of tuples where the first element of the tuple contains the name of an endpoint and the second a boolean indicating where there is an active (up or paused) agent for that endpoint. """ + # TODO: this only checks that agent has a session, not that it is enabled (`on_reconnect` has finished) all_sids_for_env = [sid for (sid, session) in self.sessions.items() if session.tid == tid] all_active_endpoints_for_env = {ep for sid in all_sids_for_env for ep in self.endpoints_for_sid[sid]} return [(ep, ep in all_active_endpoints_for_env) for ep in endpoints] @@ -971,13 +975,21 @@ async def restart_agents(self, env: data.Environment) -> None: LOGGER.debug("Restarting agents in environment %s", env.id) agents = await data.Agent.get_list(environment=env.id) agent_list = [a.name for a in agents] - await self._ensure_agents(env, agent_list, True) + await self._ensure_agents(env, agent_list, restart=True) - async def stop_agents(self, env: data.Environment, delete_venv: bool = False) -> None: + async def stop_agents( + self, + env: data.Environment, + *, + delete_venv: bool = False, + agent_lock: bool = True, + ) -> None: """ Stop all agents for this environment and close sessions + + :param agent_lock: Whether to acquire the agent lock. If false, caller must acquire it. """ - async with self.agent_lock: + async with (self.agent_lock if agent_lock else contextlib.nullcontext()): LOGGER.debug("Stopping all autostarted agents for env %s", env.id) if env.id in self._agent_procs: subproc = self._agent_procs[env.id] @@ -1028,9 +1040,9 @@ async def _terminate_agents(self) -> None: async def _ensure_agents( self, env: data.Environment, - agents: Sequence[str], - restart: bool = False, + agents: Collection[str], *, + restart: bool = False, connection: Optional[asyncpg.connection.Connection] = None, ) -> bool: """ @@ -1043,106 +1055,35 @@ async def _ensure_agents( if self._stopping: raise ShutdownInProgress() - agent_map: dict[str, str] = cast( - dict[str, str], await env.get(data.AUTOSTART_AGENT_MAP, connection=connection) + agent_map: Mapping[str, str] = cast( + Mapping[str, str], await env.get(data.AUTOSTART_AGENT_MAP, connection=connection) ) # we know the type of this map - agents = [agent for agent in agents if agent in agent_map] - needsstart = restart + # TODO: update agents parameter description: these are the agent instances that must be waited for + agents: Set[str] = set(agents) & agent_map.keys() if len(agents) == 0: return False - async def is_start_agent_required() -> bool: - if needsstart: - return True - return not await self._agent_manager.are_agents_active(env.id, agents) - - async with self.agent_lock: - # silently ignore requests if this environment is halted - refreshed_env: Optional[data.Environment] = await data.Environment.get_by_id(env.id, connection=connection) - if refreshed_env is None: - raise Exception("Can't ensure agent: environment %s does not exist" % env.id) - env = refreshed_env - if env.halted: - return False - - if await is_start_agent_required(): - LOGGER.info("%s matches agents managed by server, ensuring it is started.", agents) - res = await self.__do_start_agent(agents, env, connection=connection) - return res - return False - - async def __do_start_agent( - self, agents: list[str], env: data.Environment, *, connection: Optional[asyncpg.connection.Connection] = None - ) -> bool: - """ - Start an agent process for the given agents in the given environment - - Note: Always call under agent_lock - """ - agent_map: dict[str, str] - agent_map = cast(dict[str, str], await env.get(data.AUTOSTART_AGENT_MAP, connection=connection)) - config: str - config = await self._make_agent_config(env, agents, agent_map, connection=connection) - - config_dir = os.path.join(self._server_storage["agents"], str(env.id)) - if not os.path.exists(config_dir): - os.mkdir(config_dir) - - config_path = os.path.join(config_dir, "agent.cfg") - with open(config_path, "w+", encoding="utf-8") as fd: - fd.write(config) - - out: str = os.path.join(self._server_storage["logs"], "agent-%s.out" % env.id) - err: str = os.path.join(self._server_storage["logs"], "agent-%s.err" % env.id) - - agent_log = os.path.join(self._server_storage["logs"], "agent-%s.log" % env.id) - - proc: Optional[subprocess.Process] = None - try: - proc = await self._fork_inmanta( - [ - "--log-file-level", - "DEBUG", - "--timed-logs", - "--config", - config_path, - "--config-dir", - Config._config_dir if Config._config_dir is not None else "", - "--log-file", - agent_log, - "agent", - ], - out, - err, - ) - - if env.id in self._agent_procs and self._agent_procs[env.id] is not None: - # If the return code is not None the process is already terminated - if self._agent_procs[env.id].returncode is None: - LOGGER.debug("Terminating old agent with PID %s", self._agent_procs[env.id].pid) - self._agent_procs[env.id].terminate() - await self._wait_for_proc_bounded([self._agent_procs[env.id]]) - self._agent_procs[env.id] = proc - except Exception as e: - # Prevent dangling processes - if proc is not None and proc.returncode is None: - proc.kill() - raise e - + # TODO: make proper method instead of nested function? async def _wait_until_agent_instances_are_active() -> None: + # TODO: check docstring """ Wait until all AgentInstances for the endpoints `agents` are active. A TimeoutError is raised when not all AgentInstances are active and no new AgentInstance became active in the last 5 seconds. """ - agent_statuses: dict[str, Optional[AgentStatus]] = await data.Agent.get_statuses(env.id, set(agents)) + agent_statuses: dict[str, Optional[AgentStatus]] = await data.Agent.get_statuses(env.id, agents) # Only wait for agents that are not paused - expected_agents_in_up_state: set[str] = { + expected_agents_in_up_state: Set[str] = { agent_name for agent_name, status in agent_statuses.items() if status is not None and status is not AgentStatus.paused } + + # TODO: docstring: only call under agent lock and when process has been started + assert env.id in self._procs + proc = self._agent_procs[env.id] + actual_agents_in_up_state: set[str] = set() started = int(time.time()) last_new_agent_seen = started @@ -1152,6 +1093,11 @@ async def _wait_until_agent_instances_are_active() -> None: await asyncio.sleep(0.1) now = int(time.time()) if now - last_new_agent_seen > AUTO_STARTED_AGENT_WAIT: + LOGGER.warning( + "Timeout: agent with PID %s took too long to start: still waiting for agent instances %s", + proc.pid, + ",".join(sorted(expected_agents_in_up_state - actual_agents_in_up_state)) + ) raise asyncio.TimeoutError() if now - last_log > AUTO_STARTED_AGENT_WAIT_LOG_INTERVAL: last_log = now @@ -1172,20 +1118,94 @@ async def _wait_until_agent_instances_are_active() -> None: last_new_agent_seen = now actual_agents_in_up_state = new_actual_agents_in_up_state + LOGGER.debug( + "Agent process with PID %s is up for agent instances %s", + proc.pid, + ",".join(sorted(expected_agents_in_up_state)), + ) + + async with self.agent_lock: + # silently ignore requests if this environment is halted + refreshed_env: Optional[data.Environment] = await data.Environment.get_by_id(env.id, connection=connection) + if refreshed_env is None: + raise Exception("Can't ensure agent: environment %s does not exist" % env.id) + env = refreshed_env + if env.halted: + return False + + if env.id not in self._agent_procs or self._agent_procs[env.id].return_code is None: + # Start new process if none is currently running for this environment. + # Otherwise trust that it tracks any changes to the agent map. + LOGGER.info("%s matches agents managed by server, ensuring it is started.", agents) + self._agent_procs[env.id] = await __do_start_agent(env, connection=connection) + elif restart: + LOGGER.info( + "%s matches agents managed by server, forcing restart: stopping process with PID %s.", + agents, + self._agent_procs[env.id], + ) + # TODO: this is by far the simplest solution, but does it suffice? What if a non-autostarted agent is running as well? + # Or what if new process fails? + await self.stop_agents(env, agent_lock=False) + self._agent_procs[env.id] = await __do_start_agent(env, connection=connection) + + # Wait for all agents to start + try: + await _wait_until_agent_instances_are_active() + except asyncio.TimeoutError: + # TODO: what to do here? Should TimeoutError even be raised? + pass + # TODO: return values -> what does the bool mean? + return False + + async def __do_start_agent( + self, env: data.Environment, *, connection: Optional[asyncpg.connection.Connection] = None + ) -> subprocess.Process: + # TODO: docstring + # TODO: docstring: agent lock no longer strictly required + """ + Start an agent process for the given agents in the given environment + + Note: Always call under agent_lock + """ + config: str = await self._make_agent_config(env, connection=connection) + + config_dir = os.path.join(self._server_storage["agents"], str(env.id)) + if not os.path.exists(config_dir): + os.mkdir(config_dir) + + config_path = os.path.join(config_dir, "agent.cfg") + with open(config_path, "w+", encoding="utf-8") as fd: + fd.write(config) + + out: str = os.path.join(self._server_storage["logs"], "agent-%s.out" % env.id) + err: str = os.path.join(self._server_storage["logs"], "agent-%s.err" % env.id) + + agent_log = os.path.join(self._server_storage["logs"], "agent-%s.log" % env.id) + + proc: subprocess.Process = await self._fork_inmanta( + [ + "--log-file-level", + "DEBUG", + "--timed-logs", + "--config", + config_path, + "--config-dir", + Config._config_dir if Config._config_dir is not None else "", + "--log-file", + agent_log, + "agent", + ], + out, + err, + ) + LOGGER.debug("Started new agent with PID %s", proc.pid) - # Wait for all agents to start - try: - await _wait_until_agent_instances_are_active() - LOGGER.debug("Agent with PID %s is up", proc.pid) - except asyncio.TimeoutError: - LOGGER.warning("Timeout: agent with PID %s took too long to start", proc.pid) - return True + return proc async def _make_agent_config( self, env: data.Environment, - agent_names: list[str], - agent_map: dict[str, str], *, connection: Optional[asyncpg.connection.Connection], ) -> str: @@ -1193,8 +1213,6 @@ async def _make_agent_config( Generate the config file for the process that hosts the autostarted agents :param env: The environment for which to autostart agents - :param agent_names: The names of the agents - :param agent_map: The agent mapping to use :return: A string that contains the config file content. """ environment_id = str(env.id) @@ -1208,16 +1226,11 @@ async def _make_agent_config( agent_repair_splay: int = cast(int, await env.get(data.AUTOSTART_AGENT_REPAIR_SPLAY_TIME, connection=connection)) agent_repair_interval: str = cast(str, await env.get(data.AUTOSTART_AGENT_REPAIR_INTERVAL, connection=connection)) - # The internal agent always needs to have a session. Otherwise the agentmap update trigger doesn't work - if "internal" not in agent_names: - agent_names.append("internal") - # generate config file config = """[config] state-dir=%(statedir)s use_autostart_agent_map=true -agent-names = %(agents)s environment=%(env_id)s agent-deploy-splay-time=%(agent_deploy_splay)d @@ -1231,7 +1244,6 @@ async def _make_agent_config( port=%(port)s host=%(serveradress)s """ % { - "agents": ",".join(agent_names), "env_id": environment_id, "port": port, "statedir": privatestatedir, From 6c492ef721d27b7bf995da72e3335b2b251b7255 Mon Sep 17 00:00:00 2001 From: Sander Van Balen Date: Fri, 3 May 2024 16:38:46 +0200 Subject: [PATCH 02/21] fixes --- src/inmanta/protocol/endpoints.py | 4 ++-- src/inmanta/server/agentmanager.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/inmanta/protocol/endpoints.py b/src/inmanta/protocol/endpoints.py index e0aa095d3c..472836a228 100644 --- a/src/inmanta/protocol/endpoints.py +++ b/src/inmanta/protocol/endpoints.py @@ -170,7 +170,7 @@ class SessionEndpoint(Endpoint, CallTarget): _client: "SessionClient" _heartbeat_client: "SessionClient" - def __init__(self, name: str, timeout: int = 120, reconnect_delay: int = 5, *, session_id: Optional[uuid.UUID]): + def __init__(self, name: str, timeout: int = 120, reconnect_delay: int = 5): # TODO: docstring session_id super().__init__(name) self._transport = client.RESTClient @@ -178,7 +178,7 @@ def __init__(self, name: str, timeout: int = 120, reconnect_delay: int = 5, *, s self._env_id: Optional[uuid.UUID] = None - self.sessionid: uuid.UUID = session_id if session_id is not None else uuid.uuid4() + self.sessionid: uuid.UUID = uuid.uuid1() self.running: bool = True self.server_timeout = timeout self.reconnect_delay = reconnect_delay diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index 7b27a491a1..4c5057a1a4 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -25,7 +25,7 @@ import time import uuid from asyncio import queues, subprocess -from collections.abc import Iterable, Sequence +from collections.abc import Collection, Iterable, Mapping, Sequence from datetime import datetime from enum import Enum from typing import Any, Optional, Union, cast From 8685e78794ed10008fb89812b1d055846b6919a2 Mon Sep 17 00:00:00 2001 From: Sander Van Balen Date: Fri, 3 May 2024 16:54:10 +0200 Subject: [PATCH 03/21] fixes --- src/inmanta/server/agentmanager.py | 8 ++++---- src/inmanta/server/services/environmentservice.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index 4c5057a1a4..edfec6cf42 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -1081,7 +1081,7 @@ async def _wait_until_agent_instances_are_active() -> None: } # TODO: docstring: only call under agent lock and when process has been started - assert env.id in self._procs + assert env.id in self._agent_procs proc = self._agent_procs[env.id] actual_agents_in_up_state: set[str] = set() @@ -1133,11 +1133,11 @@ async def _wait_until_agent_instances_are_active() -> None: if env.halted: return False - if env.id not in self._agent_procs or self._agent_procs[env.id].return_code is None: + if env.id not in self._agent_procs or self._agent_procs[env.id].returncode is None: # Start new process if none is currently running for this environment. # Otherwise trust that it tracks any changes to the agent map. LOGGER.info("%s matches agents managed by server, ensuring it is started.", agents) - self._agent_procs[env.id] = await __do_start_agent(env, connection=connection) + self._agent_procs[env.id] = await self.__do_start_agent(env, connection=connection) elif restart: LOGGER.info( "%s matches agents managed by server, forcing restart: stopping process with PID %s.", @@ -1147,7 +1147,7 @@ async def _wait_until_agent_instances_are_active() -> None: # TODO: this is by far the simplest solution, but does it suffice? What if a non-autostarted agent is running as well? # Or what if new process fails? await self.stop_agents(env, agent_lock=False) - self._agent_procs[env.id] = await __do_start_agent(env, connection=connection) + self._agent_procs[env.id] = await self.__do_start_agent(env, connection=connection) # Wait for all agents to start try: diff --git a/src/inmanta/server/services/environmentservice.py b/src/inmanta/server/services/environmentservice.py index 4c76a2dac5..9577522b33 100644 --- a/src/inmanta/server/services/environmentservice.py +++ b/src/inmanta/server/services/environmentservice.py @@ -290,7 +290,7 @@ async def _halt( await refreshed_env.update_fields(halted=True, connection=con) await self.agent_manager.halt_agents(refreshed_env, connection=con) - await self.autostarted_agent_manager.stop_agents(refreshed_env, delete_agent_venv) + await self.autostarted_agent_manager.stop_agents(refreshed_env, delete_venv=delete_agent_venv) @handle(methods_v2.resume_environment, env="tid") async def resume(self, env: data.Environment) -> None: From c517c68f41e795ae08e90564a4718aafe00a89fa Mon Sep 17 00:00:00 2001 From: Sander Van Balen Date: Tue, 7 May 2024 16:55:30 +0200 Subject: [PATCH 04/21] refinements --- src/inmanta/agent/agent.py | 7 ---- src/inmanta/protocol/endpoints.py | 1 - src/inmanta/server/agentmanager.py | 64 ++++++++++++++++++++++-------- tests/test_agent_manager.py | 1 + 4 files changed, 49 insertions(+), 24 deletions(-) diff --git a/src/inmanta/agent/agent.py b/src/inmanta/agent/agent.py index 9373025df7..6c9cefeaef 100644 --- a/src/inmanta/agent/agent.py +++ b/src/inmanta/agent/agent.py @@ -406,7 +406,6 @@ def reload( gid = uuid.uuid4() self.logger.info("Running %s for reason: %s", gid, self.running.reason) - # TODO: how does this deal with undeployable? Are they correctly marked as done? # re-generate generation self.generation = {} @@ -667,7 +666,6 @@ def periodic_schedule( ) ) ) - # TODO: what happens when agent unpauses? Nothing, except if deploy interval is set self.ensure_deploy_on_start = False periodic_schedule("deploy", deploy_action, self._deploy_interval, self._deploy_splay_value, now) periodic_schedule("repair", repair_action, self._repair_interval, self._repair_splay_value, now) @@ -962,8 +960,6 @@ async def _init_endpoint_names(self) -> None: await self.add_end_point_name(endpoint) async def stop(self) -> None: - # TODO: does this remove active session from agent manager? - # I don't think so. Only closes the transport await super().stop() self.executor_manager.stop() @@ -1065,7 +1061,6 @@ async def _update_agent_map(self, agent_map: dict[str, str]) -> None: agent_name for agent_name in update_uri_agents if self._instances[agent_name].is_enabled() ] - # TODO: doesn't this only set ensure_deploy_on_start for new agents? to_be_gathered = [self._add_end_point_name(agent_name, ensure_deploy_on_start=True) for agent_name in agents_to_add] to_be_gathered += [self._remove_end_point_name(agent_name) for agent_name in agents_to_remove + update_uri_agents] await asyncio.gather(*to_be_gathered) @@ -1114,7 +1109,6 @@ async def on_reconnect(self) -> None: if result.code == 200 and result.result is not None: state = result.result if "enabled" in state and isinstance(state["enabled"], bool): - # TODO: when agent starts, this unpauses agent instances await self.set_state(name, state["enabled"]) else: LOGGER.warning("Server reported invalid state %s" % (repr(state))) @@ -1183,7 +1177,6 @@ async def get_code( resource_install_specs.append(resource_install_spec) # Update the ``_previously_loaded`` cache to indicate that the given resource type's ResourceInstallSpec # was constructed successfully at the specified version. - # TODO: this cache is a slight memory leak self._previously_loaded[(resource_type, version)] = resource_install_spec else: invalid_resource_types.add(resource_type) diff --git a/src/inmanta/protocol/endpoints.py b/src/inmanta/protocol/endpoints.py index 472836a228..95aa4c2b70 100644 --- a/src/inmanta/protocol/endpoints.py +++ b/src/inmanta/protocol/endpoints.py @@ -171,7 +171,6 @@ class SessionEndpoint(Endpoint, CallTarget): _heartbeat_client: "SessionClient" def __init__(self, name: str, timeout: int = 120, reconnect_delay: int = 5): - # TODO: docstring session_id super().__init__(name) self._transport = client.RESTClient self._sched = util.Scheduler("session endpoint") diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index edfec6cf42..34620e6071 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -17,7 +17,6 @@ """ import asyncio -import contextlib import logging import os import shutil @@ -653,6 +652,18 @@ async def get_agent_active_status(self, tid: uuid.UUID, endpoints: list[str]) -> all_active_endpoints_for_env = {ep for sid in all_sids_for_env for ep in self.endpoints_for_sid[sid]} return [(ep, ep in all_active_endpoints_for_env) for ep in endpoints] + async def expire_sessions_for_agents(self, env_id: uuid.UUID, endpoints: Set[str]) -> None: + """ + Expire all sessions for any of the requested agent endpoints. + """ + async with self.session_lock: + sessions_to_expire: Iterator[UUID] = ( + session + for session in self.sessions.values() + if endpoints & session.endpoint_names and session.tid == env_id + ) + await asyncio.gather(*(s.expire_and_abort(timeout=0) for s in sessions_to_expire)) + async def expire_all_sessions_for_environment(self, env_id: uuid.UUID) -> None: async with self.session_lock: await asyncio.gather(*[s.expire_and_abort(timeout=0) for s in self.sessions.values() if s.tid == env_id]) @@ -982,14 +993,11 @@ async def stop_agents( env: data.Environment, *, delete_venv: bool = False, - agent_lock: bool = True, ) -> None: """ Stop all agents for this environment and close sessions - - :param agent_lock: Whether to acquire the agent lock. If false, caller must acquire it. """ - async with (self.agent_lock if agent_lock else contextlib.nullcontext()): + async with self.agent_lock: LOGGER.debug("Stopping all autostarted agents for env %s", env.id) if env.id in self._agent_procs: subproc = self._agent_procs[env.id] @@ -1002,6 +1010,31 @@ async def stop_agents( LOGGER.debug("Expiring all sessions for %s", env.id) await self._agent_manager.expire_all_sessions_for_environment(env.id) + async def _stop_autostarted_agents( + self, + env: data.Environment, + ) -> None: + """ + Stop the autostarted agent process for this environment and expire all its sessions. + Does not expire non-autostarted agents' sessions. + + Must be called under the agent lock + """ + LOGGER.debug("Stopping all autostarted agents for env %s", env.id) + if env.id in self._agent_procs: + subproc = self._agent_procs[env.id] + self._stop_process(subproc) + await self._wait_for_proc_bounded([subproc]) + del self._agent_procs[env.id] + + # fetch the agent map after stopping the process to prevent races with agent map update notifying the process + agent_map: Mapping[str, str] = cast( + Mapping[str, str], await env.get(data.AUTOSTART_AGENT_MAP, connection=connection) + ) # we know the type of this map + + LOGGER.debug("Expiring sessions for autostarted agents %s", sorted(agent_map.keys())) + await self._agent_manager.expire_sessions_for_agents(env.id, agent_map.keys()) + def _get_state_dir_for_agent_in_env(self, env_id: uuid.UUID) -> str: """ Return the state dir to be used by the auto-started agent in the given environment. @@ -1049,7 +1082,8 @@ async def _ensure_agents( Ensure that all agents defined in the current environment (model) and that should be autostarted, are started. :param env: The environment to start the agents for - :param agents: A list of agent names that possibly should be started in this environment. + :param agents: A list of agent names that should be running in this environment. Waits for the agents that are both in + this list and in the agent map to be active before returning. :param restart: Restart all agents even if the list of agents is up to date. """ if self._stopping: @@ -1059,7 +1093,6 @@ async def _ensure_agents( Mapping[str, str], await env.get(data.AUTOSTART_AGENT_MAP, connection=connection) ) # we know the type of this map - # TODO: update agents parameter description: these are the agent instances that must be waited for agents: Set[str] = set(agents) & agent_map.keys() if len(agents) == 0: return False @@ -1111,7 +1144,11 @@ async def _wait_until_agent_instances_are_active() -> None: new_actual_agents_in_up_state = { agent_name for agent_name in expected_agents_in_up_state - if (env.id, agent_name) in self._agent_manager.tid_endpoint_to_session + if ( + (session := self._agent_manager.tid_endpoint_to_session.get((env.id, agent_name), None)) is not None + # make sure to check for expiry because sessions are unregistered from the agent manager asyncronously + and not session.expired + ) } if len(new_actual_agents_in_up_state) > len(actual_agents_in_up_state): # Reset timeout timer because a new instance became active @@ -1144,9 +1181,8 @@ async def _wait_until_agent_instances_are_active() -> None: agents, self._agent_procs[env.id], ) - # TODO: this is by far the simplest solution, but does it suffice? What if a non-autostarted agent is running as well? - # Or what if new process fails? - await self.stop_agents(env, agent_lock=False) + if self._agent_manager + await self._stop_autostarted_agents(env) self._agent_procs[env.id] = await self.__do_start_agent(env, connection=connection) # Wait for all agents to start @@ -1161,12 +1197,8 @@ async def _wait_until_agent_instances_are_active() -> None: async def __do_start_agent( self, env: data.Environment, *, connection: Optional[asyncpg.connection.Connection] = None ) -> subprocess.Process: - # TODO: docstring - # TODO: docstring: agent lock no longer strictly required """ - Start an agent process for the given agents in the given environment - - Note: Always call under agent_lock + Start an autostarted agent process for the given environment. Should only be called if none is running yet. """ config: str = await self._make_agent_config(env, connection=connection) diff --git a/tests/test_agent_manager.py b/tests/test_agent_manager.py index a87f7fe6e3..8199b202f2 100644 --- a/tests/test_agent_manager.py +++ b/tests/test_agent_manager.py @@ -1207,6 +1207,7 @@ async def _dummy_fork_inmanta( assert exception_message in str(excinfo.value) +# TODO: rework test + drop are_agents_active method async def test_are_agents_active(server, client, environment, agent_factory) -> None: """ Ensure that the `AgentManager.are_agents_active()` method returns True when an agent From 0179dac5bf92670cb27ea5c3de0cc036107b5cee Mon Sep 17 00:00:00 2001 From: Sander Van Balen Date: Wed, 8 May 2024 09:32:39 +0200 Subject: [PATCH 05/21] fix --- src/inmanta/server/agentmanager.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index 34620e6071..8865c18905 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -24,7 +24,7 @@ import time import uuid from asyncio import queues, subprocess -from collections.abc import Collection, Iterable, Mapping, Sequence +from collections.abc import Collection, Iterable, Mapping, Sequence, Set from datetime import datetime from enum import Enum from typing import Any, Optional, Union, cast @@ -1181,7 +1181,6 @@ async def _wait_until_agent_instances_are_active() -> None: agents, self._agent_procs[env.id], ) - if self._agent_manager await self._stop_autostarted_agents(env) self._agent_procs[env.id] = await self.__do_start_agent(env, connection=connection) From d71473c9bdea0332348a13453d849087f5510239 Mon Sep 17 00:00:00 2001 From: Sander Van Balen Date: Wed, 8 May 2024 10:01:26 +0200 Subject: [PATCH 06/21] wip --- src/inmanta/data/__init__.py | 4 +- src/inmanta/server/agentmanager.py | 143 +++++++++++++++-------------- tests/test_agent_manager.py | 32 +------ 3 files changed, 80 insertions(+), 99 deletions(-) diff --git a/src/inmanta/data/__init__.py b/src/inmanta/data/__init__.py index f5680c05cc..c15c5a44a3 100644 --- a/src/inmanta/data/__init__.py +++ b/src/inmanta/data/__init__.py @@ -29,7 +29,7 @@ import warnings from abc import ABC, abstractmethod from collections import abc, defaultdict -from collections.abc import Awaitable, Callable, Iterable, Sequence +from collections.abc import Awaitable, Callable, Iterable, Sequence, Set from configparser import RawConfigParser from contextlib import AbstractAsyncContextManager from itertools import chain @@ -3344,7 +3344,7 @@ def get_valid_field_names(cls) -> list[str]: return super().get_valid_field_names() + ["process_name", "status"] @classmethod - async def get_statuses(cls, env_id: uuid.UUID, agent_names: set[str]) -> dict[str, Optional[AgentStatus]]: + async def get_statuses(cls, env_id: uuid.UUID, agent_names: Set[str]) -> dict[str, Optional[AgentStatus]]: result: dict[str, Optional[AgentStatus]] = {} for agent_name in agent_names: agent = await cls.get_one(environment=env_id, name=agent_name) diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index 8865c18905..401559a6ff 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -1097,70 +1097,6 @@ async def _ensure_agents( if len(agents) == 0: return False - # TODO: make proper method instead of nested function? - async def _wait_until_agent_instances_are_active() -> None: - # TODO: check docstring - """ - Wait until all AgentInstances for the endpoints `agents` are active. - A TimeoutError is raised when not all AgentInstances are active and no new AgentInstance - became active in the last 5 seconds. - """ - agent_statuses: dict[str, Optional[AgentStatus]] = await data.Agent.get_statuses(env.id, agents) - # Only wait for agents that are not paused - expected_agents_in_up_state: Set[str] = { - agent_name - for agent_name, status in agent_statuses.items() - if status is not None and status is not AgentStatus.paused - } - - # TODO: docstring: only call under agent lock and when process has been started - assert env.id in self._agent_procs - proc = self._agent_procs[env.id] - - actual_agents_in_up_state: set[str] = set() - started = int(time.time()) - last_new_agent_seen = started - last_log = started - - while len(expected_agents_in_up_state) != len(actual_agents_in_up_state): - await asyncio.sleep(0.1) - now = int(time.time()) - if now - last_new_agent_seen > AUTO_STARTED_AGENT_WAIT: - LOGGER.warning( - "Timeout: agent with PID %s took too long to start: still waiting for agent instances %s", - proc.pid, - ",".join(sorted(expected_agents_in_up_state - actual_agents_in_up_state)) - ) - raise asyncio.TimeoutError() - if now - last_log > AUTO_STARTED_AGENT_WAIT_LOG_INTERVAL: - last_log = now - LOGGER.debug( - "Waiting for agent with PID %s, waited %d seconds, %d/%d instances up", - proc.pid, - now - started, - len(actual_agents_in_up_state), - len(expected_agents_in_up_state), - ) - new_actual_agents_in_up_state = { - agent_name - for agent_name in expected_agents_in_up_state - if ( - (session := self._agent_manager.tid_endpoint_to_session.get((env.id, agent_name), None)) is not None - # make sure to check for expiry because sessions are unregistered from the agent manager asyncronously - and not session.expired - ) - } - if len(new_actual_agents_in_up_state) > len(actual_agents_in_up_state): - # Reset timeout timer because a new instance became active - last_new_agent_seen = now - actual_agents_in_up_state = new_actual_agents_in_up_state - - LOGGER.debug( - "Agent process with PID %s is up for agent instances %s", - proc.pid, - ",".join(sorted(expected_agents_in_up_state)), - ) - async with self.agent_lock: # silently ignore requests if this environment is halted refreshed_env: Optional[data.Environment] = await data.Environment.get_by_id(env.id, connection=connection) @@ -1170,7 +1106,7 @@ async def _wait_until_agent_instances_are_active() -> None: if env.halted: return False - if env.id not in self._agent_procs or self._agent_procs[env.id].returncode is None: + if env.id not in self._agent_procs or self._agent_procs[env.id].returncode is not None: # Start new process if none is currently running for this environment. # Otherwise trust that it tracks any changes to the agent map. LOGGER.info("%s matches agents managed by server, ensuring it is started.", agents) @@ -1186,10 +1122,13 @@ async def _wait_until_agent_instances_are_active() -> None: # Wait for all agents to start try: - await _wait_until_agent_instances_are_active() + await self._wait_for_agents(env, agents) except asyncio.TimeoutError: - # TODO: what to do here? Should TimeoutError even be raised? - pass + # TODO: better log message? Detailed one is already raised in wait method + # Depends on return value semantics + LOGGER.warning( + "Not all agent instances started successfully", + ) # TODO: return values -> what does the bool mean? return False @@ -1338,6 +1277,74 @@ async def _fork_inmanta( if errhandle is not None: errhandle.close() + async def _wait_for_agents(self, env: data.Environment, agents: Set[str]) -> None: + """ + Wait until all requested autostarted agent instances are active, e.g. after starting a new agent process. + + Must be called under the agent lock. + + :param env: The environment for which to wait for agents. + :param agents: Autostarted agent endpoints to wait for. + + :raises TimeoutError: When not all agent instances are active and no new agent instance became active in the last + 5 seconds. + """ + # TODO: TimeoutError? + docstring + agent_statuses: dict[str, Optional[AgentStatus]] = await data.Agent.get_statuses(env.id, agents) + # Only wait for agents that are not paused + expected_agents_in_up_state: Set[str] = { + agent_name + for agent_name, status in agent_statuses.items() + if status is not None and status is not AgentStatus.paused + } + + assert env.id in self._agent_procs + proc = self._agent_procs[env.id] + + actual_agents_in_up_state: set[str] = set() + started = int(time.time()) + last_new_agent_seen = started + last_log = started + + while len(expected_agents_in_up_state) != len(actual_agents_in_up_state): + await asyncio.sleep(0.1) + now = int(time.time()) + if now - last_new_agent_seen > AUTO_STARTED_AGENT_WAIT: + LOGGER.warning( + "Timeout: agent with PID %s took too long to start: still waiting for agent instances %s", + proc.pid, + ",".join(sorted(expected_agents_in_up_state - actual_agents_in_up_state)) + ) + raise asyncio.TimeoutError() + if now - last_log > AUTO_STARTED_AGENT_WAIT_LOG_INTERVAL: + last_log = now + LOGGER.debug( + "Waiting for agent with PID %s, waited %d seconds, %d/%d instances up", + proc.pid, + now - started, + len(actual_agents_in_up_state), + len(expected_agents_in_up_state), + ) + new_actual_agents_in_up_state = { + agent_name + for agent_name in expected_agents_in_up_state + if ( + (session := self._agent_manager.tid_endpoint_to_session.get((env.id, agent_name), None)) is not None + # make sure to check for expiry because sessions are unregistered from the agent manager asyncronously + and not session.expired + ) + } + if len(new_actual_agents_in_up_state) > len(actual_agents_in_up_state): + # Reset timeout timer because a new instance became active + last_new_agent_seen = now + actual_agents_in_up_state = new_actual_agents_in_up_state + + LOGGER.debug( + "Agent process with PID %s is up for agent instances %s", + proc.pid, + ",".join(sorted(expected_agents_in_up_state)), + ) + async def notify_agent_about_agent_map_update(self, env: data.Environment) -> None: agent_client = self._agent_manager.get_agent_client(tid=env.id, endpoint="internal", live_agent_only=False) if agent_client: diff --git a/tests/test_agent_manager.py b/tests/test_agent_manager.py index 8199b202f2..8f8e5e42a9 100644 --- a/tests/test_agent_manager.py +++ b/tests/test_agent_manager.py @@ -1207,35 +1207,6 @@ async def _dummy_fork_inmanta( assert exception_message in str(excinfo.value) -# TODO: rework test + drop are_agents_active method -async def test_are_agents_active(server, client, environment, agent_factory) -> None: - """ - Ensure that the `AgentManager.are_agents_active()` method returns True when an agent - is in the up or the paused state. - """ - agentmanager = server.get_slice(SLICE_AGENT_MANAGER) - agent_name = "agent1" - env_id = UUID(environment) - env = await data.Environment.get_by_id(env_id) - - # The agent is not started yet -> it should not be active - assert not await agentmanager.are_agents_active(tid=env_id, endpoints=[agent_name]) - - # Start agent - await agentmanager.ensure_agent_registered(env, agent_name) - await agent_factory(environment=environment, agent_map={agent_name: ""}, agent_names=[agent_name]) - - # Verify agent is active - await retry_limited(agentmanager.are_agents_active, tid=env_id, endpoints=[agent_name], timeout=10) - - # Pause agent - result = await client.agent_action(tid=env_id, name=agent_name, action=AgentAction.pause.value) - assert result.code == 200, result.result - - # Ensure the agent is still active - await retry_limited(agentmanager.are_agents_active, tid=env_id, endpoints=[agent_name], timeout=10) - - async def test_dont_start_paused_agent(server, client, environment, caplog) -> None: """ Ensure that the AutostartedAgentManager doesn't try to start an agent that is paused (inmanta/inmanta-core#4398). @@ -1270,6 +1241,9 @@ async def test_dont_start_paused_agent(server, client, environment, caplog) -> N result = await client.agent_action(tid=env_id, name=agent_name, action=AgentAction.pause.value) assert result.code == 200, result.result + # Pausing an agent should have no direct effect on the autostarted agent manager + assert len(autostarted_agent_manager._agent_procs) == 1 + # Execute _ensure_agents() again and verify that no restart is triggered caplog.clear() await autostarted_agent_manager._ensure_agents(env=env, agents=[agent_name]) From ded5e6c2560226efbc6b6591455ad7003577fdcd Mon Sep 17 00:00:00 2001 From: Sander Van Balen Date: Wed, 8 May 2024 11:23:41 +0200 Subject: [PATCH 07/21] fixes --- src/inmanta/data/__init__.py | 6 +- src/inmanta/server/agentmanager.py | 90 ++++++++++++++----------- tests/agent_server/test_server_agent.py | 2 +- 3 files changed, 54 insertions(+), 44 deletions(-) diff --git a/src/inmanta/data/__init__.py b/src/inmanta/data/__init__.py index c15c5a44a3..940f047356 100644 --- a/src/inmanta/data/__init__.py +++ b/src/inmanta/data/__init__.py @@ -3344,10 +3344,12 @@ def get_valid_field_names(cls) -> list[str]: return super().get_valid_field_names() + ["process_name", "status"] @classmethod - async def get_statuses(cls, env_id: uuid.UUID, agent_names: Set[str]) -> dict[str, Optional[AgentStatus]]: + async def get_statuses( + cls, env_id: uuid.UUID, agent_names: Set[str], *, connection: Optional[asyncpg.connection.Connection] = None + ) -> dict[str, Optional[AgentStatus]]: result: dict[str, Optional[AgentStatus]] = {} for agent_name in agent_names: - agent = await cls.get_one(environment=env_id, name=agent_name) + agent = await cls.get_one(environment=env_id, name=agent_name, connection=connection) if agent: result[agent_name] = agent.get_status() else: diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index 401559a6ff..0b2cf7eff4 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -1013,6 +1013,8 @@ async def stop_agents( async def _stop_autostarted_agents( self, env: data.Environment, + *, + connection: Optional[asyncpg.connection.Connection] = None, ) -> None: """ Stop the autostarted agent process for this environment and expire all its sessions. @@ -1085,52 +1087,57 @@ async def _ensure_agents( :param agents: A list of agent names that should be running in this environment. Waits for the agents that are both in this list and in the agent map to be active before returning. :param restart: Restart all agents even if the list of agents is up to date. + :param connection: The database connection to use. Must not be in a transaction context. """ if self._stopping: raise ShutdownInProgress() - agent_map: Mapping[str, str] = cast( - Mapping[str, str], await env.get(data.AUTOSTART_AGENT_MAP, connection=connection) - ) # we know the type of this map + if connection is not None and connection.is_in_transaction(): + raise Exception("_ensure_agents should not be called in a transaction context") - agents: Set[str] = set(agents) & agent_map.keys() - if len(agents) == 0: - return False + async with data.Agent.get_connection(connection) as connection: + agent_map: Mapping[str, str] = cast( + Mapping[str, str], await env.get(data.AUTOSTART_AGENT_MAP, connection=connection) + ) # we know the type of this map - async with self.agent_lock: - # silently ignore requests if this environment is halted - refreshed_env: Optional[data.Environment] = await data.Environment.get_by_id(env.id, connection=connection) - if refreshed_env is None: - raise Exception("Can't ensure agent: environment %s does not exist" % env.id) - env = refreshed_env - if env.halted: + agents: Set[str] = set(agents) & agent_map.keys() + if len(agents) == 0: return False - if env.id not in self._agent_procs or self._agent_procs[env.id].returncode is not None: - # Start new process if none is currently running for this environment. - # Otherwise trust that it tracks any changes to the agent map. - LOGGER.info("%s matches agents managed by server, ensuring it is started.", agents) - self._agent_procs[env.id] = await self.__do_start_agent(env, connection=connection) - elif restart: - LOGGER.info( - "%s matches agents managed by server, forcing restart: stopping process with PID %s.", - agents, - self._agent_procs[env.id], - ) - await self._stop_autostarted_agents(env) - self._agent_procs[env.id] = await self.__do_start_agent(env, connection=connection) - - # Wait for all agents to start - try: - await self._wait_for_agents(env, agents) - except asyncio.TimeoutError: - # TODO: better log message? Detailed one is already raised in wait method - # Depends on return value semantics - LOGGER.warning( - "Not all agent instances started successfully", - ) - # TODO: return values -> what does the bool mean? - return False + async with self.agent_lock: + # silently ignore requests if this environment is halted + refreshed_env: Optional[data.Environment] = await data.Environment.get_by_id(env.id, connection=connection) + if refreshed_env is None: + raise Exception("Can't ensure agent: environment %s does not exist" % env.id) + env = refreshed_env + if env.halted: + return False + + if env.id not in self._agent_procs or self._agent_procs[env.id].returncode is not None: + # Start new process if none is currently running for this environment. + # Otherwise trust that it tracks any changes to the agent map. + LOGGER.info("%s matches agents managed by server, ensuring it is started.", agents) + self._agent_procs[env.id] = await self.__do_start_agent(env, connection=connection) + elif restart: + LOGGER.info( + "%s matches agents managed by server, forcing restart: stopping process with PID %s.", + agents, + self._agent_procs[env.id], + ) + await self._stop_autostarted_agents(env, connection=connection) + self._agent_procs[env.id] = await self.__do_start_agent(env, connection=connection) + + # Wait for all agents to start + try: + await self._wait_for_agents(env, agents, connection=connection) + except asyncio.TimeoutError: + # TODO: better log message? Detailed one is already raised in wait method + # Depends on return value semantics + LOGGER.warning( + "Not all agent instances started successfully", + ) + # TODO: return values -> what does the bool mean? + return False async def __do_start_agent( self, env: data.Environment, *, connection: Optional[asyncpg.connection.Connection] = None @@ -1277,7 +1284,9 @@ async def _fork_inmanta( if errhandle is not None: errhandle.close() - async def _wait_for_agents(self, env: data.Environment, agents: Set[str]) -> None: + async def _wait_for_agents( + self, env: data.Environment, agents: Set[str], *, connection: Optional[asyncpg.connection.Connection] = None + ) -> None: """ Wait until all requested autostarted agent instances are active, e.g. after starting a new agent process. @@ -1289,8 +1298,7 @@ async def _wait_for_agents(self, env: data.Environment, agents: Set[str]) -> Non :raises TimeoutError: When not all agent instances are active and no new agent instance became active in the last 5 seconds. """ - # TODO: TimeoutError? + docstring - agent_statuses: dict[str, Optional[AgentStatus]] = await data.Agent.get_statuses(env.id, agents) + agent_statuses: dict[str, Optional[AgentStatus]] = await data.Agent.get_statuses(env.id, agents, connection=connection) # Only wait for agents that are not paused expected_agents_in_up_state: Set[str] = { agent_name diff --git a/tests/agent_server/test_server_agent.py b/tests/agent_server/test_server_agent.py index d68db61f59..71782fcf67 100644 --- a/tests/agent_server/test_server_agent.py +++ b/tests/agent_server/test_server_agent.py @@ -529,7 +529,7 @@ async def test_env_setting_wiring_to_autostarted_agent( autostarted_agent_manager = server.get_slice(SLICE_AUTOSTARTED_AGENT_MANAGER) config = await autostarted_agent_manager._make_agent_config( - env, agent_names=[], agent_map={"internal": ""}, connection=None + env, connection=None ) assert f"agent-deploy-interval={interval}" in config From c8c05a8c39556561f9c7ad5e585310de6bdfaf49 Mon Sep 17 00:00:00 2001 From: Sander Van Balen Date: Wed, 8 May 2024 11:24:38 +0200 Subject: [PATCH 08/21] pep8 --- src/inmanta/agent/agent.py | 7 +++---- src/inmanta/server/agentmanager.py | 8 +++----- tests/agent_server/test_server_agent.py | 4 +--- 3 files changed, 7 insertions(+), 12 deletions(-) diff --git a/src/inmanta/agent/agent.py b/src/inmanta/agent/agent.py index 6c9cefeaef..8a72514bd3 100644 --- a/src/inmanta/agent/agent.py +++ b/src/inmanta/agent/agent.py @@ -949,11 +949,10 @@ async def _init_endpoint_names(self) -> None: endpoints: Iterable[str] = ( [self.hostname] if self.hostname is not None - else self.agent_map.keys() - if cfg.use_autostart_agent_map.get() else ( - name if "$" not in name else name.replace("$node-name", self.node_name) - for name in cfg.agent_names.get() + self.agent_map.keys() + if cfg.use_autostart_agent_map.get() + else (name if "$" not in name else name.replace("$node-name", self.node_name) for name in cfg.agent_names.get()) ) ) for endpoint in endpoints: diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index 0b2cf7eff4..519dc8f67a 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -24,7 +24,7 @@ import time import uuid from asyncio import queues, subprocess -from collections.abc import Collection, Iterable, Mapping, Sequence, Set +from collections.abc import Collection, Iterable, Iterator, Mapping, Sequence, Set from datetime import datetime from enum import Enum from typing import Any, Optional, Union, cast @@ -658,9 +658,7 @@ async def expire_sessions_for_agents(self, env_id: uuid.UUID, endpoints: Set[str """ async with self.session_lock: sessions_to_expire: Iterator[UUID] = ( - session - for session in self.sessions.values() - if endpoints & session.endpoint_names and session.tid == env_id + session for session in self.sessions.values() if endpoints & session.endpoint_names and session.tid == env_id ) await asyncio.gather(*(s.expire_and_abort(timeout=0) for s in sessions_to_expire)) @@ -1321,7 +1319,7 @@ async def _wait_for_agents( LOGGER.warning( "Timeout: agent with PID %s took too long to start: still waiting for agent instances %s", proc.pid, - ",".join(sorted(expected_agents_in_up_state - actual_agents_in_up_state)) + ",".join(sorted(expected_agents_in_up_state - actual_agents_in_up_state)), ) raise asyncio.TimeoutError() if now - last_log > AUTO_STARTED_AGENT_WAIT_LOG_INTERVAL: diff --git a/tests/agent_server/test_server_agent.py b/tests/agent_server/test_server_agent.py index 71782fcf67..443dbe23d6 100644 --- a/tests/agent_server/test_server_agent.py +++ b/tests/agent_server/test_server_agent.py @@ -528,9 +528,7 @@ async def test_env_setting_wiring_to_autostarted_agent( env = await data.Environment.get_by_id(env_id) autostarted_agent_manager = server.get_slice(SLICE_AUTOSTARTED_AGENT_MANAGER) - config = await autostarted_agent_manager._make_agent_config( - env, connection=None - ) + config = await autostarted_agent_manager._make_agent_config(env, connection=None) assert f"agent-deploy-interval={interval}" in config assert f"agent-repair-interval={interval}" in config From 18dcf34361cabc4e3b4ff826d2402a0355ccec07 Mon Sep 17 00:00:00 2001 From: Sander Van Balen Date: Wed, 8 May 2024 14:26:49 +0200 Subject: [PATCH 09/21] todos --- src/inmanta/agent/agent.py | 1 + src/inmanta/server/agentmanager.py | 17 ----------------- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/src/inmanta/agent/agent.py b/src/inmanta/agent/agent.py index 8a72514bd3..18f7c098de 100644 --- a/src/inmanta/agent/agent.py +++ b/src/inmanta/agent/agent.py @@ -1176,6 +1176,7 @@ async def get_code( resource_install_specs.append(resource_install_spec) # Update the ``_previously_loaded`` cache to indicate that the given resource type's ResourceInstallSpec # was constructed successfully at the specified version. + # TODO: this cache is a slight memory leak self._previously_loaded[(resource_type, version)] = resource_install_spec else: invalid_resource_types.add(resource_type) diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index 519dc8f67a..bb3b6341c1 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -635,23 +635,6 @@ def get_agent_client(self, tid: uuid.UUID, endpoint: str, live_agent_only: bool else: return None - # TODO: is this method still used? Same for the methods it calls - async def are_agents_active(self, tid: uuid.UUID, endpoints: list[str]) -> bool: - """ - Return true iff all the given agents are in the up or the paused state. - """ - return all(active for (_, active) in await self.get_agent_active_status(tid, endpoints)) - - async def get_agent_active_status(self, tid: uuid.UUID, endpoints: list[str]) -> list[tuple[str, bool]]: - """ - Return a list of tuples where the first element of the tuple contains the name of an endpoint - and the second a boolean indicating where there is an active (up or paused) agent for that endpoint. - """ - # TODO: this only checks that agent has a session, not that it is enabled (`on_reconnect` has finished) - all_sids_for_env = [sid for (sid, session) in self.sessions.items() if session.tid == tid] - all_active_endpoints_for_env = {ep for sid in all_sids_for_env for ep in self.endpoints_for_sid[sid]} - return [(ep, ep in all_active_endpoints_for_env) for ep in endpoints] - async def expire_sessions_for_agents(self, env_id: uuid.UUID, endpoints: Set[str]) -> None: """ Expire all sessions for any of the requested agent endpoints. From 243cb3914a08dc9b851fe17c22c22c27b3bab640 Mon Sep 17 00:00:00 2001 From: Sander Van Balen Date: Fri, 10 May 2024 10:20:54 +0200 Subject: [PATCH 10/21] todos --- src/inmanta/server/agentmanager.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index bb3b6341c1..9957251463 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -264,7 +264,6 @@ async def _pause_agent( live_session = self.tid_endpoint_to_session.get(key) if live_session: # The agent has an active agent instance that has to be paused - # TODO: race condition? Agent.pause executes in transaction, new _seen_session could re-add it? del self.tid_endpoint_to_session[key] await live_session.get_client().set_state(agent_name, enabled=False) endpoints_with_new_primary.append((agent_name, None)) @@ -1069,6 +1068,8 @@ async def _ensure_agents( this list and in the agent map to be active before returning. :param restart: Restart all agents even if the list of agents is up to date. :param connection: The database connection to use. Must not be in a transaction context. + + :return: True iff a new agent process was started. """ if self._stopping: raise ShutdownInProgress() @@ -1094,11 +1095,12 @@ async def _ensure_agents( if env.halted: return False + start_new_process: bool if env.id not in self._agent_procs or self._agent_procs[env.id].returncode is not None: # Start new process if none is currently running for this environment. # Otherwise trust that it tracks any changes to the agent map. LOGGER.info("%s matches agents managed by server, ensuring it is started.", agents) - self._agent_procs[env.id] = await self.__do_start_agent(env, connection=connection) + start_new_process = True elif restart: LOGGER.info( "%s matches agents managed by server, forcing restart: stopping process with PID %s.", @@ -1106,19 +1108,19 @@ async def _ensure_agents( self._agent_procs[env.id], ) await self._stop_autostarted_agents(env, connection=connection) + start_new_process = True + else: + start_new_process = False + + if start_new_process: self._agent_procs[env.id] = await self.__do_start_agent(env, connection=connection) # Wait for all agents to start try: await self._wait_for_agents(env, agents, connection=connection) except asyncio.TimeoutError: - # TODO: better log message? Detailed one is already raised in wait method - # Depends on return value semantics - LOGGER.warning( - "Not all agent instances started successfully", - ) - # TODO: return values -> what does the bool mean? - return False + LOGGER.warning("Not all agent instances started successfully") + return start_new_process async def __do_start_agent( self, env: data.Environment, *, connection: Optional[asyncpg.connection.Connection] = None From cc001e8ab09eb7109a7a08f31373a2875bdedced Mon Sep 17 00:00:00 2001 From: Sander Van Balen Date: Fri, 10 May 2024 10:48:52 +0200 Subject: [PATCH 11/21] todo --- src/inmanta/agent/agent.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/inmanta/agent/agent.py b/src/inmanta/agent/agent.py index 18f7c098de..3f7ed7a6d7 100644 --- a/src/inmanta/agent/agent.py +++ b/src/inmanta/agent/agent.py @@ -1039,7 +1039,9 @@ async def update_agent_map(self, agent_map: dict[str, str]) -> None: await self._update_agent_map(agent_map) async def _update_agent_map(self, agent_map: dict[str, str]) -> None: - # TODO: call validate method? i.e. inmanta.data.convert_agent_map + if "internal" not in agent_map: + raise ValueError("The internal agent must be present in the agent map") + async with self._instances_lock: self.agent_map = agent_map # Add missing agents From 25c6b7434a3a9e4f72a632c26458ff92fee7ca7a Mon Sep 17 00:00:00 2001 From: Sander Van Balen Date: Fri, 10 May 2024 11:21:25 +0200 Subject: [PATCH 12/21] mypy --- src/inmanta/server/agentmanager.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index ba7ac1d9f5..4a1ed64b9e 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -640,7 +640,7 @@ async def expire_sessions_for_agents(self, env_id: uuid.UUID, endpoints: Set[str Expire all sessions for any of the requested agent endpoints. """ async with self.session_lock: - sessions_to_expire: Iterator[UUID] = ( + sessions_to_expire: Iterator[protocol.Session] = ( session for session in self.sessions.values() if endpoints & session.endpoint_names and session.tid == env_id ) await asyncio.gather(*(s.expire_and_abort(timeout=0) for s in sessions_to_expire)) @@ -1083,8 +1083,8 @@ async def _ensure_agents( Mapping[str, str], await env.get(data.AUTOSTART_AGENT_MAP, connection=connection) ) # we know the type of this map - agents: Set[str] = set(agents) & agent_map.keys() - if len(agents) == 0: + autostart_agents: Set[str] = set(agents) & agent_map.keys() + if len(autostart_agents) == 0: return False async with self.agent_lock: @@ -1100,12 +1100,12 @@ async def _ensure_agents( if env.id not in self._agent_procs or self._agent_procs[env.id].returncode is not None: # Start new process if none is currently running for this environment. # Otherwise trust that it tracks any changes to the agent map. - LOGGER.info("%s matches agents managed by server, ensuring it is started.", agents) + LOGGER.info("%s matches agents managed by server, ensuring it is started.", autostart_agents) start_new_process = True elif restart: LOGGER.info( "%s matches agents managed by server, forcing restart: stopping process with PID %s.", - agents, + autostart_agents, self._agent_procs[env.id], ) await self._stop_autostarted_agents(env, connection=connection) @@ -1118,7 +1118,7 @@ async def _ensure_agents( # Wait for all agents to start try: - await self._wait_for_agents(env, agents, connection=connection) + await self._wait_for_agents(env, autostart_agents, connection=connection) except asyncio.TimeoutError: LOGGER.warning("Not all agent instances started successfully") return start_new_process From da614fd4cc1819959abca81c586c232cbc6d8b98 Mon Sep 17 00:00:00 2001 From: Sander Van Balen Date: Fri, 10 May 2024 15:57:11 +0200 Subject: [PATCH 13/21] tests --- tests/agent_server/test_server_agent.py | 47 +++++++++++++ tests/test_agent_manager.py | 93 +++++++++++++++++++++++++ 2 files changed, 140 insertions(+) diff --git a/tests/agent_server/test_server_agent.py b/tests/agent_server/test_server_agent.py index 2dbad268e9..edf00f530d 100644 --- a/tests/agent_server/test_server_agent.py +++ b/tests/agent_server/test_server_agent.py @@ -22,6 +22,7 @@ import os import time import uuid +from collections.abc import Mapping from functools import partial from itertools import groupby from logging import DEBUG @@ -1509,6 +1510,52 @@ async def assert_session_state(expected_agent_states: dict[str, AgentStatus], ex assert len(new_agent_processes) == 0, new_agent_processes +@pytest.mark.parametrize("autostarted", (True, False)) +async def test_autostart_mapping_overrides_config(server, client, environment, async_finalizer, caplog, autostarted: bool): + """ + Verify that the use_autostart_agent_map setting takes precedence over agents configured in the config file. + When the option is set the server's agent map should be the authority for which agents to manage. + """ + # configure agent as an autostarted agent or not + agent_config.use_autostart_agent_map.set(str(autostarted).lower()) + # also configure the agent with an explicit agent config and agent map, which should be ignored + configured_agent: str = "configured_agent" + agent_config.agent_names.set(configured_agent) + + env_uuid = uuid.UUID(environment) + agent_manager = server.get_slice(SLICE_AGENT_MANAGER) + + # configure server's autostarted agent map + autostarted_agent: str = "autostarted_agent" + result = await client.set_setting( + env_uuid, data.AUTOSTART_AGENT_MAP, {"internal": "localhost", autostarted_agent: "localhost"}, + ) + assert result.code == 200 + + # Start agent + a = agent.Agent(environment=env_uuid, code_loader=False) + await a.start() + async_finalizer(a.stop) + + # Wait until agents are up + await retry_limited(lambda: len(agent_manager.tid_endpoint_to_session) == (2 if autostarted else 1), 2) + + endpoint_sessions: Mapping[str, UUID] = { + key[1]: session.id + for key, session in agent_manager.tid_endpoint_to_session.items() + } + assert endpoint_sessions == ( + { + "internal": a.sessionid, + autostarted_agent: a.sessionid, + } + if autostarted + else { + configured_agent: a.sessionid, + } + ) + + async def test_autostart_mapping_update_uri(server, client, environment, async_finalizer, caplog): caplog.set_level(logging.INFO) agent_config.use_autostart_agent_map.set("true") diff --git a/tests/test_agent_manager.py b/tests/test_agent_manager.py index f1a4281c14..1b26442da9 100644 --- a/tests/test_agent_manager.py +++ b/tests/test_agent_manager.py @@ -1253,6 +1253,99 @@ async def test_dont_start_paused_agent(server, client, environment, caplog) -> N assert "took too long to start" not in caplog.text +async def test_wait_for_agent_map_update(server, client, environment) -> None: + """ + Verify that when _ensure_agents is called with an agent that is still starting, we wait for it rather than to kill the + current process and to start a new one. + """ + env_id: UUID = UUID(environment) + agent1: str = "agent1" + agent2: str = "agent2" + agent_manager = server.get_slice(SLICE_AGENT_MANAGER) + autostarted_agent_manager = server.get_slice(SLICE_AUTOSTARTED_AGENT_MANAGER) + + # Register agents in model + env = await data.Environment.get_by_id(env_id) + assert env is not None + await agent_manager.ensure_agent_registered(env=env, nodename=agent1) + await agent_manager.ensure_agent_registered(env=env, nodename=agent2) + + # Add agent1 to AUTOSTART_AGENT_MAP + result = await client.set_setting(tid=environment, id=data.AUTOSTART_AGENT_MAP, value={"internal": "", agent1: ""}) + assert result.code == 200, result.result + + env: Optional[data.Environment] + + # Start agent1 + assert (env_id, agent1) not in agent_manager.tid_endpoint_to_session + env = await data.Environment.get_by_id(env_id) + assert env is not None + started: bool = await autostarted_agent_manager._ensure_agents(env=env, agents=[agent1]) + assert started + assert (env_id, agent1) in agent_manager.tid_endpoint_to_session + assert len(autostarted_agent_manager._agent_procs) == 1 + + # Add agent2 to AUTOSTART_AGENT_MAP + result = await client.set_setting( + tid=environment, + id=data.AUTOSTART_AGENT_MAP, + value={"internal": "", agent1: "", agent2: ""}, + ) + assert result.code == 200, result.result + + # Call _ensure_agents with agent2 very shortly after adding it to the agent map (before the new instance has connected) + env = await data.Environment.get_by_id(env_id) + assert env is not None + started = await autostarted_agent_manager._ensure_agents(env=env, agents=[agent1, agent2]) + assert (env_id, agent2) in agent_manager.tid_endpoint_to_session + # Verify that we did not start a new process + assert not started + + +async def test_expire_sessions_on_restart(server, client, environment) -> None: + """ + Verify that when _ensure_agents is called for an explicit restart, we properly expire the old session instead of letting + it time out. + """ + env_id: UUID = UUID(environment) + agent_name: str = "agent1" + agent_manager = server.get_slice(SLICE_AGENT_MANAGER) + autostarted_agent_manager = server.get_slice(SLICE_AUTOSTARTED_AGENT_MANAGER) + + # Register agents in model + env = await data.Environment.get_by_id(env_id) + assert env is not None + await agent_manager.ensure_agent_registered(env=env, nodename=agent_name) + + # Add agent1 to AUTOSTART_AGENT_MAP + result = await client.set_setting(tid=environment, id=data.AUTOSTART_AGENT_MAP, value={"internal": "", agent_name: ""}) + assert result.code == 200, result.result + + env: Optional[data.Environment] + + # Start agent1 + assert (env_id, agent_name) not in agent_manager.tid_endpoint_to_session + env = await data.Environment.get_by_id(env_id) + assert env is not None + started: bool = await autostarted_agent_manager._ensure_agents(env=env, agents=[agent_name]) + assert started + current_session: Optional[protocol.Session] = agent_manager.tid_endpoint_to_session.get((env_id, agent_name), None) + assert current_session is not None + current_process: object = autostarted_agent_manager._agent_procs.get(env_id, None) + assert current_process is not None + + # restart agent1 + started = await autostarted_agent_manager._ensure_agents(env=env, agents=[agent_name], restart=True) + assert started + new_session: Optional[protocol.Session] = agent_manager.tid_endpoint_to_session.get((env_id, agent_name), None) + assert new_session is not None + new_process: object = autostarted_agent_manager._agent_procs.get(env_id, None) + assert new_process is not None + # assertions: should have started a new process, and expired the old session, then wait until the new one becomes active + assert new_process is not current_process + assert current_session.id != new_session.id + + async def test_auto_started_agent_log_in_debug_mode(server, environment): """ Test the logging of an autostarted agent From 822808346f878e7aeb33ac71385d5f85d0746d1b Mon Sep 17 00:00:00 2001 From: Sander Van Balen Date: Fri, 10 May 2024 16:06:41 +0200 Subject: [PATCH 14/21] change entry --- changelogs/unreleased/ensure-agents-improvements.yml | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 changelogs/unreleased/ensure-agents-improvements.yml diff --git a/changelogs/unreleased/ensure-agents-improvements.yml b/changelogs/unreleased/ensure-agents-improvements.yml new file mode 100644 index 0000000000..a4d1d363cb --- /dev/null +++ b/changelogs/unreleased/ensure-agents-improvements.yml @@ -0,0 +1,9 @@ +description: "Made various improvements to the AutostartedAgent._ensure_agents method" +sections: + bugfix: "Fixed a race condition where autostarted agents might become unresponsive for 30s when restarted" +change-type: patch +destination-branches: + - master + - iso7 + - iso6 + From ffa50f02d7cc299d26fd3912609eee8cac02d45e Mon Sep 17 00:00:00 2001 From: Sander Van Balen Date: Fri, 10 May 2024 16:07:39 +0200 Subject: [PATCH 15/21] issuenr --- changelogs/unreleased/ensure-agents-improvements.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/changelogs/unreleased/ensure-agents-improvements.yml b/changelogs/unreleased/ensure-agents-improvements.yml index a4d1d363cb..968afff3dc 100644 --- a/changelogs/unreleased/ensure-agents-improvements.yml +++ b/changelogs/unreleased/ensure-agents-improvements.yml @@ -1,6 +1,7 @@ description: "Made various improvements to the AutostartedAgent._ensure_agents method" sections: bugfix: "Fixed a race condition where autostarted agents might become unresponsive for 30s when restarted" +issue-nr: 7612 change-type: patch destination-branches: - master From 2ec766fdc7ba5d0f2b55a35c51bb2a195fd58e77 Mon Sep 17 00:00:00 2001 From: Sander Van Balen Date: Fri, 10 May 2024 16:40:45 +0200 Subject: [PATCH 16/21] pep8 --- tests/agent_server/test_server_agent.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/agent_server/test_server_agent.py b/tests/agent_server/test_server_agent.py index edf00f530d..27ccce8ceb 100644 --- a/tests/agent_server/test_server_agent.py +++ b/tests/agent_server/test_server_agent.py @@ -1528,7 +1528,9 @@ async def test_autostart_mapping_overrides_config(server, client, environment, a # configure server's autostarted agent map autostarted_agent: str = "autostarted_agent" result = await client.set_setting( - env_uuid, data.AUTOSTART_AGENT_MAP, {"internal": "localhost", autostarted_agent: "localhost"}, + env_uuid, + data.AUTOSTART_AGENT_MAP, + {"internal": "localhost", autostarted_agent: "localhost"}, ) assert result.code == 200 @@ -1541,8 +1543,7 @@ async def test_autostart_mapping_overrides_config(server, client, environment, a await retry_limited(lambda: len(agent_manager.tid_endpoint_to_session) == (2 if autostarted else 1), 2) endpoint_sessions: Mapping[str, UUID] = { - key[1]: session.id - for key, session in agent_manager.tid_endpoint_to_session.items() + key[1]: session.id for key, session in agent_manager.tid_endpoint_to_session.items() } assert endpoint_sessions == ( { From ba6a018fd6fdf5d4fda7a94119f81215c10bd80e Mon Sep 17 00:00:00 2001 From: Sander Van Balen Date: Mon, 13 May 2024 12:00:54 +0200 Subject: [PATCH 17/21] Update src/inmanta/server/agentmanager.py Co-authored-by: arnaudsjs <2684622+arnaudsjs@users.noreply.github.com> --- src/inmanta/server/agentmanager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index 4a1ed64b9e..1d9429edb8 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -1100,7 +1100,7 @@ async def _ensure_agents( if env.id not in self._agent_procs or self._agent_procs[env.id].returncode is not None: # Start new process if none is currently running for this environment. # Otherwise trust that it tracks any changes to the agent map. - LOGGER.info("%s matches agents managed by server, ensuring it is started.", autostart_agents) + LOGGER.info("%s matches agents managed by server, ensuring they are started.", autostart_agents) start_new_process = True elif restart: LOGGER.info( From 1f09bd9c37a89a9b6d2bb9f9cbf39d8f11f71fcf Mon Sep 17 00:00:00 2001 From: Sander Van Balen Date: Mon, 13 May 2024 12:01:18 +0200 Subject: [PATCH 18/21] Update src/inmanta/server/agentmanager.py Co-authored-by: arnaudsjs <2684622+arnaudsjs@users.noreply.github.com> --- src/inmanta/server/agentmanager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index 1d9429edb8..631ef4a877 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -1326,7 +1326,7 @@ async def _wait_for_agents( for agent_name in expected_agents_in_up_state if ( (session := self._agent_manager.tid_endpoint_to_session.get((env.id, agent_name), None)) is not None - # make sure to check for expiry because sessions are unregistered from the agent manager asyncronously + # make sure to check for expiry because sessions are unregistered from the agent manager asynchronously and not session.expired ) } From d29acf5180a6aaee5621d929d3b45317fe9c5fe8 Mon Sep 17 00:00:00 2001 From: Sander Van Balen Date: Mon, 13 May 2024 12:01:32 +0200 Subject: [PATCH 19/21] Update tests/agent_server/test_server_agent.py Co-authored-by: arnaudsjs <2684622+arnaudsjs@users.noreply.github.com> --- tests/agent_server/test_server_agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/agent_server/test_server_agent.py b/tests/agent_server/test_server_agent.py index 27ccce8ceb..f795f4c848 100644 --- a/tests/agent_server/test_server_agent.py +++ b/tests/agent_server/test_server_agent.py @@ -1540,7 +1540,7 @@ async def test_autostart_mapping_overrides_config(server, client, environment, a async_finalizer(a.stop) # Wait until agents are up - await retry_limited(lambda: len(agent_manager.tid_endpoint_to_session) == (2 if autostarted else 1), 2) + await retry_limited(lambda: len(agent_manager.tid_endpoint_to_session) == (2 if autostarted else 1), timeout=2) endpoint_sessions: Mapping[str, UUID] = { key[1]: session.id for key, session in agent_manager.tid_endpoint_to_session.items() From 765bdef028167661fc7b9489175f3746057742e8 Mon Sep 17 00:00:00 2001 From: Sander Van Balen Date: Mon, 13 May 2024 14:38:20 +0200 Subject: [PATCH 20/21] comment --- src/inmanta/server/agentmanager.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index 631ef4a877..9fda158121 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -1076,6 +1076,12 @@ async def _ensure_agents( raise ShutdownInProgress() if connection is not None and connection.is_in_transaction(): + # Should not be called in a transaction context because it has (immediate) side effects outside of the database + # that are tied to the database state. Several inconsistency issues could occur if this runs in a transaction + # context: + # - side effects based on oncommitted reads (may even need to be rolled back) + # - race condition with similar side effect flows due to stale reads (e.g. other flow pauses agent and kills + # process, this one brings it back because it reads the agent as unpaused) raise Exception("_ensure_agents should not be called in a transaction context") async with data.Agent.get_connection(connection) as connection: From 6b3ae249d1d52f379f6a307d4b735c683c7a6e8d Mon Sep 17 00:00:00 2001 From: Sander Van Balen Date: Mon, 13 May 2024 15:19:23 +0200 Subject: [PATCH 21/21] review comments --- src/inmanta/agent/agent.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/inmanta/agent/agent.py b/src/inmanta/agent/agent.py index cf0b883c62..fd3610fb8b 100644 --- a/src/inmanta/agent/agent.py +++ b/src/inmanta/agent/agent.py @@ -1072,7 +1072,11 @@ async def update_agent_map(self, agent_map: dict[str, str]) -> None: async def _update_agent_map(self, agent_map: dict[str, str]) -> None: if "internal" not in agent_map: - raise ValueError("The internal agent must be present in the agent map") + LOGGER.warning( + "Agent received an update_agent_map() trigger without internal agent in the agent_map %s", + agent_map, + ) + agent_map = {"internal": "local:", **agent_map} async with self._instances_lock: self.agent_map = agent_map