Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance investigation #7278

Closed
wants to merge 17 commits into from
7 changes: 7 additions & 0 deletions changelogs/unreleased/7262-performance.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
description: Improve performance for very large models
wouterdb marked this conversation as resolved.
Show resolved Hide resolved
issue-nr: 7262
change-type: minor
destination-branches: [master, iso7]
sections:
minor-improvement: "{{description}}"

2 changes: 1 addition & 1 deletion src/inmanta/agent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
is_time,
)

agent_deploy_interval = Option(
agent_deploy_interval: Option[int | str] = Option(
"config",
"agent-deploy-interval",
0,
Expand Down
28 changes: 15 additions & 13 deletions src/inmanta/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,19 @@ def get(cls, section: Optional[str] = None, name: Optional[str] = None, default_

@classmethod
def get_for_option(cls, option: "Option[T]") -> T:
raw_value: Optional[str | T] = cls._get_value(option.section, option.name, option.get_default_value())
raw_value: str | T = cls._get_value(option.section, option.name, option.get_default_value())
return option.validate(raw_value)

@classmethod
def _get_value(cls, section: str, name: str, default_value: Optional[T] = None) -> Optional[str | T]:
def _get_value(cls, section: str, name: str, default_value: T) -> str | T:
cfg: ConfigParser = cls.get_instance()
val: Optional[str] = _get_from_env(section, name)
if val is not None:
LOGGER.debug(f"Setting {section}:{name} was set using an environment variable")
else:
val = cfg.get(section, name, fallback=default_value)

return val
return val
# Typing of this method in the sdk is not entirely accurate
# It just returns the fallback, whatever its type
return cfg.get(section, name, fallback=default_value)

@classmethod
def is_set(cls, section: str, name: str) -> bool:
Expand Down Expand Up @@ -205,12 +205,12 @@ def is_float(value: str) -> float:
return float(value)


def is_time(value: str) -> int:
def is_time(value: str | int) -> int:
"""Time, the number of seconds represented as an integer value"""
return int(value)


def is_time_or_cron(value: str) -> Union[int, str]:
def is_time_or_cron(value: str | int) -> Union[int, str]:
"""Time, the number of seconds represented as an integer value or a cron-like expression"""
try:
return is_time(value)
Expand All @@ -232,8 +232,10 @@ def is_bool(value: Union[bool, str]) -> bool:
return boolean_states[value.lower()]


def is_list(value: str) -> list[str]:
def is_list(value: str | list[str]) -> list[str]:
"""List of comma-separated values"""
if isinstance(value, list):
return value
return [] if value == "" else [x.strip() for x in value.split(",")]


Expand Down Expand Up @@ -304,9 +306,9 @@ def __init__(
self,
section: str,
name: str,
default: Union[T, None, Callable[[], T]],
default: Union[T, Callable[[], T]],
documentation: str,
validator: Callable[[Optional[str | T]], T] = is_str,
validator: Callable[[str | T], T] = is_str,
predecessor_option: Optional["Option"] = None,
) -> None:
self.section = section
Expand Down Expand Up @@ -342,10 +344,10 @@ def get_default_desc(self) -> str:
else:
return f"``{defa}``"

def validate(self, value: Optional[str | T]) -> T:
def validate(self, value: str | T) -> T:
return self.validator(value)

def get_default_value(self) -> Optional[T]:
def get_default_value(self) -> T:
defa = self.default
if callable(defa):
return defa()
Expand Down
90 changes: 67 additions & 23 deletions src/inmanta/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4621,6 +4621,40 @@ def convert_or_ignore(rvid):
)
return out

@classmethod
async def set_deployed_multi(
cls,
environment: uuid.UUID,
resource_ids: Sequence[m.ResourceIdStr],
version: int,
connection: Optional[asyncpg.connection.Connection] = None,
) -> None:
query = "UPDATE resource SET status='deployed' WHERE environment=$1 AND model=$2 AND resource_id =ANY($3) "
async with cls.get_connection(connection) as connection:
await connection.execute(query, environment, version, resource_ids)

@classmethod
async def get_resource_ids_with_status(
cls,
environment: uuid.UUID,
resource_version_ids: list[m.ResourceIdStr],
version: int,
statuses: Sequence[const.ResourceState],
lock: Optional[RowLockMode] = None,
connection: Optional[asyncpg.connection.Connection] = None,
) -> list[m.ResourceIdStr]:
query = (
"SELECT resource_id as resource_id FROM resource WHERE "
"environment=$1 AND model=$2 AND status = ANY($3) and resource_id =ANY($4) "
)
if lock:
query += lock.value
async with cls.get_connection(connection) as connection:
return [
m.ResourceIdStr(cast(str, r["resource_id"]))
sanderr marked this conversation as resolved.
Show resolved Hide resolved
for r in await connection.fetch(query, environment, version, statuses, resource_version_ids)
]

@classmethod
async def get_undeployable(cls, environment: uuid.UUID, version: int) -> list["Resource"]:
"""
Expand Down Expand Up @@ -4796,27 +4830,38 @@ async def get_resources_for_version_raw_with_persistent_state(
version: int,
projection: Optional[list[str]],
projection_presistent: Optional[list[str]],
project_attributes: Optional[list[str]] = None,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this so we don't have to pull in all attributes for the resource when not needed, this can be a LOT of data

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
project_attributes: Optional[list[str]] = None,
project_attributes: Optional[Sequence[LiteralString]] = None,

See typing.LiteralString docs. I think we should use this ideally because we bake it into the query itself, which is not safe with any user input. Ideally we'd do the same for the other lists but I'll leave that decision up to you.

Copy link
Contributor Author

@wouterdb wouterdb Mar 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mypy doesn't support it python/mypy#12554

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, shame

*,
connection: Optional[Connection] = None,
) -> list[dict[str, object]]:
"""This method performs none of the mangling required to produce valid resources!"""
"""This method performs none of the mangling required to produce valid resources!

project_attributes performs a projection on the json attributes of the resources table
sanderr marked this conversation as resolved.
Show resolved Hide resolved
"""

def collect_projection(projection: Optional[list[str]], prefix: str) -> str:
if not projection:
return f"{prefix}.*"
else:
return ",".join(f"{prefix}.{field}" for field in projection)

if project_attributes:
json_projection = "," + ",".join(f"r.attributes->'{v}' as {v}" for v in project_attributes)
else:
json_projection = ""

query = f"""
SELECT {collect_projection(projection, 'r')}, {collect_projection(projection_presistent, 'ps')}
SELECT {collect_projection(projection, 'r')}, {collect_projection(projection_presistent, 'ps')} {json_projection}
FROM {cls.table_name()} r JOIN resource_persistent_state ps ON r.resource_id = ps.resource_id
WHERE r.environment=$1 AND ps.environment = $1 and r.model = $2;"""

resource_records = await cls._fetch_query(query, environment, version, connection=connection)
resources = [dict(record) for record in resource_records]
for res in resources:
if "attributes" in res:
res["attributes"] = json.loads(res["attributes"])
if project_attributes:
for k in project_attributes:
if res[k]:
res[k] = json.loads(res[k])
return resources

@classmethod
Expand Down Expand Up @@ -5403,6 +5448,7 @@ async def get_list(
no_obj: Optional[bool] = None,
lock: Optional[RowLockMode] = None,
connection: Optional[asyncpg.connection.Connection] = None,
no_status: bool = False, # don't load the status field
**query: object,
) -> list["ConfigurationModel"]:
# sanitize and validate order parameters
Expand Down Expand Up @@ -5446,14 +5492,21 @@ async def get_list(
{lock_statement}"""
query_result = await cls._fetch_query(query_string, *values, connection=connection)
result = []
for record in query_result:
record = dict(record)
for in_record in query_result:
record = dict(in_record)
if no_obj:
record["status"] = await cls._get_status_field(record["environment"], record["status"])
if no_status:
record["status"] = {}
else:
record["status"] = await cls._get_status_field(record["environment"], record["status"])
result.append(record)
else:
done = record.pop("done")
status = await cls._get_status_field(record["environment"], record.pop("status"))
if no_status:
status = {}
record.pop("status")
else:
status = await cls._get_status_field(record["environment"], record.pop("status"))
obj = cls(from_postgres=True, **record)
obj._done = done
obj._status = status
Expand Down Expand Up @@ -5706,7 +5759,6 @@ async def get_increment(
projection_a_resource = [
"resource_id",
"attribute_hash",
"attributes",
"status",
]
projection_a_state = [
Expand All @@ -5715,11 +5767,12 @@ async def get_increment(
"last_deployed_attribute_hash",
"last_non_deploying_status",
]
projection_a_attributes = ["requires", "send_event"]
projection = ["resource_id", "status", "attribute_hash"]

# get resources for agent
resources = await Resource.get_resources_for_version_raw_with_persistent_state(
environment, version, projection_a_resource, projection_a_state, connection=connection
environment, version, projection_a_resource, projection_a_state, projection_a_attributes, connection=connection
)

# to increment
Expand All @@ -5740,20 +5793,11 @@ async def get_increment(
continue
# Now outstanding events
last_success = resource["last_success"] or DATETIME_MIN_UTC
attributes = resource["attributes"]
assert isinstance(attributes, dict) # mypy
for req in attributes["requires"]:
for req in resource["requires"]:
req_res = id_to_resource[req]
assert req_res is not None # todo
req_res_attributes = req_res["attributes"]
assert isinstance(req_res_attributes, dict) # mypy
last_produced_events = req_res["last_produced_events"]
if (
last_produced_events is not None
and last_produced_events > last_success
and "send_event" in req_res_attributes
and req_res_attributes["send_event"]
):
if last_produced_events is not None and last_produced_events > last_success and req_res["send_event"]:
in_increment = True
break

Expand Down Expand Up @@ -5839,9 +5883,9 @@ async def get_increment(

# build lookup tables
for res in resources:
for req in res["attributes"]["requires"]:
for req in res["requires"]:
original_provides[req].append(res["resource_id"])
if "send_event" in res["attributes"] and res["attributes"]["send_event"]:
if res["send_event"]:
send_events.append(res["resource_id"])

# recursively include stuff potentially receiving events from nodes in the increment
Expand Down
2 changes: 1 addition & 1 deletion src/inmanta/server/agentmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ async def _terminate_agents(self) -> None:
async def _ensure_agents(
self,
env: data.Environment,
agents: list[str],
agents: Sequence[str],
restart: bool = False,
*,
connection: Optional[asyncpg.connection.Connection] = None,
Expand Down
10 changes: 5 additions & 5 deletions src/inmanta/server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,18 +244,18 @@ def validate_fact_renew(value: object) -> int:
"server", "purge-resource-action-logs-interval", 3600, "The number of seconds between resource-action log purging", is_time
)

server_resource_action_log_prefix = Option(
server_resource_action_log_prefix: Option[str] = Option(
"server",
"resource_action_log_prefix",
"resource-actions-",
"File prefix in log-dir, containing the resource-action logs. The after the prefix the environment uuid and .log is added",
is_str,
)

server_enabled_extensions = Option(
server_enabled_extensions: Option[list[str]] = Option(
"server",
"enabled_extensions",
"",
[],
sanderr marked this conversation as resolved.
Show resolved Hide resolved
"A list of extensions the server must load. Core is always loaded."
"If an extension listed in this list is not available, the server will refuse to start.",
is_list,
Expand All @@ -271,9 +271,9 @@ def validate_fact_renew(value: object) -> int:
)


def default_hangtime() -> str:
def default_hangtime() -> int:
""":inmanta.config:option:`server.agent-timeout` *3/4"""
return str(int(agent_timeout.get() * 3 / 4))
return int(agent_timeout.get() * 3 / 4)


agent_hangtime = Option(
Expand Down
Loading