Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][fix] Persist parent update structure in an atomic way #2196

Merged
merged 2 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fixcore/fixcore/db/db_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def get_graph_db(self, name: GraphName, no_check: bool = False) -> GraphDB:
else:
if not no_check and not self.database.has_graph(name):
raise NoSuchGraph(name)
graph_db = ArangoGraphDB(self.db, name, self.adjust_node, self.config.graph)
graph_db = ArangoGraphDB(self.db, name, self.adjust_node, self.config.graph, self.lock_db)
event_db = EventGraphDB(graph_db, self.event_sender)
self.graph_dbs[name] = event_db
return event_db
Expand Down
152 changes: 49 additions & 103 deletions fixcore/fixcore/db/graphdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from numbers import Number
from textwrap import dedent
from typing import (
DefaultDict,
Optional,
Callable,
AsyncGenerator,
Expand All @@ -18,7 +17,6 @@
Dict,
List,
Tuple,
TypeVar,
cast,
AsyncIterator,
Literal,
Expand All @@ -39,7 +37,8 @@
from fixcore.db import arango_query, EstimatedSearchCost
from fixcore.db.arango_query import fulltext_delimiter
from fixcore.db.async_arangodb import AsyncArangoDB, AsyncArangoTransactionDB, AsyncArangoDBBase, AsyncCursorContext
from fixcore.db.model import GraphUpdate, QueryModel
from fixcore.db.lockdb import LockDB
from fixcore.db.model import GraphUpdate, QueryModel, GraphChange
from fixcore.db.usagedb import resource_usage_db
from fixcore.error import InvalidBatchUpdate, ConflictingChangeInProgress, NoSuchChangeError, OptimisticLockingFailed
from fixcore.ids import NodeId, GraphName
Expand Down Expand Up @@ -275,6 +274,7 @@ def __init__(
name: GraphName,
adjust_node: AdjustNode,
config: GraphConfig,
lock_db: LockDB,
) -> None:
super().__init__()
self._name = name
Expand All @@ -283,6 +283,7 @@ def __init__(
self.in_progress = f"{name}_in_progress"
self.node_history = f"{name}_node_history"
self.usage_db = resource_usage_db(db, f"{name}_usage")
self.lock_db = lock_db
self.db = db
self.config = config

Expand All @@ -309,8 +310,8 @@ async def create_node(self, model: Model, node_id: NodeId, data: Json, under_nod
graph.add_node(node_id, data)
graph.add_edge(under_node_id, node_id, EdgeTypes.default)
access = GraphAccess(graph.graph, node_id, {under_node_id})
_, node_inserts, _, _ = self.prepare_nodes(access, [], model)
_, edge_inserts, _, _ = self.prepare_edges(access, [], EdgeTypes.default)
node_inserts = self.prepare_nodes(access, [], model).node_inserts
edge_inserts = self.prepare_edges(access, [], EdgeTypes.default).edge_inserts[EdgeTypes.default]
assert len(node_inserts) == 1
assert len(edge_inserts) == 1
edge_collection = self.edge_collection(EdgeTypes.default)
Expand Down Expand Up @@ -974,7 +975,7 @@ async def move_temp_to_proper(self, change_id: str, temp_name: str, update_histo
+ edge_updates
+ edge_deletes
+ usage_updates
+ [f'remove {{_key: "{change_key}"}} in {self.in_progress}'],
+ [f'remove {{_key: "{change_key}"}} in {self.in_progress} OPTIONS {{ ignoreErrors: true }}'],
)
)
cmd = f'function () {{\nvar db=require("@arangodb").db;\n{updates}\n}}'
Expand Down Expand Up @@ -1034,15 +1035,9 @@ def adjust_node(
# adjuster has the option to manipulate the resulting json
return self.node_adjuster.adjust(json)

def prepare_nodes(
self, access: GraphAccess, node_cursor: Iterable[Json], model: Model
) -> Tuple[GraphUpdate, List[Json], List[Json], List[Json]]:
def prepare_nodes(self, access: GraphAccess, node_cursor: Iterable[Json], model: Model) -> GraphChange:
log.info(f"Prepare nodes for subgraph {access.root()}")
info = GraphUpdate()
resource_inserts: List[Json] = []
resource_updates: List[Json] = []
resource_deletes: List[Json] = []

change = GraphChange()
optional_properties = [*Section.all_ordered, "refs", "kinds", "flat", "hash", "hist_hash"]

def insert_node(node: Json) -> None:
Expand All @@ -1052,17 +1047,15 @@ def insert_node(node: Json) -> None:
value = node.get(prop, None)
if value:
js_doc[prop] = value
resource_inserts.append(js_doc)
info.nodes_created += 1
change.node_inserts.append(js_doc)

def update_or_delete_node(node: Json) -> None:
key = node["_key"]
hash_string = node["hash"]
elem = access.node(key)
if elem is None:
# node is in db, but not in the graph any longer: delete node
resource_deletes.append({"_key": key, "deleted": access.at_json, "history": True})
info.nodes_deleted += 1
change.node_deletes.append({"_key": key, "deleted": access.at_json, "history": True})
elif elem["hash"] != hash_string:
# node is in db and in the graph, content is different
adjusted: Json = self.adjust_node(model, elem, node["created"], access.at_json)
Expand All @@ -1072,15 +1065,14 @@ def update_or_delete_node(node: Json) -> None:
value = adjusted.get(prop, None)
if value:
js[prop] = value
resource_updates.append(js)
info.nodes_updated += 1
change.node_updates.append(js)

for doc in node_cursor:
update_or_delete_node(doc)

for not_visited in access.not_visited_nodes():
insert_node(not_visited)
return info, resource_inserts, resource_updates, resource_deletes
return change

def _edge_to_json(
self, from_node: str, to_node: str, data: Optional[Json], refs: Optional[Dict[str, str]] = None, **kwargs: Any
Expand All @@ -1096,14 +1088,9 @@ def _edge_to_json(
js["_to"] = f"{self.vertex_name}/{to_node}"
return js

def prepare_edges(
self, access: GraphAccess, edge_cursor: Iterable[Json], edge_type: EdgeType
) -> Tuple[GraphUpdate, List[Json], List[Json], List[Json]]:
def prepare_edges(self, access: GraphAccess, edge_cursor: Iterable[Json], edge_type: EdgeType) -> GraphChange:
log.info(f"Prepare edges of type {edge_type} for subgraph {access.root()}")
info = GraphUpdate()
edge_inserts: List[Json] = []
edge_updates: List[Json] = []
edge_deletes: List[Json] = []
change = GraphChange()

def edge_json(from_node: str, to_node: str, edge_data: Optional[Json]) -> Json:
# Take the refs with the lower number of entries (or none):
Expand All @@ -1117,29 +1104,26 @@ def edge_json(from_node: str, to_node: str, edge_data: Optional[Json]) -> Json:
return self._edge_to_json(from_node, to_node, edge_data, refs)

def insert_edge(from_node: str, to_node: str, edge_data: Optional[Json]) -> None:
edge_inserts.append(edge_json(from_node, to_node, edge_data))
info.edges_created += 1
change.edge_inserts[edge_type].append(edge_json(from_node, to_node, edge_data))

def update_edge(edge: Json) -> None:
from_node = edge["_from"].split("/")[1] # vertex/id
to_node = edge["_to"].split("/")[1] # vertex/id
has_edge, edge_data = access.has_edge(from_node, to_node, edge_type)
edge_hash = edge_data.get("hash") if edge_data else None
if not has_edge:
edge_deletes.append(edge)
info.edges_deleted += 1
change.edge_deletes[edge_type].append(edge)
elif edge_hash != edge.get("hash"):
js = edge_json(from_node, to_node, edge_data)
edge_updates.append(js)
info.edges_updated += 1
change.edge_updates[edge_type].append(js)

for doc in edge_cursor:
update_edge(doc)

for edge_from, edge_to, data in access.not_visited_edges(edge_type):
insert_edge(edge_from, edge_to, data)

return info, edge_inserts, edge_updates, edge_deletes
return change

async def merge_graph(
self,
Expand All @@ -1154,105 +1138,67 @@ async def merge_graph(

async def prepare_graph(
sub: GraphAccess, node_query: Tuple[str, Json], edge_query: Callable[[EdgeType], Tuple[str, Json]]
) -> Tuple[
GraphUpdate,
List[Json], # node insert
List[Json], # node update
List[Json], # node delete
Dict[EdgeType, List[Json]], # edge insert
Dict[EdgeType, List[Json]], # edge update
Dict[EdgeType, List[Json]], # edge delete
]:
graph_info = GraphUpdate()
) -> GraphChange:
graph_change = GraphChange()
# check all nodes for this subgraph
query, bind = node_query
log.debug(f"Query for nodes: {sub.root()}")
with await self.db.aql(query, bind_vars=bind, batch_size=50000) as node_cursor:
node_info, ni, nu, nd = self.prepare_nodes(sub, node_cursor, model)
graph_info += node_info
graph_change += self.prepare_nodes(sub, node_cursor, model)

# check all edges in all relevant edge-collections
edge_inserts: DefaultDict[EdgeType, List[Json]] = defaultdict(list)
edge_updates: DefaultDict[EdgeType, List[Json]] = defaultdict(list)
edge_deletes: DefaultDict[EdgeType, List[Json]] = defaultdict(list)
for edge_type in EdgeTypes.all:
query, bind = edge_query(edge_type)
log.debug(f"Query for edges of type {edge_type}: {sub.root()}")
with await self.db.aql(query, bind_vars=bind, batch_size=50000) as ec:
edge_info, gei, geu, ged = self.prepare_edges(sub, ec, edge_type)
graph_info += edge_info
edge_inserts[edge_type] = gei
edge_updates[edge_type] = geu
edge_deletes[edge_type] = ged
return graph_info, ni, nu, nd, edge_inserts, edge_updates, edge_deletes
graph_change += self.prepare_edges(sub, ec, edge_type)
return graph_change

roots, parent, graphs = GraphAccess.merge_graphs(graph_to_merge)
log.info(f"merge_graph {len(roots)} merge nodes found. change_id={change_id}, is_batch={is_batch}.")

def merge_edges(merge_node: str, merge_node_kind: str, edge_type: EdgeType) -> Tuple[str, Json]:
return self.query_update_edges(edge_type, merge_node_kind), {"update_id": merge_node}

K = TypeVar("K") # noqa: N806
V = TypeVar("V") # noqa: N806

def combine_dict(left: Dict[K, List[V]], right: Dict[K, List[V]]) -> Dict[K, List[V]]:
result = dict(left)
for key, right_values in right.items():
left_values = left.get(key)
result[key] = left_values + right_values if left_values else right_values
return result
def parent_edges(edge_type: EdgeType) -> Tuple[str, Json]:
edge_ids = [self.db_edge_key(f, t) for f, t, k in parent.g.edges(keys=True) if k.edge_type == edge_type]
return self.edges_by_ids_and_until_replace_node(edge_type, preserve_parent_structure, parent, edge_ids)

# this will throw an exception, in case of a conflicting update (--> outside try block)
log.debug("Mark all parent nodes for this update to avoid conflicting changes")
await self.mark_update(roots, list(parent.nodes), change_id, is_batch)
try:

def parent_edges(edge_type: EdgeType) -> Tuple[str, Json]:
edge_ids = [self.db_edge_key(f, t) for f, t, k in parent.g.edges(keys=True) if k.edge_type == edge_type]
return self.edges_by_ids_and_until_replace_node(edge_type, preserve_parent_structure, parent, edge_ids)

# store parent nodes and edges with a mutex to avoid conflicts
parents_nodes = self.nodes_by_ids_and_until_replace_node(preserve_parent_structure, parent)
info, nis, nus, nds, eis, eus, eds = await prepare_graph(parent, parents_nodes, parent_edges)
async with self.lock_db.lock("merge_graph_parents"):
parent_change_id = change_id + "_parent"
parent_change = await prepare_graph(parent, parents_nodes, parent_edges)
if parent_change.change_count(): # only persist in case of changes
await self._persist_update(parent_change_id, False, parent_change, update_history)

change = GraphChange()
for num, (root, graph) in enumerate(graphs):
root_kind = GraphResolver.resolved_kind(graph_to_merge.nodes[root])
if root_kind:
# noinspection PyTypeChecker
log.info(f"Update subgraph: root={root} ({root_kind}, {num+1} of {len(roots)})")
node_query = self.query_update_nodes(root_kind), {"update_id": root}
edge_query = partial(merge_edges, root, root_kind)

i, ni, nu, nd, ei, eu, ed = await prepare_graph(graph, node_query, edge_query)
info += i
nis += ni
nus += nu
nds += nd
eis = combine_dict(eis, ei)
eus = combine_dict(eus, eu)
eds = combine_dict(eds, ed)
change += await prepare_graph(graph, node_query, edge_query)
else:
# Already checked in GraphAccess - only here as safeguard.
# Already checked in GraphAccess - only here as a safeguard.
raise AttributeError(f"Kind of update root {root} is not a pre-resolved and can not be used!")

log.debug(f"Update prepared: {info}. Going to persist the changes.")
graph_update = parent_change.to_update() + change.to_update()
log.debug(f"Update prepared: {graph_update}. Going to persist the changes.")
await self._refresh_marked_update(change_id)
await self._persist_update(change_id, is_batch, nis, nus, nds, eis, eus, eds, update_history)
return roots, info
await self._persist_update(change_id, is_batch, change, update_history)
return roots, graph_update
except Exception as ex:
await self.delete_marked_update(change_id)
raise ex

async def _persist_update(
self,
change_id: str,
is_batch: bool,
resource_inserts: List[Json],
resource_updates: List[Json],
resource_deletes: List[Json],
edge_inserts: Dict[EdgeType, List[Json]],
edge_updates: Dict[EdgeType, List[Json]],
edge_deletes: Dict[EdgeType, List[Json]],
update_history: bool,
) -> None:
async def _persist_update(self, change_id: str, is_batch: bool, change: GraphChange, update_history: bool) -> None:
async def execute_many_async(
async_fn: Callable[[str, List[Json]], Any], name: str, array: List[Json], **kwargs: Any
) -> None:
Expand All @@ -1275,20 +1221,20 @@ async def trafo_many(

async def store_to_tmp_collection(temp: StandardCollection) -> None:
tmp = temp.name
ri = trafo_many(self.db.insert_many, tmp, resource_inserts, {"action": "node_created"})
ru = trafo_many(self.db.insert_many, tmp, resource_updates, {"action": "node_updated"})
rd = trafo_many(self.db.insert_many, tmp, resource_deletes, {"action": "node_deleted"})
ri = trafo_many(self.db.insert_many, tmp, change.node_inserts, {"action": "node_created"})
ru = trafo_many(self.db.insert_many, tmp, change.node_updates, {"action": "node_updated"})
rd = trafo_many(self.db.insert_many, tmp, change.node_deletes, {"action": "node_deleted"})
edge_i = [
trafo_many(self.db.insert_many, tmp, inserts, {"action": "edge_insert", "edge_type": tpe})
for tpe, inserts in edge_inserts.items()
for tpe, inserts in change.edge_inserts.items()
]
edge_u = [
trafo_many(self.db.insert_many, tmp, updates, {"action": "edge_update", "edge_type": tpe})
for tpe, updates in edge_updates.items()
for tpe, updates in change.edge_updates.items()
]
edge_d = [
trafo_many(self.db.insert_many, tmp, deletes, {"action": "edge_delete", "edge_type": tpe})
for tpe, deletes in edge_deletes.items()
for tpe, deletes in change.edge_deletes.items()
]
await asyncio.gather(*([ri, ru, rd] + edge_i + edge_u + edge_d))

Expand Down Expand Up @@ -1562,7 +1508,7 @@ async def copy_graph(self, to_graph: GraphName, to_snapshot: bool = False) -> Gr
if await self.db.has_graph(to_graph):
raise ValueError(f"Graph {to_graph} already exists")

new_graph_db = ArangoGraphDB(db=self.db, name=to_graph, adjust_node=self.node_adjuster, config=self.config)
new_graph_db = ArangoGraphDB(self.db, to_graph, self.node_adjuster, self.config, self.lock_db)

# collection creation can't be a part of a transaction so we do that first
# we simply reuse the existing create_update_schema method but do not insert any genesis data
Expand Down
Loading
Loading