Skip to content

Commit

Permalink
Fix the replacement of the organization roots (#1901)
Browse files Browse the repository at this point in the history
* Fix the replacement of the organization roots

* more tests

* untangle concerns

---------

Co-authored-by: Matthias Veit <[email protected]>
  • Loading branch information
meln1k and aquamatthias authored Feb 6, 2024
1 parent 04ba9e3 commit af74c1d
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 58 deletions.
102 changes: 61 additions & 41 deletions resotocore/resotocore/db/graphdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ async def merge_graph(
maybe_change_id: Optional[str] = None,
is_batch: bool = False,
update_history: bool = True,
preserve_parent_structure: bool = False,
) -> Tuple[List[str], GraphUpdate]:
pass

Expand Down Expand Up @@ -1028,6 +1029,7 @@ async def merge_graph(
maybe_change_id: Optional[str] = None,
is_batch: bool = False,
update_history: bool = True,
preserve_parent_structure: bool = False,
) -> Tuple[List[str], GraphUpdate]:
change_id = maybe_change_id if maybe_change_id else uuid_str()

Expand Down Expand Up @@ -1077,18 +1079,12 @@ def combine_dict(left: Dict[K, List[V]], right: Dict[K, List[V]]) -> Dict[K, Lis
log.debug("Mark all parent nodes for this update to avoid conflicting changes")
await self.mark_update(roots, list(parent.nodes), change_id, is_batch)
try:
cloud_ids = [node[1].get("id") for node in parent.nodes(data=True) if "cloud" in node[1].get("kinds", [])]
cloud_ids = [cid for cid in cloud_ids if cid]
cloud_id = cloud_ids[0] if cloud_ids else None

def parent_edges(edge_type: EdgeType) -> Tuple[str, Json]:
edge_ids = [self.db_edge_key(f, t) for f, t, et in parent.g.edges(data="edge_type") if et == edge_type]
return self.edges_by_ids_and_until_replace_node(edge_type), {"ids": edge_ids, "node_id": cloud_id}
return self.edges_by_ids_and_until_replace_node(edge_type, preserve_parent_structure, parent, edge_ids)

parents_nodes = self.nodes_by_ids_and_until_replace_node(), {
"ids": list(parent.g.nodes),
"node_id": cloud_id,
}
parents_nodes = self.nodes_by_ids_and_until_replace_node(preserve_parent_structure, parent)
info, nis, nus, nds, eis, eds = await prepare_graph(parent, parents_nodes, parent_edges)
for num, (root, graph) in enumerate(graphs):
root_kind = GraphResolver.resolved_kind(graph_to_merge.nodes[root])
Expand Down Expand Up @@ -1532,48 +1528,69 @@ def update_resolved(
"""
)

def nodes_by_ids_and_until_replace_node(self) -> str:
def nodes_by_ids_and_until_replace_node(
self, preserve_parent_structure: bool, access: GraphAccess
) -> Tuple[str, Json]:
query_update_nodes_by_ids = (
f"FOR a IN `{self.vertex_name}` "
"FILTER a._key IN @ids RETURN {_key: a._key, hash:a.hash, created:a.created}"
)
filter_section = " AND ".join(f"'{kind}' not in node.kinds" for kind in GraphResolver.resolved_ancestors)
nodes_until_replace_node = f"""
FOR node IN 0..100 OUTBOUND DOCUMENT('{self.vertex_name}', @node_id) `{self.edge_collection(EdgeTypes.default)}`
PRUNE node.metadata["replace"] == true
OPTIONS {{ bfs: true, uniqueVertices: 'global' }}
FILTER {filter_section}
RETURN {{_key: node._key, hash: node.hash, created: node.created}}
"""
bind_vars: Json = {"ids": list(access.g.nodes)}
if preserve_parent_structure:
cloud_id = access.cloud_node_id()
assert cloud_id is not None, "When parent structure should be preserved, a cloud node is required!"
bind_vars["node_id"] = cloud_id
filter_section = " AND ".join(f"'{kind}' not in node.kinds" for kind in GraphResolver.resolved_ancestors)
nodes_until_replace_node = f"""
FOR cloud_node in `{self.vertex_name}` FILTER cloud_node._key == @node_id
FOR node IN 0..100 OUTBOUND cloud_node
`{self.edge_collection(EdgeTypes.default)}`
PRUNE node.metadata["replace"] == true
OPTIONS {{ bfs: true, uniqueVertices: 'global' }}
FILTER {filter_section}
RETURN {{_key: node._key, hash: node.hash, created: node.created}}
"""

return f"""
LET nodes_by_ids = ({query_update_nodes_by_ids})
LET nodes_until_replace = ({nodes_until_replace_node})
LET all_of_them = UNION_DISTINCT(nodes_by_ids, nodes_until_replace)
FOR n IN all_of_them RETURN n
"""
statement = f"""
LET nodes_by_ids = ({query_update_nodes_by_ids})
LET nodes_until_replace = ({nodes_until_replace_node})
LET all_of_them = UNION_DISTINCT(nodes_by_ids, nodes_until_replace)
FOR n IN all_of_them RETURN n
"""
return statement, bind_vars
else:
return query_update_nodes_by_ids, bind_vars

def edges_by_ids_and_until_replace_node(self, edge_type: EdgeType) -> str:
def edges_by_ids_and_until_replace_node(
self, edge_type: EdgeType, preserve_parent_structure: bool, access: GraphAccess, edge_ids: List[str]
) -> Tuple[str, Json]:
collection = self.edge_collection(edge_type)
bind_vars: Json = {"ids": edge_ids}
query_update_edges_by_ids = (
f"FOR a IN `{collection}` FILTER a._key in @ids RETURN {{_key: a._key, _from: a._from, _to: a._to}}"
)

filter_section = " AND ".join(f"'{kind}' not in node.kinds" for kind in GraphResolver.resolved_ancestors)
edges_until_replace_node = f"""
FOR node, edge IN 0..100 OUTBOUND DOCUMENT('{self.vertex_name}', @node_id) `{self.edge_collection(edge_type)}`
PRUNE node.metadata["replace"] == true
OPTIONS {{ bfs: true, uniqueVertices: 'global' }}
FILTER {filter_section}
RETURN {{_key: edge._key, _from: edge._from, _to: edge._to}}
"""

return f"""
LET edges_by_ids = ({query_update_edges_by_ids})
LET edges_until_replace = ({edges_until_replace_node})
LET all_of_them = UNION_DISTINCT(edges_by_ids, edges_until_replace)
FOR e IN all_of_them RETURN e
"""
if preserve_parent_structure:
cloud_id = access.cloud_node_id()
assert cloud_id is not None, "When parent structure should be preserved, a cloud node is required!"
bind_vars["node_id"] = cloud_id
filter_section = " AND ".join(f"'{kind}' not in node.kinds" for kind in GraphResolver.resolved_ancestors)
edges_until_replace_node = f"""
FOR cloud_node in `{self.vertex_name}` FILTER cloud_node._key == @node_id
FOR node, edge IN 0..100 OUTBOUND cloud_node `{self.edge_collection(edge_type)}`
PRUNE node.metadata["replace"] == true
OPTIONS {{ bfs: true, uniqueVertices: 'global' }}
FILTER {filter_section}
RETURN {{_key: edge._key, _from: edge._from, _to: edge._to}}
"""
statement = f"""
LET edges_by_ids = ({query_update_edges_by_ids})
LET edges_until_replace = ({edges_until_replace_node})
LET all_of_them = UNION_DISTINCT(edges_by_ids, edges_until_replace)
FOR e IN all_of_them RETURN e
"""
return statement, bind_vars
else:
return query_update_edges_by_ids, bind_vars


class EventGraphDB(GraphDB):
Expand Down Expand Up @@ -1655,8 +1672,11 @@ async def merge_graph(
maybe_change_id: Optional[str] = None,
is_batch: bool = False,
update_history: bool = True,
preserve_parent_structure: bool = False,
) -> Tuple[List[str], GraphUpdate]:
roots, info = await self.real.merge_graph(graph_to_merge, model, maybe_change_id, is_batch, update_history)
roots, info = await self.real.merge_graph(
graph_to_merge, model, maybe_change_id, is_batch, update_history, preserve_parent_structure
)
root_counter: Dict[str, int] = {}
for root in roots:
root_node = graph_to_merge.nodes[root]
Expand Down
8 changes: 7 additions & 1 deletion resotocore/resotocore/model/db_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,13 @@ async def merge_graph(self, db: DbAccess) -> GraphUpdate: # type: ignore
graphdb = db.get_graph_db(nxt.graph)
outer_edge_db = db.deferred_outer_edge_db
await graphdb.insert_usage_data(builder.usage)
_, result = await graphdb.merge_graph(builder.graph, model, nxt.change_id, nxt.is_batch)
_, result = await graphdb.merge_graph(
builder.graph,
model,
nxt.change_id,
is_batch=nxt.is_batch,
preserve_parent_structure=builder.organizational_root is not None,
)
# sizes of model entries have been adjusted during the merge. Update the model in the db.
await model_handler.update_model(graphdb.name, list(model.kinds.values()), False)
if nxt.task_id and builder.deferred_edges:
Expand Down
24 changes: 18 additions & 6 deletions resotocore/resotocore/model/graph_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,20 +154,24 @@ def __init__(self, model: Model, change_id: str):
self.usage: List[UsageDatapoint] = []
self.at = int(utc().timestamp())
self.change_id = change_id
self.organizational_root: Optional[NodeId] = None

def add_from_json(self, js: Json) -> None:
if "id" in js and Section.reported in js:
usage_json = js.get(Section.usage, {})
if len(usage_json) == 0:
usage_json = None
self.add_node(
node = self.add_node(
node_id=js["id"],
reported=js[Section.reported],
desired=js.get(Section.desired, None),
metadata=js.get(Section.metadata, None),
search=js.get("search", None),
replace=js.get("replace", False) is True,
)
if "organizational_root" in node["kinds_set"]:
assert self.organizational_root is None, "There can be only one organizational root!"
self.organizational_root = node["id"]
if usage_json:
usage = UsageDatapoint(
id=js["id"],
Expand Down Expand Up @@ -236,7 +240,7 @@ def add_node(
metadata: Optional[Json] = None,
search: Optional[str] = None,
replace: bool = False,
) -> None:
) -> Dict[str, Any]:
self.nodes += 1
# validate kind of this reported json
coerced = self.model.check_valid(reported)
Expand All @@ -250,8 +254,7 @@ def add_node(
sha = GraphBuilder.content_hash(reported, desired, metadata)
# flat all properties into a single string for search
flat = search if isinstance(search, str) else (GraphBuilder.flatten(reported, kind))
self.graph.add_node(
node_id,
node = dict(
id=node_id,
reported=reported,
desired=desired,
Expand All @@ -262,8 +265,10 @@ def add_node(
kinds_set=kind.kind_hierarchy(),
flat=flat,
)
self.graph.add_node(node_id, **node)
# update property sizes
self.__update_property_size(kind, reported)
return node

def add_edge(self, from_node: str, to_node: str, edge_type: EdgeType) -> None:
self.edges += 1
Expand Down Expand Up @@ -377,6 +382,13 @@ def node(self, node_id: NodeId) -> Optional[Json]:
else:
return None

def cloud_node_id(self) -> Optional[NodeId]:
cloud_ids = [
data.get("id") for nid, data in self.nodes(data=True) if "cloud" in data.get("kinds", []) and data.get("id")
]
assert len(cloud_ids) <= 1, f"More than one cloud node found: {cloud_ids}"
return cloud_ids[0] if cloud_ids else None

def has_edge(self, from_id: object, to_id: object, edge_type: EdgeType) -> bool:
key = self.edge_key(from_id, to_id, edge_type)
result: bool = self.g.has_edge(from_id, to_id, key)
Expand Down Expand Up @@ -563,10 +575,10 @@ def collect_until_replace_node(
data = graph.nodes[source]
replace = (data.get("metadata", {}) or {}).get("replace", False)
if replace:
return {source: data}, set([source])
return {source: data}, {source}

replace_nodes: Dict[NodeId, Json] = {}
replace_nodes_predecessors: Set[NodeId] = set([source])
replace_nodes_predecessors: Set[NodeId] = {source}

for child in graph.successors(source):
rn, pred = collect_until_replace_node(graph, child, seen)
Expand Down
60 changes: 50 additions & 10 deletions resotocore/tests/resotocore/db/graphdb_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,27 @@ def add_node(uid: str, kind: str, node: Optional[Json] = None, replace: bool = F


# something similar to the AWS organizational root scheme
def create_graph_org_root_like(bla_text: str, width: int = 10, org_root_id: str = "org_root") -> MultiDiGraph:
def create_graph_org_root_like(
bla_text: str, width: int = 10, org_root_id: Optional[str] = "org_root", account_id: str = "aws_account"
) -> MultiDiGraph:
graph = MultiDiGraph()

def add_edge(from_node: str, to_node: str, edge_type: EdgeType = EdgeTypes.default) -> None:
key = GraphAccess.edge_key(from_node, to_node, edge_type)
graph.add_edge(from_node, to_node, key, edge_type=edge_type)

def add_node(uid: str, kind: str, node: Optional[Json] = None, replace: bool = False) -> None:
def add_node(
uid: str, kind: str, node: Optional[Json] = None, replace: bool = False, org_root: bool = False
) -> None:
reported = {**(node if node else to_json(Foo(uid))), "kind": kind}
kinds_set = {kind}
if org_root:
kinds_set.add("organizational_root")
graph.add_node(
uid,
id=uid,
kinds=[kind],
kinds_set=kinds_set,
reported=reported,
desired={"node_id": uid},
metadata={"node_id": uid, "replace": replace},
Expand All @@ -140,18 +148,19 @@ def add_node(uid: str, kind: str, node: Optional[Json] = None, replace: bool = F
# root -> collector -> sub_root -> **rest
add_node("root", "graph_root")
add_node("aws", "cloud")
add_node("aws_account", "account", replace=True)
add_node(account_id, "account", replace=True)

add_edge("root", "aws")
add_edge("aws", "aws_account")
add_edge("aws", account_id)

add_node(org_root_id, "foo")
add_edge("aws", org_root_id)
if org_root_id:
add_node(org_root_id, "foo", org_root=True)
add_edge("aws", org_root_id)

for o in range(0, width):
oid = str(o)
add_node(oid, "foo")
add_edge("aws_account", oid)
add_edge(account_id, oid)
for i in range(0, width):
iid = f"{o}_{i}"
add_node(iid, "bla", node=to_json(Bla(iid, name=bla_text)))
Expand Down Expand Up @@ -304,20 +313,51 @@ def create(txt: str, width: int = 10, org_root_id: str = "org_root") -> MultiDiG

p = ["aws_account"]
# empty database: all nodes and all edges have to be inserted, the root node is updated and the link to root added
assert await graph_db.merge_graph(create("yes or no"), foo_model, maybe_change_id="foo") == (
assert await graph_db.merge_graph(create("yes or no"), foo_model, preserve_parent_structure=True) == (
p,
GraphUpdate(113, 1, 0, 213, 0, 0),
)

# exactly the same graph is updated: no changes
assert await graph_db.merge_graph(create("yes or no"), foo_model) == (p, GraphUpdate(0, 0, 0, 0, 0, 0))
assert await graph_db.merge_graph(create("yes or no"), foo_model, preserve_parent_structure=True) == (
p,
GraphUpdate(0, 0, 0, 0, 0, 0),
)
# root_branch_id is changed: old node should be deleted and new one inserted
assert await graph_db.merge_graph(create("yes or no", org_root_id="new_org_root"), foo_model) == (
assert await graph_db.merge_graph(
create("yes or no", org_root_id="new_org_root"), foo_model, preserve_parent_structure=True
) == (
p,
GraphUpdate(1, 0, 1, 1, 0, 1),
)


@mark.asyncio
async def test_keep_org_root_when_merging_graph(graph_db: ArangoGraphDB, foo_model: Model) -> None:
await graph_db.wipe()

def create(
txt: str, width: int = 10, org_root_id: Optional[str] = "org_root", account_id: str = "aws_account"
) -> MultiDiGraph:
return create_graph_org_root_like(txt, width=width, org_root_id=org_root_id, account_id=account_id)

p = ["aws_account"]
# empty database: all nodes and all edges have to be inserted, the root node is updated and the link to root added
assert await graph_db.merge_graph(create("yes or no"), foo_model, maybe_change_id="foo") == (
p,
GraphUpdate(113, 1, 0, 213, 0, 0),
)

# exactly the same graph is updated: no changes
assert await graph_db.merge_graph(create("yes or no"), foo_model) == (p, GraphUpdate(0, 0, 0, 0, 0, 0))

# adding another account without org_root does not delete the old root:
_, update = await graph_db.merge_graph(create("yes or no", org_root_id=None, account_id="aws_account_2"), foo_model)
assert GraphUpdate(111, 1, 0, 211, 0, 0) == update

assert await graph_db.by_id(NodeId("org_root")) is not None


@mark.asyncio
async def test_merge_multi_graph(graph_db: ArangoGraphDB, foo_model: Model) -> None:
await graph_db.wipe()
Expand Down

0 comments on commit af74c1d

Please sign in to comment.