Performance investigation #7278

Closed
wants to merge 17 commits
82 changes: 61 additions & 21 deletions src/inmanta/data/__init__.py
@@ -4621,6 +4621,36 @@ def convert_or_ignore(rvid):
)
return out

@classmethod
async def set_deployed_multi(
cls,
environment: uuid.UUID,
resource_ids: list[m.ResourceIdStr],
version: int,
connection: Optional[asyncpg.connection.Connection] = None,
) -> None:
query = "UPDATE resource SET status='deployed' WHERE environment=$1 AND model=$2 AND resource_id=ANY($3)"
async with cls.get_connection(connection) as connection:
await connection.execute(query, environment, version, resource_ids)

@classmethod
async def get_resource_ids_with_status(
cls,
environment: uuid.UUID,
resource_version_ids: list[m.ResourceIdStr],
version: int,
statuses: list[const.ResourceState],
lock: Optional[RowLockMode] = None,
connection: Optional[asyncpg.connection.Connection] = None,
) -> list[m.ResourceIdStr]:
query = "SELECT resource_id as resource_id FROM resource WHERE environment=$1 AND model=$2 AND status = ANY($3) and resource_id =ANY($4) "
if lock:
query += lock.value
async with cls.get_connection(connection) as connection:
return [
r["resource_id"] for r in await connection.fetch(query, environment, version, statuses, resource_version_ids)
]

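Both new helpers rely on PostgreSQL's =ANY($n) array binding, so a whole batch of resource ids is handled in one round trip instead of one query per resource. A minimal standalone sketch of the pattern (the pool argument and the mark_deployed helper are illustrative, not part of the PR):

import uuid

import asyncpg  # assumed available; asyncpg is the driver used throughout this file


async def mark_deployed(pool: asyncpg.Pool, env: uuid.UUID, version: int, rids: list[str]) -> None:
    # One UPDATE for the whole batch: the id list travels as a single
    # array bind parameter instead of one statement per resource.
    await pool.execute(
        "UPDATE resource SET status='deployed' "
        "WHERE environment=$1 AND model=$2 AND resource_id=ANY($3)",
        env,
        version,
        rids,
    )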
@classmethod
async def get_undeployable(cls, environment: uuid.UUID, version: int) -> list["Resource"]:
"""
@@ -4796,27 +4826,38 @@ async def get_resources_for_version_raw_with_persistent_state(
version: int,
projection: Optional[list[str]],
projection_persistent: Optional[list[str]],
project_attributes: Optional[list[str]] = None,
Review comment (Contributor Author):
    I added this so we don't have to pull in all attributes for the resource when they are not needed; this can be a LOT of data.

Review comment (Contributor):
    Suggested change:
        project_attributes: Optional[list[str]] = None,
        project_attributes: Optional[Sequence[LiteralString]] = None,
    See the typing.LiteralString docs. I think we should ideally use this, because we bake these values into the query itself, which is not safe with arbitrary user input. Ideally we'd do the same for the other lists, but I'll leave that decision up to you. (A sketch illustrating this concern follows the function below.)

Review comment (Contributor Author, wouterdb, Mar 1, 2024):
    Mypy doesn't support it: python/mypy#12554

Review comment (Contributor):
    Oh, shame.
*,
connection: Optional[Connection] = None,
) -> list[dict[str, object]]:
"""This method performs none of the mangling required to produce valid resources!"""
"""This method performs none of the mangling required to produce valid resources!

project_attributes performs a projection on the json attributes of the resources table
sanderr marked this conversation as resolved.
Show resolved Hide resolved
"""

def collect_projection(projection: Optional[list[str]], prefix: str) -> str:
if not projection:
return f"{prefix}.*"
else:
return ",".join(f"{prefix}.{field}" for field in projection)

if project_attributes:
json_projection = "," + ",".join(f"r.attributes->'{v}' as {v}" for v in project_attributes)
else:
json_projection = ""

query = f"""
SELECT {collect_projection(projection, 'r')}, {collect_projection(projection_persistent, 'ps')}
SELECT {collect_projection(projection, 'r')}, {collect_projection(projection_persistent, 'ps')} {json_projection}
FROM {cls.table_name()} r JOIN resource_persistent_state ps ON r.resource_id = ps.resource_id
WHERE r.environment=$1 AND ps.environment = $1 and r.model = $2;"""

resource_records = await cls._fetch_query(query, environment, version, connection=connection)
resources = [dict(record) for record in resource_records]
for res in resources:
if "attributes" in res:
res["attributes"] = json.loads(res["attributes"])
if project_attributes:
for k in project_attributes:
if res[k]:
res[k] = json.loads(res[k])
return resources
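
The json.loads calls above are needed because asyncpg, without a registered type codec, returns json/jsonb values as plain strings. A small illustration of the decoding step (the row dict is made up for the example):

import json

# A fetched row before decoding: jsonb projections arrive as strings
# (assuming no custom asyncpg type codec has been registered).
row = {
    "resource_id": "std::testing::NullResource[agent1,name=r1]",
    "requires": '["std::testing::NullResource[agent2,name=r2]"]',
    "send_event": "true",
}
for key in ("requires", "send_event"):
    if row[key]:
        row[key] = json.loads(row[key])

assert row["requires"] == ["std::testing::NullResource[agent2,name=r2]"]
assert row["send_event"] is True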

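As for the LiteralString discussion above: the attribute names are interpolated into the SQL text rather than bound as parameters, so they must never come from user input. A hedged sketch of how PEP 675 would enforce that at type-check time (build_projection is a hypothetical helper mirroring the json_projection logic; per the review thread, mypy did not support LiteralString at the time):

import sys
from collections.abc import Sequence
from typing import Optional

if sys.version_info >= (3, 11):
    from typing import LiteralString
else:  # PEP 675 backport
    from typing_extensions import LiteralString


def build_projection(project_attributes: Optional[Sequence[LiteralString]]) -> str:
    # Each name is baked into the SQL string itself, not passed as a
    # bind parameter, so the type restricts callers to literal values.
    if not project_attributes:
        return ""
    return "," + ",".join(f"r.attributes->'{v}' as {v}" for v in project_attributes)


# Literal arguments satisfy LiteralString; a value read from a request would not.
print(build_projection(["requires", "send_event"]))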
@classmethod
@@ -5403,6 +5444,7 @@ async def get_list(
no_obj: Optional[bool] = None,
lock: Optional[RowLockMode] = None,
connection: Optional[asyncpg.connection.Connection] = None,
no_status: bool = False, # don't load the status field
**query: object,
) -> list["ConfigurationModel"]:
# sanitize and validate order parameters
@@ -5449,11 +5491,18 @@ async def get_list(
for record in query_result:
record = dict(record)
if no_obj:
record["status"] = await cls._get_status_field(record["environment"], record["status"])
if no_status:
record["status"] = {}
else:
record["status"] = await cls._get_status_field(record["environment"], record["status"])
result.append(record)
else:
done = record.pop("done")
status = await cls._get_status_field(record["environment"], record.pop("status"))
if no_status:
status = {}
record.pop("status")
else:
status = await cls._get_status_field(record["environment"], record.pop("status"))
obj = cls(from_postgres=True, **record)
obj._done = done
obj._status = status
@@ -5706,7 +5755,6 @@ async def get_increment(
projection_a_resource = [
"resource_id",
"attribute_hash",
"attributes",
"status",
]
projection_a_state = [
@@ -5715,11 +5763,12 @@ async def get_increment(
"last_deployed_attribute_hash",
"last_non_deploying_status",
]
projection_a_attributes = ["requires", "send_event"]
projection = ["resource_id", "status", "attribute_hash"]

# get resources for agent
resources = await Resource.get_resources_for_version_raw_with_persistent_state(
environment, version, projection_a_resource, projection_a_state, connection=connection
environment, version, projection_a_resource, projection_a_state, projection_a_attributes, connection=connection
)

# to increment
@@ -5740,20 +5789,11 @@ async def get_increment(
continue
# Now outstanding events
last_success = resource["last_success"] or DATETIME_MIN_UTC
attributes = resource["attributes"]
assert isinstance(attributes, dict) # mypy
for req in attributes["requires"]:
for req in resource["requires"]:
req_res = id_to_resource[req]
assert req_res is not None # todo
req_res_attributes = req_res["attributes"]
assert isinstance(req_res_attributes, dict) # mypy
last_produced_events = req_res["last_produced_events"]
if (
last_produced_events is not None
and last_produced_events > last_success
and "send_event" in req_res_attributes
and req_res_attributes["send_event"]
):
if last_produced_events is not None and last_produced_events > last_success and req_res["send_event"]:
in_increment = True
break

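For reference, the outstanding-events check above amounts to the following predicate over the projected fields (a simplified sketch, not the exact implementation):

from datetime import datetime, timezone

DATETIME_MIN_UTC = datetime.min.replace(tzinfo=timezone.utc)


def has_outstanding_events(resource: dict, id_to_resource: dict) -> bool:
    # A resource goes into the increment when any of its requirements
    # produced events after this resource's last successful deploy and
    # that requirement is configured to send events.
    last_success = resource["last_success"] or DATETIME_MIN_UTC
    for req in resource["requires"]:
        req_res = id_to_resource[req]
        produced = req_res["last_produced_events"]
        if produced is not None and produced > last_success and req_res["send_event"]:
            return True
    return False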
@@ -5839,9 +5879,9 @@ async def get_increment(

# build lookup tables
for res in resources:
for req in res["attributes"]["requires"]:
for req in res["requires"]:
original_provides[req].append(res["resource_id"])
if "send_event" in res["attributes"] and res["attributes"]["send_event"]:
if res["send_event"]:
send_events.append(res["resource_id"])

# recursively include stuff potentially receiving events from nodes in the increment
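The recursive inclusion announced by the comment at the end of this hunk can be pictured as a worklist closure over the provides relation built above; a simplified illustration (not the exact implementation):

def close_over_events(
    increment: set[str],
    original_provides: dict[str, list[str]],
    send_events: set[str],
) -> set[str]:
    # If a resource in the increment sends events, everything that
    # requires it may receive an event and must be redeployed as well;
    # repeat until no new resources are pulled in.
    work = [rid for rid in increment if rid in send_events]
    while work:
        rid = work.pop()
        for dependent in original_provides.get(rid, []):
            if dependent not in increment:
                increment.add(dependent)
                if dependent in send_events:
                    work.append(dependent)
    return increment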
74 changes: 48 additions & 26 deletions src/inmanta/server/services/orchestrationservice.py
@@ -16,6 +16,8 @@
Contact: [email protected]
"""

import asyncio
import contextlib
import datetime
import logging
import uuid
@@ -28,6 +30,7 @@
import pydantic
from asyncpg import Connection

import inmanta.util
from inmanta import const, data
from inmanta.const import ResourceState
from inmanta.data import (
@@ -69,6 +72,8 @@
from inmanta.types import Apireturn, JsonType, PrimitiveTypes, ReturnTupple

LOGGER = logging.getLogger(__name__)
PLOGGER = logging.getLogger("performance")
Review comment (Contributor Author):
    todo: remove

Review comment (Contributor):
    todo alert


PERFORM_CLEANUP: bool = True
# Kill switch for cleanup, for use when working with historical data
@@ -411,7 +416,9 @@ async def _purge_versions(self) -> None:
# get available versions
n_versions = await env_item.get(AVAILABLE_VERSIONS_TO_KEEP, connection=connection)
assert isinstance(n_versions, int)
versions = await data.ConfigurationModel.get_list(environment=env_item.id, connection=connection)
versions = await data.ConfigurationModel.get_list(
environment=env_item.id, connection=connection, no_status=True
)
if len(versions) > n_versions:
LOGGER.info("Removing %s available versions from environment %s", len(versions) - n_versions, env_item.id)
version_dict = {x.version: x for x in versions}
@@ -652,7 +659,7 @@ async def _put_version(
pip_config: Optional[PipConfig] = None,
*,
connection: asyncpg.connection.Connection,
) -> None:
) -> abc.Sequence[str]:
Review comment (Contributor):
    Any reason to make this a Sequence? In return types I find that 90% of the time the concrete type is the appropriate pick.

"""
:param rid_to_resource: This parameter should contain all the resources when a full compile is done.
When a partial compile is done, it should contain all the resources that belong to the
@@ -666,6 +673,8 @@ async def _put_version(
sets that are removed by the partial compile. When no resource sets are removed by
a partial compile or when a full compile is done, this parameter can be set to None.

:return: all agents affected

Pre-conditions:
* The requires and provides relationships of the resources in rid_to_resource must be set correctly. For a
partial compile, this means it is assumed to be valid with respect to all absolute constraints that apply to
@@ -818,13 +827,14 @@ async def _put_version(
await ra.insert(connection=connection)

LOGGER.debug("Successfully stored version %d", version)
return list(all_agents)

async def _trigger_auto_deploy(
self,
env: data.Environment,
version: int,
*,
connection: Optional[Connection],
agents: Optional[abc.Sequence[str]] = None,
) -> None:
"""
Triggers auto-deploy for stored resources. Must be called only after transaction that stores resources has been allowed
Expand All @@ -837,8 +847,8 @@ async def _trigger_auto_deploy(
push_on_auto_deploy = cast(bool, await env.get(data.PUSH_ON_AUTO_DEPLOY))
agent_trigger_method_on_autodeploy = cast(str, await env.get(data.AGENT_TRIGGER_METHOD_ON_AUTO_DEPLOY))
agent_trigger_method_on_autodeploy = const.AgentTriggerMethod[agent_trigger_method_on_autodeploy]
await self.release_version(
env, version, push_on_auto_deploy, agent_trigger_method_on_autodeploy, connection=connection
self.add_background_task(
self.release_version(env, version, push_on_auto_deploy, agent_trigger_method_on_autodeploy, agents=agents)
)

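With this change release_version is scheduled as a background task instead of being awaited inside the transaction that stores the version, so put_version can return before the release finishes. Assuming add_background_task is a thin wrapper around task scheduling, the fire-and-forget pattern looks roughly like this (a sketch, not inmanta's actual implementation):

import asyncio
from collections.abc import Coroutine

_background_tasks: set[asyncio.Task] = set()


def add_background_task(coro: Coroutine) -> asyncio.Task:
    # Schedule the coroutine on the running loop and keep a strong
    # reference so the task is not garbage collected mid-flight.
    task = asyncio.ensure_future(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task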
def _create_unknown_parameter_daos_from_api_unknowns(
@@ -916,13 +926,7 @@ async def put_version(
pip_config=pip_config,
connection=con,
)
try:
await self._trigger_auto_deploy(env, version, connection=con)
except Conflict as e:
# this should be an api warning, but this is not supported here
LOGGER.warning(
"Could not perform auto deploy on version %d in environment %s, because %s", version, env.id, e.log_message
)
await self._trigger_auto_deploy(env, version, connection=con)

return 200

@@ -946,6 +950,8 @@ async def put_partial(
"source": str
}
"""
PLOGGER.warning("STARTING PUT PARTIAL")
previous = asyncio.get_running_loop().time()
if resource_state is None:
resource_state = {}
if unknowns is None:
@@ -978,6 +984,9 @@ async def put_partial(
"Following resource sets are present in the removed resource sets and in the resources that are exported: "
f"{intersection}"
)
t_now = asyncio.get_running_loop().time()
PLOGGER.warning("INPUT VALIDATION: %s", t_now - previous)
previous = t_now

async with data.Resource.get_connection() as con:
async with con.transaction():
@@ -1020,6 +1029,10 @@ async def put_partial(
set_version=version,
)

t_now = asyncio.get_running_loop().time()
PLOGGER.warning("LOAD STAGE: %s", t_now - previous)
previous = t_now

updated_resource_sets: abc.Set[str] = {sr_name for sr_name in resource_sets.values() if sr_name is not None}
partial_update_merger = await PartialUpdateMerger.create(
env_id=env.id,
@@ -1033,14 +1046,17 @@ async def put_partial(

# add shared resources
merged_resources = partial_update_merger.merge_updated_and_shared_resources(list(rid_to_resource.values()))
t_now = asyncio.get_running_loop().time()
PLOGGER.warning("MERGE STAGE: %s", t_now - previous)
previous = t_now

await data.Code.copy_versions(env.id, base_version, version, connection=con)

merged_unknowns = await partial_update_merger.merge_unknowns(
unknowns_in_partial_compile=self._create_unknown_parameter_daos_from_api_unknowns(env.id, version, unknowns)
)

await self._put_version(
all_agents = await self._put_version(
env,
version,
merged_resources,
@@ -1052,16 +1068,15 @@ async def put_partial(
pip_config=pip_config,
connection=con,
)
t_now = asyncio.get_running_loop().time()
PLOGGER.warning("PUT STAGE: %s", t_now - previous)
previous = t_now

returnvalue: ReturnValue[int] = ReturnValue[int](200, response=version)
try:
await self._trigger_auto_deploy(env, version, connection=con)
except Conflict as e:
# It is unclear if this condition can ever happen
LOGGER.warning(
"Could not perform auto deploy on version %d in environment %s, because %s", version, env.id, e.log_message
)
returnvalue.add_warnings([f"Could not perform auto deploy: {e.log_message} {e.details}"])
await self._trigger_auto_deploy(env, version, agents=all_agents)

t_now = asyncio.get_running_loop().time()
PLOGGER.warning("AUTO DEPLOY STAGE: %s", t_now - previous)

return returnvalue

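The hand-rolled stage timings above all repeat the same measure-log-reset dance against the event loop's clock; they could be folded into a small context manager. A hypothetical sketch (stage_timer is not part of the PR):

import asyncio
import contextlib
import logging
from collections.abc import Iterator

PLOGGER = logging.getLogger("performance")


@contextlib.contextmanager
def stage_timer(stage: str) -> Iterator[None]:
    # Measure a named stage with the loop's monotonic clock, the same
    # clock the inline timings in put_partial use.
    start = asyncio.get_running_loop().time()
    try:
        yield
    finally:
        PLOGGER.warning("%s: %s", stage, asyncio.get_running_loop().time() - start)

Each timed block in put_partial would then become: with stage_timer("LOAD STAGE"): ... keeping the measurements while removing the repeated bookkeeping.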
@@ -1074,9 +1089,14 @@ async def release_version(
agent_trigger_method: Optional[const.AgentTriggerMethod] = None,
*,
connection: Optional[asyncpg.connection.Connection] = None,
agents: Optional[abc.Sequence[str]] = None,
) -> ReturnTupple:
"""
:param agents: agents that have to be notified by the push
"""
async with data.ConfigurationModel.get_connection(connection) as connection:
async with connection.transaction():
version_run_ahead_lock = asyncio.Event()
async with connection.transaction(), inmanta.util.FinallySet(version_run_ahead_lock):
with ConnectionInTransaction(connection) as connection_holder:
# explicit lock to allow patching of increments for stale failures
# (locks out patching stage of deploy_done to avoid races)
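The new version_run_ahead_lock is an asyncio.Event that is handed to get_increment below, and inmanta.util.FinallySet guards it so the event is always set once the transaction block exits. Assuming FinallySet is essentially an async context manager that sets the event on exit, a minimal sketch:

import asyncio
from types import TracebackType
from typing import Optional


class FinallySet:
    """Async context manager that sets an asyncio.Event on exit, whether
    the block completed normally or raised."""

    def __init__(self, event: asyncio.Event) -> None:
        self.event = event

    async def __aenter__(self) -> asyncio.Event:
        return self.event

    async def __aexit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        self.event.set()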
@@ -1144,15 +1164,15 @@
)

if latest_version:
increments: tuple[abc.Set[ResourceIdStr], abc.Set[ResourceIdStr]] = (
version, increment_ids, neg_increment, neg_increment_per_agent = (
await self.resource_service.get_increment(
env,
version_id,
connection=connection,
run_ahead_lock=version_run_ahead_lock,
)
)

increment_ids, neg_increment = increments
await self.resource_service.mark_deployed(
env, neg_increment, now, version_id, connection=connection_holder
)
@@ -1170,8 +1190,10 @@
# We can't be in a transaction here, or the agent will not see the data that was committed
# This assert prevents anyone from wrapping this method in a transaction by accident
assert not connection.is_in_transaction()
# fetch all resource in this cm and create a list of distinct agents
agents = await data.ConfigurationModel.get_agents(env.id, version_id, connection=connection)

if agents is None:
# fetch all resources in this cm and create a list of distinct agents
agents = await data.ConfigurationModel.get_agents(env.id, version_id, connection=connection)
await self.autostarted_agent_manager._ensure_agents(env, agents, connection=connection)

for agent in agents: