Skip to content

Commit

Permalink
[core][fix] Persist parent update structure in an atomic way (#2196)
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias authored Sep 17, 2024
1 parent 6d11a8a commit 21ddc09
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 111 deletions.
2 changes: 1 addition & 1 deletion fixcore/fixcore/db/db_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def get_graph_db(self, name: GraphName, no_check: bool = False) -> GraphDB:
else:
if not no_check and not self.database.has_graph(name):
raise NoSuchGraph(name)
graph_db = ArangoGraphDB(self.db, name, self.adjust_node, self.config.graph)
graph_db = ArangoGraphDB(self.db, name, self.adjust_node, self.config.graph, self.lock_db)
event_db = EventGraphDB(graph_db, self.event_sender)
self.graph_dbs[name] = event_db
return event_db
Expand Down
152 changes: 49 additions & 103 deletions fixcore/fixcore/db/graphdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from numbers import Number
from textwrap import dedent
from typing import (
DefaultDict,
Optional,
Callable,
AsyncGenerator,
Expand All @@ -18,7 +17,6 @@
Dict,
List,
Tuple,
TypeVar,
cast,
AsyncIterator,
Literal,
Expand All @@ -39,7 +37,8 @@
from fixcore.db import arango_query, EstimatedSearchCost
from fixcore.db.arango_query import fulltext_delimiter
from fixcore.db.async_arangodb import AsyncArangoDB, AsyncArangoTransactionDB, AsyncArangoDBBase, AsyncCursorContext
from fixcore.db.model import GraphUpdate, QueryModel
from fixcore.db.lockdb import LockDB
from fixcore.db.model import GraphUpdate, QueryModel, GraphChange
from fixcore.db.usagedb import resource_usage_db
from fixcore.error import InvalidBatchUpdate, ConflictingChangeInProgress, NoSuchChangeError, OptimisticLockingFailed
from fixcore.ids import NodeId, GraphName
Expand Down Expand Up @@ -275,6 +274,7 @@ def __init__(
name: GraphName,
adjust_node: AdjustNode,
config: GraphConfig,
lock_db: LockDB,
) -> None:
super().__init__()
self._name = name
Expand All @@ -283,6 +283,7 @@ def __init__(
self.in_progress = f"{name}_in_progress"
self.node_history = f"{name}_node_history"
self.usage_db = resource_usage_db(db, f"{name}_usage")
self.lock_db = lock_db
self.db = db
self.config = config

Expand All @@ -309,8 +310,8 @@ async def create_node(self, model: Model, node_id: NodeId, data: Json, under_nod
graph.add_node(node_id, data)
graph.add_edge(under_node_id, node_id, EdgeTypes.default)
access = GraphAccess(graph.graph, node_id, {under_node_id})
_, node_inserts, _, _ = self.prepare_nodes(access, [], model)
_, edge_inserts, _, _ = self.prepare_edges(access, [], EdgeTypes.default)
node_inserts = self.prepare_nodes(access, [], model).node_inserts
edge_inserts = self.prepare_edges(access, [], EdgeTypes.default).edge_inserts[EdgeTypes.default]
assert len(node_inserts) == 1
assert len(edge_inserts) == 1
edge_collection = self.edge_collection(EdgeTypes.default)
Expand Down Expand Up @@ -974,7 +975,7 @@ async def move_temp_to_proper(self, change_id: str, temp_name: str, update_histo
+ edge_updates
+ edge_deletes
+ usage_updates
+ [f'remove {{_key: "{change_key}"}} in {self.in_progress}'],
+ [f'remove {{_key: "{change_key}"}} in {self.in_progress} OPTIONS {{ ignoreErrors: true }}'],
)
)
cmd = f'function () {{\nvar db=require("@arangodb").db;\n{updates}\n}}'
Expand Down Expand Up @@ -1034,15 +1035,9 @@ def adjust_node(
# adjuster has the option to manipulate the resulting json
return self.node_adjuster.adjust(json)

def prepare_nodes(
self, access: GraphAccess, node_cursor: Iterable[Json], model: Model
) -> Tuple[GraphUpdate, List[Json], List[Json], List[Json]]:
def prepare_nodes(self, access: GraphAccess, node_cursor: Iterable[Json], model: Model) -> GraphChange:
log.info(f"Prepare nodes for subgraph {access.root()}")
info = GraphUpdate()
resource_inserts: List[Json] = []
resource_updates: List[Json] = []
resource_deletes: List[Json] = []

change = GraphChange()
optional_properties = [*Section.all_ordered, "refs", "kinds", "flat", "hash", "hist_hash"]

def insert_node(node: Json) -> None:
Expand All @@ -1052,17 +1047,15 @@ def insert_node(node: Json) -> None:
value = node.get(prop, None)
if value:
js_doc[prop] = value
resource_inserts.append(js_doc)
info.nodes_created += 1
change.node_inserts.append(js_doc)

def update_or_delete_node(node: Json) -> None:
key = node["_key"]
hash_string = node["hash"]
elem = access.node(key)
if elem is None:
# node is in db, but not in the graph any longer: delete node
resource_deletes.append({"_key": key, "deleted": access.at_json, "history": True})
info.nodes_deleted += 1
change.node_deletes.append({"_key": key, "deleted": access.at_json, "history": True})
elif elem["hash"] != hash_string:
# node is in db and in the graph, content is different
adjusted: Json = self.adjust_node(model, elem, node["created"], access.at_json)
Expand All @@ -1072,15 +1065,14 @@ def update_or_delete_node(node: Json) -> None:
value = adjusted.get(prop, None)
if value:
js[prop] = value
resource_updates.append(js)
info.nodes_updated += 1
change.node_updates.append(js)

for doc in node_cursor:
update_or_delete_node(doc)

for not_visited in access.not_visited_nodes():
insert_node(not_visited)
return info, resource_inserts, resource_updates, resource_deletes
return change

def _edge_to_json(
self, from_node: str, to_node: str, data: Optional[Json], refs: Optional[Dict[str, str]] = None, **kwargs: Any
Expand All @@ -1096,14 +1088,9 @@ def _edge_to_json(
js["_to"] = f"{self.vertex_name}/{to_node}"
return js

def prepare_edges(
self, access: GraphAccess, edge_cursor: Iterable[Json], edge_type: EdgeType
) -> Tuple[GraphUpdate, List[Json], List[Json], List[Json]]:
def prepare_edges(self, access: GraphAccess, edge_cursor: Iterable[Json], edge_type: EdgeType) -> GraphChange:
log.info(f"Prepare edges of type {edge_type} for subgraph {access.root()}")
info = GraphUpdate()
edge_inserts: List[Json] = []
edge_updates: List[Json] = []
edge_deletes: List[Json] = []
change = GraphChange()

def edge_json(from_node: str, to_node: str, edge_data: Optional[Json]) -> Json:
# Take the refs with the lower number of entries (or none):
Expand All @@ -1117,29 +1104,26 @@ def edge_json(from_node: str, to_node: str, edge_data: Optional[Json]) -> Json:
return self._edge_to_json(from_node, to_node, edge_data, refs)

def insert_edge(from_node: str, to_node: str, edge_data: Optional[Json]) -> None:
edge_inserts.append(edge_json(from_node, to_node, edge_data))
info.edges_created += 1
change.edge_inserts[edge_type].append(edge_json(from_node, to_node, edge_data))

def update_edge(edge: Json) -> None:
from_node = edge["_from"].split("/")[1] # vertex/id
to_node = edge["_to"].split("/")[1] # vertex/id
has_edge, edge_data = access.has_edge(from_node, to_node, edge_type)
edge_hash = edge_data.get("hash") if edge_data else None
if not has_edge:
edge_deletes.append(edge)
info.edges_deleted += 1
change.edge_deletes[edge_type].append(edge)
elif edge_hash != edge.get("hash"):
js = edge_json(from_node, to_node, edge_data)
edge_updates.append(js)
info.edges_updated += 1
change.edge_updates[edge_type].append(js)

for doc in edge_cursor:
update_edge(doc)

for edge_from, edge_to, data in access.not_visited_edges(edge_type):
insert_edge(edge_from, edge_to, data)

return info, edge_inserts, edge_updates, edge_deletes
return change

async def merge_graph(
self,
Expand All @@ -1154,105 +1138,67 @@ async def merge_graph(

async def prepare_graph(
sub: GraphAccess, node_query: Tuple[str, Json], edge_query: Callable[[EdgeType], Tuple[str, Json]]
) -> Tuple[
GraphUpdate,
List[Json], # node insert
List[Json], # node update
List[Json], # node delete
Dict[EdgeType, List[Json]], # edge insert
Dict[EdgeType, List[Json]], # edge update
Dict[EdgeType, List[Json]], # edge delete
]:
graph_info = GraphUpdate()
) -> GraphChange:
graph_change = GraphChange()
# check all nodes for this subgraph
query, bind = node_query
log.debug(f"Query for nodes: {sub.root()}")
with await self.db.aql(query, bind_vars=bind, batch_size=50000) as node_cursor:
node_info, ni, nu, nd = self.prepare_nodes(sub, node_cursor, model)
graph_info += node_info
graph_change += self.prepare_nodes(sub, node_cursor, model)

# check all edges in all relevant edge-collections
edge_inserts: DefaultDict[EdgeType, List[Json]] = defaultdict(list)
edge_updates: DefaultDict[EdgeType, List[Json]] = defaultdict(list)
edge_deletes: DefaultDict[EdgeType, List[Json]] = defaultdict(list)
for edge_type in EdgeTypes.all:
query, bind = edge_query(edge_type)
log.debug(f"Query for edges of type {edge_type}: {sub.root()}")
with await self.db.aql(query, bind_vars=bind, batch_size=50000) as ec:
edge_info, gei, geu, ged = self.prepare_edges(sub, ec, edge_type)
graph_info += edge_info
edge_inserts[edge_type] = gei
edge_updates[edge_type] = geu
edge_deletes[edge_type] = ged
return graph_info, ni, nu, nd, edge_inserts, edge_updates, edge_deletes
graph_change += self.prepare_edges(sub, ec, edge_type)
return graph_change

roots, parent, graphs = GraphAccess.merge_graphs(graph_to_merge)
log.info(f"merge_graph {len(roots)} merge nodes found. change_id={change_id}, is_batch={is_batch}.")

def merge_edges(merge_node: str, merge_node_kind: str, edge_type: EdgeType) -> Tuple[str, Json]:
return self.query_update_edges(edge_type, merge_node_kind), {"update_id": merge_node}

K = TypeVar("K") # noqa: N806
V = TypeVar("V") # noqa: N806

def combine_dict(left: Dict[K, List[V]], right: Dict[K, List[V]]) -> Dict[K, List[V]]:
result = dict(left)
for key, right_values in right.items():
left_values = left.get(key)
result[key] = left_values + right_values if left_values else right_values
return result
def parent_edges(edge_type: EdgeType) -> Tuple[str, Json]:
edge_ids = [self.db_edge_key(f, t) for f, t, k in parent.g.edges(keys=True) if k.edge_type == edge_type]
return self.edges_by_ids_and_until_replace_node(edge_type, preserve_parent_structure, parent, edge_ids)

# this will throw an exception, in case of a conflicting update (--> outside try block)
log.debug("Mark all parent nodes for this update to avoid conflicting changes")
await self.mark_update(roots, list(parent.nodes), change_id, is_batch)
try:

def parent_edges(edge_type: EdgeType) -> Tuple[str, Json]:
edge_ids = [self.db_edge_key(f, t) for f, t, k in parent.g.edges(keys=True) if k.edge_type == edge_type]
return self.edges_by_ids_and_until_replace_node(edge_type, preserve_parent_structure, parent, edge_ids)

# store parent nodes and edges with a mutex to avoid conflicts
parents_nodes = self.nodes_by_ids_and_until_replace_node(preserve_parent_structure, parent)
info, nis, nus, nds, eis, eus, eds = await prepare_graph(parent, parents_nodes, parent_edges)
async with self.lock_db.lock("merge_graph_parents"):
parent_change_id = change_id + "_parent"
parent_change = await prepare_graph(parent, parents_nodes, parent_edges)
if parent_change.change_count(): # only persist in case of changes
await self._persist_update(parent_change_id, False, parent_change, update_history)

change = GraphChange()
for num, (root, graph) in enumerate(graphs):
root_kind = GraphResolver.resolved_kind(graph_to_merge.nodes[root])
if root_kind:
# noinspection PyTypeChecker
log.info(f"Update subgraph: root={root} ({root_kind}, {num+1} of {len(roots)})")
node_query = self.query_update_nodes(root_kind), {"update_id": root}
edge_query = partial(merge_edges, root, root_kind)

i, ni, nu, nd, ei, eu, ed = await prepare_graph(graph, node_query, edge_query)
info += i
nis += ni
nus += nu
nds += nd
eis = combine_dict(eis, ei)
eus = combine_dict(eus, eu)
eds = combine_dict(eds, ed)
change += await prepare_graph(graph, node_query, edge_query)
else:
# Already checked in GraphAccess - only here as safeguard.
# Already checked in GraphAccess - only here as a safeguard.
raise AttributeError(f"Kind of update root {root} is not a pre-resolved and can not be used!")

log.debug(f"Update prepared: {info}. Going to persist the changes.")
graph_update = parent_change.to_update() + change.to_update()
log.debug(f"Update prepared: {graph_update}. Going to persist the changes.")
await self._refresh_marked_update(change_id)
await self._persist_update(change_id, is_batch, nis, nus, nds, eis, eus, eds, update_history)
return roots, info
await self._persist_update(change_id, is_batch, change, update_history)
return roots, graph_update
except Exception as ex:
await self.delete_marked_update(change_id)
raise ex

async def _persist_update(
self,
change_id: str,
is_batch: bool,
resource_inserts: List[Json],
resource_updates: List[Json],
resource_deletes: List[Json],
edge_inserts: Dict[EdgeType, List[Json]],
edge_updates: Dict[EdgeType, List[Json]],
edge_deletes: Dict[EdgeType, List[Json]],
update_history: bool,
) -> None:
async def _persist_update(self, change_id: str, is_batch: bool, change: GraphChange, update_history: bool) -> None:
async def execute_many_async(
async_fn: Callable[[str, List[Json]], Any], name: str, array: List[Json], **kwargs: Any
) -> None:
Expand All @@ -1275,20 +1221,20 @@ async def trafo_many(

async def store_to_tmp_collection(temp: StandardCollection) -> None:
tmp = temp.name
ri = trafo_many(self.db.insert_many, tmp, resource_inserts, {"action": "node_created"})
ru = trafo_many(self.db.insert_many, tmp, resource_updates, {"action": "node_updated"})
rd = trafo_many(self.db.insert_many, tmp, resource_deletes, {"action": "node_deleted"})
ri = trafo_many(self.db.insert_many, tmp, change.node_inserts, {"action": "node_created"})
ru = trafo_many(self.db.insert_many, tmp, change.node_updates, {"action": "node_updated"})
rd = trafo_many(self.db.insert_many, tmp, change.node_deletes, {"action": "node_deleted"})
edge_i = [
trafo_many(self.db.insert_many, tmp, inserts, {"action": "edge_insert", "edge_type": tpe})
for tpe, inserts in edge_inserts.items()
for tpe, inserts in change.edge_inserts.items()
]
edge_u = [
trafo_many(self.db.insert_many, tmp, updates, {"action": "edge_update", "edge_type": tpe})
for tpe, updates in edge_updates.items()
for tpe, updates in change.edge_updates.items()
]
edge_d = [
trafo_many(self.db.insert_many, tmp, deletes, {"action": "edge_delete", "edge_type": tpe})
for tpe, deletes in edge_deletes.items()
for tpe, deletes in change.edge_deletes.items()
]
await asyncio.gather(*([ri, ru, rd] + edge_i + edge_u + edge_d))

Expand Down Expand Up @@ -1562,7 +1508,7 @@ async def copy_graph(self, to_graph: GraphName, to_snapshot: bool = False) -> Gr
if await self.db.has_graph(to_graph):
raise ValueError(f"Graph {to_graph} already exists")

new_graph_db = ArangoGraphDB(db=self.db, name=to_graph, adjust_node=self.node_adjuster, config=self.config)
new_graph_db = ArangoGraphDB(self.db, to_graph, self.node_adjuster, self.config, self.lock_db)

# collection creation can't be a part of a transaction so we do that first
# we simply reuse the existing create_update_schema method but do not insert any genesis data
Expand Down
Loading

0 comments on commit 21ddc09

Please sign in to comment.