
Commit b05420a

[resotocore][fix] Maintain ancestor refs in history and allow to delete history (#1835)

aquamatthias authored Nov 23, 2023
1 parent 1b07c50 commit b05420a
Showing 2 changed files with 41 additions and 22 deletions.
57 changes: 37 additions & 20 deletions resotocore/resotocore/db/graphdb.py
@@ -123,7 +123,7 @@ async def update_security_section(
         pass
 
     @abstractmethod
-    async def delete_node(self, node_id: NodeId, model: Model) -> None:
+    async def delete_node(self, node_id: NodeId, model: Model, keep_history: bool = False) -> None:
         pass
 
     @abstractmethod
@@ -133,6 +133,7 @@ async def merge_graph(
         model: Model,
         maybe_change_id: Optional[str] = None,
         is_batch: bool = False,
+        update_history: bool = True,
     ) -> Tuple[List[str], GraphUpdate]:
         pass
 
@@ -141,7 +142,7 @@ async def list_in_progress_updates(self) -> List[Json]:
         pass
 
     @abstractmethod
-    async def commit_batch_update(self, batch_id: str) -> None:
+    async def commit_batch_update(self, batch_id: str, update_history: bool = True) -> None:
         pass
 
     @abstractmethod
@@ -435,7 +436,7 @@ async def update_nodes_section_with(
             for element in cursor:
                 yield trafo(element)
 
-    async def delete_node(self, node_id: NodeId, model: Model) -> None:
+    async def delete_node(self, node_id: NodeId, model: Model, keep_history: bool = False) -> None:
         async def delete_children(element: Json) -> None:
             with await self.db.aql(query=self.query_count_direct_children(), bind_vars={"rid": node_id}) as cursor:
                 count = cursor.next()
@@ -444,11 +445,20 @@ async def delete_children(element: Json) -> None:
                     # Note: this will only work for nodes that are resolved (cloud, account, region, zone...)
                     builder = GraphBuilder(model, node_id)
                     builder.add_node(node_id, reported=element[Section.reported], replace=True)
-                    await self.merge_graph(builder.graph, model, node_id)
+                    await self.merge_graph(builder.graph, model, node_id, update_history=keep_history)
 
-        if node := await self.get_node(model, node_id):
+        async def delete_history(element: Json) -> None:
+            # if this element is a resolved kind, we will delete all nodes from history with a reference to this kind
+            if (kd := GraphResolver.resolved_kind(element)) and (ref := GraphResolver.resolved_ancestors.get(kd)):
+                q = f"FOR doc IN `{self.node_history}` FILTER doc.{ref} == @node_id REMOVE doc IN `{self.node_history}`"
+                with await self.db.aql(query=q, bind_vars={"node_id": node_id}):
+                    pass
+
+        if node := await self.by_id(node_id):
             await delete_children(node)
-            await self.db.delete_vertex(self.name, {"_id": f'{self.vertex_name}/{node["id"]}'})
+            if not keep_history:
+                await delete_history(node)
+            await self.db.delete_vertex(self.name, {"_id": node["_id"]})
 
     async def update_security_section(
         self,
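Note: the new delete_history hook boils down to a single AQL REMOVE over the history collection, keyed on the resolved-ancestor reference of the deleted node. A minimal standalone sketch of the same idea using python-arango directly; the endpoint, credentials, collection name, and the ancestors.account.reported.id reference path are illustrative assumptions, not values taken from this diff:

from arango import ArangoClient

client = ArangoClient(hosts="http://localhost:8529")  # hypothetical endpoint
db = client.db("resoto", username="root", password="")  # hypothetical credentials

def delete_history_of_account(node_id: str) -> None:
    # Remove every history document that still references the deleted account
    # node via its resolved ancestor path (path shape assumed from GraphResolver).
    query = (
        "FOR doc IN `node_history` "
        "FILTER doc.ancestors.account.reported.id == @node_id "
        "REMOVE doc IN `node_history`"
    )
    db.aql.execute(query, bind_vars={"node_id": node_id})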
@@ -484,10 +494,13 @@ def update_security_section(
 
         async def update_chunk(chunk: Dict[NodeId, Json]) -> None:
             nonlocal nodes_vulnerable_new, nodes_vulnerable_updated
-            async with await self.search_list(QueryModel(Query.by(P.with_id(list(chunk.keys()))), model)) as ctx:
+            async with await self.search_list(
+                QueryModel(Query.by(P.with_id(list(chunk.keys()))), model), no_trafo=True
+            ) as ctx:
                 nodes_to_insert = []
                 async for node in ctx:
-                    node_id = NodeId(value_in_path_get(node, NodePath.node_id, ""))
+                    node_id = NodeId(node.pop("_key", ""))
+                    node["id"] = node_id  # store the id in the id column (not _key)
                     existing: List[Json] = value_in_path_get(node, NodePath.security_issues, [])
                     security_section = chunk[node_id]
                     updated, severity, is_update = update_security_section(existing, security_section.get("issues", []))
@@ -508,6 +521,7 @@ async def update_chunk(chunk: Dict[NodeId, Json]) -> None:
                     elif is_update:  # the content has changed
                         nodes_vulnerable_updated += 1
                         nodes_to_insert.append(dict(action="node_vulnerable", node_id=node_id, data=node))
+                        node["before"] = existing
                         node["change"] = "node_vulnerable"
                     else:  # no change
                         nodes_to_insert.append(dict(action="mark", node_id=node_id, run_id=report_run_id))
@@ -580,7 +594,7 @@ async def search_list(
         q_string, bind = await self.to_query(query)
         return await self.db.aql_cursor(
             query=q_string,
-            trafo=self.document_to_instance_fn(query.model, query),
+            trafo=None if kwargs.get("no_trafo") else self.document_to_instance_fn(query.model, query),
             count=with_count,
             full_count=with_count,
             bind_vars=bind,
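Note: the new no_trafo kwarg is what update_security_section relies on above: it skips document_to_instance_fn, so the cursor yields raw ArangoDB documents with system fields such as _key intact instead of cleaned node instances. A hedged usage sketch, assuming a graph_db and query_model already in scope:

async with await graph_db.search_list(query_model, no_trafo=True) as cursor:
    async for raw in cursor:
        # raw is the stored document: _key and internal sections are untouched
        node_key = raw["_key"]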
@@ -755,7 +769,7 @@ async def get_tmp_collection(self, change_id: str, create: bool = True) -> Stand
         else:
             raise NoSuchChangeError(change_id)
 
-    async def move_temp_to_proper(self, change_id: str, temp_name: str) -> None:
+    async def move_temp_to_proper(self, change_id: str, temp_name: str, update_history: bool = True) -> None:
         change_key = str(uuid.uuid5(uuid.NAMESPACE_DNS, change_id))
         log.info(f"Move temp->proper data: change_id={change_id}, change_key={change_key}, temp_name={temp_name}")
         edge_inserts = [
@@ -770,7 +784,7 @@ async def move_temp_to_proper(self, change_id: str, temp_name: str) -> None:
         ]
         history_updates = [
             f'for e in {temp_name} filter e.action=="node_created" insert MERGE({{id: e.data._key, change: e.action, changed_at: e.data.created}}, UNSET(e.data, "_key", "flat", "hash")) in {self.node_history}', # noqa: E501
-            f'for e in {temp_name} filter e.action=="node_updated" insert MERGE({{id: e.data._key, change: e.action, changed_at: e.data.updated}}, UNSET(e.data, "_key", "flat", "hash")) in {self.node_history}', # noqa: E501
+            f'for e in {temp_name} filter e.action=="node_updated" let node = Document(CONCAT("{self.vertex_name}/", e.data._key)) insert MERGE({{id: e.data._key, change: e.action, changed_at: e.data.updated, before: node.reported}}, UNSET(e.data, "_key", "flat", "hash")) in {self.node_history}', # noqa: E501
             f'for e in {temp_name} filter e.action=="node_deleted" let node = Document(CONCAT("{self.vertex_name}/", e.data._key)) insert MERGE({{id: node._key, change: "node_deleted", deleted: e.data.deleted, changed_at: e.data.deleted}}, UNSET(node, "_key", "_id", "_rev", "flat", "hash")) in {self.node_history}', # noqa: E501
         ]
         usage_updates = [
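Note: the net effect of the node_updated change is that each history entry now also records the previous reported section under before, looked up from the live vertex just before it is overwritten. A hypothetical resulting history document, with all values invented for illustration:

history_entry = {
    "id": "i-0abc123",                       # former _key of the vertex
    "change": "node_updated",
    "changed_at": "2023-11-23T09:15:00Z",
    "before": {"name": "old-name", "instance_cores": 2},   # prior reported section
    "reported": {"name": "new-name", "instance_cores": 4},
    # ...plus the remaining event payload, minus "_key", "flat" and "hash"
}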
@@ -779,7 +793,7 @@
         updates = ";\n".join(
             map(
                 lambda aql: f"db._createStatement({{ query: `{aql}` }}).execute()",
-                (history_updates if self.config.keep_history else [])
+                (history_updates if self.config.keep_history and update_history else [])
                 + [
                     f'for e in {temp_name} filter e.action=="node_created" insert e.data in {self.vertex_name}'
                     ' OPTIONS{overwriteMode: "replace"}',
@@ -941,6 +955,7 @@ async def merge_graph(
         model: Model,
         maybe_change_id: Optional[str] = None,
         is_batch: bool = False,
+        update_history: bool = True,
     ) -> Tuple[List[str], GraphUpdate]:
         change_id = maybe_change_id if maybe_change_id else uuid_str()
 
@@ -1017,7 +1032,7 @@ def combine_dict(left: Dict[K, List[V]], right: Dict[K, List[V]]) -> Dict[K, Lis
 
             log.debug(f"Update prepared: {info}. Going to persist the changes.")
             await self.refresh_marked_update(change_id)
-            await self.persist_update(change_id, is_batch, info, nis, nus, nds, eis, eds)
+            await self.persist_update(change_id, is_batch, info, nis, nus, nds, eis, eds, update_history)
            return roots, info
         except Exception as ex:
             await self.delete_marked_update(change_id)
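Note: with the flag threaded through merge_graph and persist_update, callers can import a graph without producing any history entries. A minimal call-site sketch, assuming graph_db, builder, and model already exist in scope:

roots, update_info = await graph_db.merge_graph(
    builder.graph, model, update_history=False  # skip history writes for this import
)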
@@ -1033,6 +1048,7 @@ async def persist_update(
         resource_deletes: List[Json],
         edge_inserts: Dict[EdgeType, List[Json]],
         edge_deletes: Dict[EdgeType, List[Json]],
+        update_history: bool,
     ) -> None:
         async def execute_many_async(
             async_fn: Callable[[str, List[Json]], Any], name: str, array: List[Json], **kwargs: Any
@@ -1074,7 +1090,7 @@ async def update_via_temp_collection() -> None:
             log.debug(f"Store change in temp collection {temp.name}")
             try:
                 await store_to_tmp_collection(temp)
-                await self.move_temp_to_proper(change_id, temp.name)
+                await self.move_temp_to_proper(change_id, temp.name, update_history)
             finally:
                 log.debug(f"Delete temp collection {temp.name}")
                 await self.db.delete_collection(temp.name)
@@ -1091,7 +1107,7 @@ async def update_batch() -> None:
             await update_via_temp_collection()
         log.debug("Persist update done.")
 
-    async def commit_batch_update(self, batch_id: str) -> None:
+    async def commit_batch_update(self, batch_id: str, update_history: bool = True) -> None:
         temp_table = await self.get_tmp_collection(batch_id, False)
         await self.move_temp_to_proper(batch_id, temp_table.name)
         await self.db.delete_collection(temp_table.name)
@@ -1468,8 +1484,8 @@ async def update_node(
         await self.event_sender.core_event(CoreEvent.NodeUpdated, {"graph": self.graph_name, "section": section})
         return result
 
-    async def delete_node(self, node_id: NodeId, model: Model) -> None:
-        await self.real.delete_node(node_id, model)
+    async def delete_node(self, node_id: NodeId, model: Model, keep_history: bool = False) -> None:
+        await self.real.delete_node(node_id, model, keep_history)
         await self.event_sender.core_event(CoreEvent.NodeDeleted, {"graph": self.graph_name})
 
     def update_nodes(
@@ -1512,8 +1528,9 @@ async def merge_graph(
         model: Model,
         maybe_change_id: Optional[str] = None,
         is_batch: bool = False,
+        update_history: bool = True,
     ) -> Tuple[List[str], GraphUpdate]:
-        roots, info = await self.real.merge_graph(graph_to_merge, model, maybe_change_id, is_batch)
+        roots, info = await self.real.merge_graph(graph_to_merge, model, maybe_change_id, is_batch, update_history)
         root_counter: Dict[str, int] = {}
         for root in roots:
             root_node = graph_to_merge.nodes[root]
@@ -1552,9 +1569,9 @@ async def merge_graph(
     async def list_in_progress_updates(self) -> List[Json]:
         return await self.real.list_in_progress_updates()
 
-    async def commit_batch_update(self, batch_id: str) -> None:
+    async def commit_batch_update(self, batch_id: str, update_history: bool = True) -> None:
         info = first(lambda x: x["id"] == batch_id, await self.real.list_in_progress_updates())
-        await self.real.commit_batch_update(batch_id)
+        await self.real.commit_batch_update(batch_id, update_history)
         await self.event_sender.core_event(CoreEvent.BatchUpdateCommitted, {"graph": self.graph_name, "batch": info})
 
     async def abort_update(self, batch_id: str) -> None:
6 changes: 4 additions & 2 deletions resotocore/resotocore/web/api.py
@@ -939,11 +939,12 @@ async def update_node(self, request: Request, deps: TenantDependencies) -> Strea
     async def delete_node(self, request: Request, deps: TenantDependencies) -> StreamResponse:
         graph_name = GraphName(request.match_info.get("graph_id", "resoto"))
         node_id = NodeId(request.match_info.get("node_id", "some_existing"))
+        keep_history = request.query.get("keep_history", "false").lower() == "true"
         if node_id == "root":
             raise AttributeError("Root node can not be deleted!")
         graph = deps.db_access.get_graph_db(graph_name)
         model = await deps.model_handler.load_model(graph_name)
-        await graph.delete_node(node_id, model)
+        await graph.delete_node(node_id, model, keep_history)
         return web.HTTPNoContent()
 
     async def update_nodes(self, request: Request, deps: TenantDependencies) -> StreamResponse:
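Note: seen from the outside, history retention on node deletion becomes an opt-in query parameter. A hedged client example, assuming the route is DELETE /graph/{graph_id}/node/{node_id} and a locally running core; host, port, and ids are invented:

import requests

# Delete the node but keep its history entries (the parameter defaults to false).
resp = requests.delete(
    "http://localhost:8900/graph/resoto/node/some-node-id",  # hypothetical host/ids
    params={"keep_history": "true"},
)
assert resp.status_code == 204  # web.HTTPNoContent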
@@ -1016,7 +1017,8 @@ async def list_batches(self, request: Request, deps: TenantDependencies) -> Stre
     async def commit_batch(self, request: Request, deps: TenantDependencies) -> StreamResponse:
         graph_db = deps.db_access.get_graph_db(GraphName(request.match_info.get("graph_id", "resoto")))
         batch_id = request.match_info.get("batch_id", "some_existing")
-        await graph_db.commit_batch_update(batch_id)
+        update_history = request.query.get("update_history", "true").lower() == "true"
+        await graph_db.commit_batch_update(batch_id, update_history)
         return web.HTTPOk(body="Batch committed.")
 
     async def abort_batch(self, request: Request, deps: TenantDependencies) -> StreamResponse:
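Note: likewise, committing a batch can now skip history writes via update_history (default true). A sketch under the assumption that the commit route is POST /graph/{graph_id}/batch/{batch_id}; host and ids are invented:

resp = requests.post(
    "http://localhost:8900/graph/resoto/batch/some-batch-id",  # hypothetical ids
    params={"update_history": "false"},  # commit the batch without touching history
)
assert resp.status_code == 200  # "Batch committed."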
