Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][fix] Merge edge and vertex and unfold in code #2217

Merged
merged 3 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions fixcore/fixcore/db/arango_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@

allowed_first_merge_part = Part(AllTerm())
unset_props = json.dumps(["flat"])
edge_unset_props = json.dumps(["_rev", "hash", "refs"])
# This list of delimiters is also used in the arango delimiter index.
# If the definition is changed, the index needs to change as well!
fulltext_delimiter = [" ", "_", "-", "@", ":", "/", "."]
Expand Down Expand Up @@ -890,26 +891,27 @@ def inout(
unique = "uniqueEdges: 'path'" if with_edges else "uniqueVertices: 'global'"
dir_bound = "OUTBOUND" if direction == Direction.outbound else "INBOUND"

# the path array contains the whole path from the start node.
# in the case of start > 0, we need to slice the array to get the correct part
def slice_or_all(in_p_part: str) -> str:
return f"SLICE({in_path}.{in_p_part}, {start})" if start > 0 else f"{in_path}.{in_p_part}"

# Edge filter: decision to include the source element is not possible while traversing it.
# Edge filter: the decision to include the source element is not possible while traversing it.
# When the target node is reached and edge properties are available, the decision can be made.
# In case the filter succeeds, we need to select all vertices and edges on the path.
# No filter but with_edges: another nested for loop required to return the node and edge
# No filter and no with_edges: only the node is returned
# No filter but with_edges: merge the edge into the vertex
# No filter and not with_edges: only the node is returned
if edge_filter:
# walk the path and return all vertices (and possibly edges)
# walk the path and return all/sliced vertices.
# this means intermediate nodes are returned multiple times and have to be made distinct
# since we return nodes first, the edges can always be resolved
walk_array = slice_or_all("vertices")
walk_array = f'APPEND({walk_array}, {slice_or_all("edges")})' if with_edges else walk_array
inout_result = f"FOR {in_r} in {walk_array} RETURN DISTINCT({in_r})"
if with_edges:
pv = f"{in_path}.vertices[{in_r}]"
pe = f"{in_path}.edges[{in_r}]"
pv_with_pe = f"MERGE({pv}, {{_edge:UNSET({pe}, {edge_unset_props})}})"
inout_result = (
f"FOR {in_r} in {start}..LENGTH({in_path}.vertices)-1 "
f"RETURN DISTINCT({pe}!=null ? {pv_with_pe} : {pv})"
)
else:
slice_or_all = f"SLICE({in_path}.vertices, {start})" if start > 0 else f"{in_path}.vertices"
inout_result = f"FOR {in_r} in {slice_or_all} RETURN DISTINCT({in_r})"
elif with_edges:
# return the node and edge via a nested for loop
inout_result = f"FOR {in_r} in [{in_c}, {in_edge}] FILTER {in_r}!=null RETURN DISTINCT({in_r})"
inout_result = f"RETURN DISTINCT(MERGE({in_c}, {{_edge:UNSET({in_edge}, {edge_unset_props})}}))"
else:
# return only the node
inout_result = f"RETURN DISTINCT {in_c}"
Expand Down
66 changes: 40 additions & 26 deletions fixcore/fixcore/db/async_arangodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def __init__(
self.cursor_exhausted = False
self.trafo: Callable[[Json], Optional[Any]] = trafo if trafo else identity # type: ignore
self.vt_len: Optional[int] = None
self.on_hold: Optional[Json] = None
self.get_next: Callable[[], Awaitable[Optional[Json]]] = (
self.next_filtered if flatten_nodes_and_edges else self.next_element
)
Expand All @@ -61,7 +62,11 @@ async def __anext__(self) -> Any:
# if there is an on-hold element: unset and return it
# background: a graph node contains vertex and edge information.
# since this method can only return one element at a time, the edge is put on-hold for vertex+edge data.
if self.cursor_exhausted:
if self.on_hold:
res = self.on_hold
self.on_hold = None
return res
elif self.cursor_exhausted:
return await self.next_deferred_edge()
else:
try:
Expand Down Expand Up @@ -94,35 +99,44 @@ async def next_element(self) -> Optional[Json]:

async def next_filtered(self) -> Optional[Json]:
element = await self.next_from_db()
vertex: Optional[Json] = None
edge: Optional[Json] = None
try:
if (from_id := element.get("_from")) and (to_id := element.get("_to")) and (node_id := element.get("_id")):
if node_id not in self.visited_edge:
self.visited_edge.add(node_id)
if not self.vt_len:
self.vt_len = len(re.sub("/.*$", "", from_id)) + 1
edge = {
"type": "edge",
# example: vertex_name/node_id -> node_id
"from": from_id[self.vt_len :], # noqa: E203
# example: vertex_name/node_id -> node_id
"to": to_id[self.vt_len :], # noqa: E203
# example: vertex_name_default/edge_id -> default
"edge_type": re.sub("/.*$", "", node_id[self.vt_len :]), # noqa: E203
}
if reported := element.get("reported"):
edge["reported"] = reported
# make sure that both nodes of the edge have been visited already
if from_id not in self.visited_node or to_id not in self.visited_node:
self.deferred_edges.append(edge)
return None
else:
return edge
elif key := element.get("_key"):
if ep := element.get("_edge"):
if (from_id := ep.get("_from")) and (to_id := ep.get("_to")) and (node_id := ep.get("_id")):
if node_id not in self.visited_edge:
self.visited_edge.add(node_id)
if not self.vt_len:
self.vt_len = len(re.sub("/.*$", "", from_id)) + 1
edge = {
"type": "edge",
# example: vertex_name/node_id -> node_id
"from": from_id[self.vt_len :], # noqa: E203
# example: vertex_name/node_id -> node_id
"to": to_id[self.vt_len :], # noqa: E203
# example: vertex_name_default/edge_id -> default
"edge_type": re.sub("/.*$", "", node_id[self.vt_len :]), # noqa: E203
}
if reported := ep.get("reported"):
edge["reported"] = reported
# make sure that both nodes of the edge have been visited already
if from_id not in self.visited_node or to_id not in self.visited_node:
self.deferred_edges.append(edge)
edge = None
if key := element.get("_key"):
if key not in self.visited_node:
self.visited_node.add(key)
return self.trafo(element)
vertex = self.trafo(element)
else:
vertex = element
# if the vertex is not returned: return the edge
# otherwise return the vertex and remember the edge
if vertex:
self.on_hold = edge
return vertex
else:
return element
return edge

except Exception as ex:
log.warning(f"Could not read element {element}: {ex}. Ignore.")
return None
Expand Down
2 changes: 2 additions & 0 deletions fixcore/tests/fixcore/db/graphdb_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,8 @@ async def assert_result(query: str, nodes: int, edges: int) -> None:
await assert_result("is(foo) and reported.id==9 <-delete[0:]-", 11, 10)
await assert_result("is(foo) and reported.id==9 <-default[0:]-", 4, 3)
await assert_result("is(foo) and reported.id==9 -delete[0:]->", 1, 0)
await assert_result("is(foo) and reported.id==9 -[0:]-> is(foo, bla)", 11, 10)
await assert_result("is(foo) and reported.id==9 -[0:]{not_existent==null}-> is(foo, bla)", 11, 10)


@mark.asyncio
Expand Down
Loading