Skip to content

Commit

Permalink
[core][fix] Traverse the graph by walking all possible paths when an …
Browse files Browse the repository at this point in the history
…edge filter is present (#2197)
  • Loading branch information
aquamatthias authored Sep 23, 2024
1 parent 1735b14 commit 61e97ea
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 44 deletions.
50 changes: 35 additions & 15 deletions fixcore/fixcore/db/arango_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -881,36 +881,56 @@ def inout(
in_crsr: str, start: int, until: int, edge_type: str, direction: str, edge_filter: Optional[Term]
) -> str:
nonlocal query_part
in_c = ctx.next_crs("io_in")
start_c = ctx.next_crs("graph_start")
in_c = ctx.next_crs("gc")
in_edge = f"{in_c}_edge"
in_path = f"{in_c}_path"
in_r = f"{in_c}_result"
out = ctx.next_crs("io_out")
out_crsr = ctx.next_crs("io_crs")
e = ctx.next_crs("io_link")
unique = "uniqueEdges: 'path'" if with_edges else "uniqueVertices: 'global'"
dir_bound = "OUTBOUND" if direction == Direction.outbound else "INBOUND"
inout_result = (
# merge edge and vertex properties - will be split in the output transformer
f"MERGE({out_crsr}, {{_from:{e}._from, _to:{e}._to, _link_id:{e}._id, _link_reported:{e}.reported}})"
if with_edges
else out_crsr
)

# the path array contains the whole path from the start node.
# in the case of start > 0, we need to slice the array to get the correct part
def slice_or_all(in_p_part: str) -> str:
return f"SLICE({in_path}.{in_p_part}, {start})" if start > 0 else f"{in_path}.{in_p_part}"

# Edge filter: decision to include the source element is not possible while traversing it.
# When the target node is reached and edge properties are available, the decision can be made.
# In case the filter succeeds, we need to select all vertices and edges on the path.
# No filter but with_edges: another nested for loop required to return the node and edge
# No filter and no with_edges: only the node is returned
if edge_filter:
# walk the path and return all vertices (and possibly edges)
# this means intermediate nodes are returned multiple times and have to be made distinct
# since we return nodes first, the edges can always be resolved
walk_array = slice_or_all("vertices")
walk_array = f'APPEND({walk_array}, {slice_or_all("edges")})' if with_edges else walk_array
inout_result = f"FOR {in_r} in {walk_array} RETURN DISTINCT({in_r})"
elif with_edges:
# return the node and edge via a nested for loop
inout_result = f"FOR {in_r} in [{in_c}, {in_edge}] FILTER {in_r}!=null RETURN DISTINCT({in_r})"
else:
# return only the node
inout_result = f"RETURN DISTINCT {in_c}"

if outer_merge and part_idx == 0:
graph_cursor = in_crsr
outer_for = ""
else:
graph_cursor = in_c
outer_for = f"FOR {in_c} in {in_crsr} "
graph_cursor = start_c
outer_for = f"FOR {start_c} in {in_crsr} "

# optional: add the edge filter to the query
pre, fltr, post = term(e, edge_filter) if edge_filter else (None, None, None)
pre, fltr, post = term(in_edge, edge_filter) if edge_filter else (None, None, None)
pre_string = " " + pre if pre else ""
post_string = f" AND ({post})" if post else ""
filter_string = "" if not fltr and not post_string else f"{pre_string} FILTER {fltr}{post_string}"
query_part += (
f"LET {out} =({outer_for}"
# suggested by jsteemann: use crs._id instead of crs (stored in the view and more efficient)
f"FOR {out_crsr}, {e} IN {start}..{until} {dir_bound} {graph_cursor}._id "
f"`{db.edge_collection(edge_type)}` OPTIONS {{ bfs: true, {unique} }}{filter_string} "
f"RETURN DISTINCT {inout_result})"
f"FOR {in_c}, {in_edge}, {in_path} IN {start}..{until} {dir_bound} {graph_cursor}._id "
f"`{db.edge_collection(edge_type)}` OPTIONS {{ bfs: true, {unique} }}{filter_string} {inout_result})"
)
return out

Expand Down
42 changes: 14 additions & 28 deletions fixcore/fixcore/db/async_arangodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ def __init__(
self.cursor_exhausted = False
self.trafo: Callable[[Json], Optional[Any]] = trafo if trafo else identity # type: ignore
self.vt_len: Optional[int] = None
self.on_hold: Optional[Json] = None
self.get_next: Callable[[], Awaitable[Optional[Json]]] = (
self.next_filtered if flatten_nodes_and_edges else self.next_element
)
Expand All @@ -62,11 +61,7 @@ async def __anext__(self) -> Any:
# background: vertices and edges are read from the cursor as separate elements.
# an edge whose two vertices have not both been visited yet is put into deferred_edges.
# once the cursor is exhausted, the remaining elements are taken from next_deferred_edge.
if self.on_hold:
res = self.on_hold
self.on_hold = None
return res
elif self.cursor_exhausted:
if self.cursor_exhausted:
return await self.next_deferred_edge()
else:
try:
Expand Down Expand Up @@ -99,20 +94,10 @@ async def next_element(self) -> Optional[Json]:

async def next_filtered(self) -> Optional[Json]:
element = await self.next_from_db()
vertex: Optional[Json] = None
edge = None
try:
_key = element["_key"]
if _key not in self.visited_node:
self.visited_node.add(_key)
vertex = self.trafo(element)

from_id = element.get("_from")
to_id = element.get("_to")
link_id = element.get("_link_id")
if from_id is not None and to_id is not None and link_id is not None:
if link_id not in self.visited_edge:
self.visited_edge.add(link_id)
if (from_id := element.get("_from")) and (to_id := element.get("_to")) and (node_id := element.get("_id")):
if node_id not in self.visited_edge:
self.visited_edge.add(node_id)
if not self.vt_len:
self.vt_len = len(re.sub("/.*$", "", from_id)) + 1
edge = {
Expand All @@ -122,21 +107,22 @@ async def next_filtered(self) -> Optional[Json]:
# example: vertex_name/node_id -> node_id
"to": to_id[self.vt_len :], # noqa: E203
# example: vertex_name_default/edge_id -> default
"edge_type": re.sub("/.*$", "", link_id[self.vt_len :]), # noqa: E203
"edge_type": re.sub("/.*$", "", node_id[self.vt_len :]), # noqa: E203
}
if reported := element.get("_link_reported"):
if reported := element.get("reported"):
edge["reported"] = reported
# make sure that both nodes of the edge have been visited already
if from_id not in self.visited_node or to_id not in self.visited_node:
self.deferred_edges.append(edge)
edge = None
# if the vertex is not returned: return the edge
# otherwise return the vertex and remember the edge
if vertex:
self.on_hold = edge
return vertex
return None
else:
return edge
elif key := element.get("_key"):
if key not in self.visited_node:
self.visited_node.add(key)
return self.trafo(element)
else:
return edge
return element
except Exception as ex:
log.warning(f"Could not read element {element}: {ex}. Ignore.")
return None
Expand Down
2 changes: 1 addition & 1 deletion fixcore/tests/fixcore/cli/command_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ async def test_search_source(cli: CLIService) -> None:
assert len(result3[0]) == 3

result4 = await cli.execute_cli_command("search --explain --with-edges is(graph_root) -[0:1]->", list_sink)
assert result4[0][0]["rating"] in ["simple", "complex"]
assert result4[0][0]["rating"] in ["simple", "bad", "complex"]

# use absolute path syntax
result5 = await cli.execute_cli_command(
Expand Down

0 comments on commit 61e97ea

Please sign in to comment.