Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][fix] Traverse the graph by walking all possible paths when an edge filter is present #2197

Merged
merged 2 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 35 additions & 15 deletions fixcore/fixcore/db/arango_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -881,36 +881,56 @@ def inout(
in_crsr: str, start: int, until: int, edge_type: str, direction: str, edge_filter: Optional[Term]
) -> str:
nonlocal query_part
in_c = ctx.next_crs("io_in")
start_c = ctx.next_crs("graph_start")
in_c = ctx.next_crs("gc")
in_edge = f"{in_c}_edge"
in_path = f"{in_c}_path"
in_r = f"{in_c}_result"
out = ctx.next_crs("io_out")
out_crsr = ctx.next_crs("io_crs")
e = ctx.next_crs("io_link")
unique = "uniqueEdges: 'path'" if with_edges else "uniqueVertices: 'global'"
dir_bound = "OUTBOUND" if direction == Direction.outbound else "INBOUND"
inout_result = (
# merge edge and vertex properties - will be split in the output transformer
f"MERGE({out_crsr}, {{_from:{e}._from, _to:{e}._to, _link_id:{e}._id, _link_reported:{e}.reported}})"
if with_edges
else out_crsr
)

# the path array contains the whole path from the start node.
# in the case of start > 0, we need to slice the array to get the correct part
def slice_or_all(in_p_part: str) -> str:
return f"SLICE({in_path}.{in_p_part}, {start})" if start > 0 else f"{in_path}.{in_p_part}"

# Edge filter: decision to include the source element is not possible while traversing it.
# When the target node is reached and edge properties are available, the decision can be made.
# In case the filter succeeds, we need to select all vertices and edges on the path.
# No filter but with_edges: another nested for loop required to return the node and edge
# No filter and no with_edges: only the node is returned
if edge_filter:
# walk the path and return all vertices (and possibly edges)
# this means intermediate nodes are returned multiple times and have to be made distinct
# since we return nodes first, the edges can always be resolved
walk_array = slice_or_all("vertices")
walk_array = f'APPEND({walk_array}, {slice_or_all("edges")})' if with_edges else walk_array
inout_result = f"FOR {in_r} in {walk_array} RETURN DISTINCT({in_r})"
elif with_edges:
# return the node and edge via a nested for loop
inout_result = f"FOR {in_r} in [{in_c}, {in_edge}] FILTER {in_r}!=null RETURN DISTINCT({in_r})"
else:
# return only the node
inout_result = f"RETURN DISTINCT {in_c}"

if outer_merge and part_idx == 0:
graph_cursor = in_crsr
outer_for = ""
else:
graph_cursor = in_c
outer_for = f"FOR {in_c} in {in_crsr} "
graph_cursor = start_c
outer_for = f"FOR {start_c} in {in_crsr} "

# optional: add the edge filter to the query
pre, fltr, post = term(e, edge_filter) if edge_filter else (None, None, None)
pre, fltr, post = term(in_edge, edge_filter) if edge_filter else (None, None, None)
pre_string = " " + pre if pre else ""
post_string = f" AND ({post})" if post else ""
filter_string = "" if not fltr and not post_string else f"{pre_string} FILTER {fltr}{post_string}"
query_part += (
f"LET {out} =({outer_for}"
# suggested by jsteemann: use crs._id instead of crs (stored in the view and more efficient)
f"FOR {out_crsr}, {e} IN {start}..{until} {dir_bound} {graph_cursor}._id "
f"`{db.edge_collection(edge_type)}` OPTIONS {{ bfs: true, {unique} }}{filter_string} "
f"RETURN DISTINCT {inout_result})"
f"FOR {in_c}, {in_edge}, {in_path} IN {start}..{until} {dir_bound} {graph_cursor}._id "
f"`{db.edge_collection(edge_type)}` OPTIONS {{ bfs: true, {unique} }}{filter_string} {inout_result})"
)
return out

Expand Down
42 changes: 14 additions & 28 deletions fixcore/fixcore/db/async_arangodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ def __init__(
self.cursor_exhausted = False
self.trafo: Callable[[Json], Optional[Any]] = trafo if trafo else identity # type: ignore
self.vt_len: Optional[int] = None
self.on_hold: Optional[Json] = None
self.get_next: Callable[[], Awaitable[Optional[Json]]] = (
self.next_filtered if flatten_nodes_and_edges else self.next_element
)
Expand All @@ -62,11 +61,7 @@ async def __anext__(self) -> Any:
# if there is an on-hold element: unset and return it
# background: a graph node contains vertex and edge information.
# since this method can only return one element at a time, the edge is put on-hold for vertex+edge data.
if self.on_hold:
res = self.on_hold
self.on_hold = None
return res
elif self.cursor_exhausted:
if self.cursor_exhausted:
return await self.next_deferred_edge()
else:
try:
Expand Down Expand Up @@ -99,20 +94,10 @@ async def next_element(self) -> Optional[Json]:

async def next_filtered(self) -> Optional[Json]:
element = await self.next_from_db()
vertex: Optional[Json] = None
edge = None
try:
_key = element["_key"]
if _key not in self.visited_node:
self.visited_node.add(_key)
vertex = self.trafo(element)

from_id = element.get("_from")
to_id = element.get("_to")
link_id = element.get("_link_id")
if from_id is not None and to_id is not None and link_id is not None:
if link_id not in self.visited_edge:
self.visited_edge.add(link_id)
if (from_id := element.get("_from")) and (to_id := element.get("_to")) and (node_id := element.get("_id")):
if node_id not in self.visited_edge:
self.visited_edge.add(node_id)
if not self.vt_len:
self.vt_len = len(re.sub("/.*$", "", from_id)) + 1
edge = {
Expand All @@ -122,21 +107,22 @@ async def next_filtered(self) -> Optional[Json]:
# example: vertex_name/node_id -> node_id
"to": to_id[self.vt_len :], # noqa: E203
# example: vertex_name_default/edge_id -> default
"edge_type": re.sub("/.*$", "", link_id[self.vt_len :]), # noqa: E203
"edge_type": re.sub("/.*$", "", node_id[self.vt_len :]), # noqa: E203
}
if reported := element.get("_link_reported"):
if reported := element.get("reported"):
edge["reported"] = reported
# make sure that both nodes of the edge have been visited already
if from_id not in self.visited_node or to_id not in self.visited_node:
self.deferred_edges.append(edge)
edge = None
# if the vertex is not returned: return the edge
# otherwise return the vertex and remember the edge
if vertex:
self.on_hold = edge
return vertex
return None
else:
return edge
elif key := element.get("_key"):
if key not in self.visited_node:
self.visited_node.add(key)
return self.trafo(element)
else:
return edge
return element
except Exception as ex:
log.warning(f"Could not read element {element}: {ex}. Ignore.")
return None
Expand Down
2 changes: 1 addition & 1 deletion fixcore/tests/fixcore/cli/command_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ async def test_search_source(cli: CLIService) -> None:
assert len(result3[0]) == 3

result4 = await cli.execute_cli_command("search --explain --with-edges is(graph_root) -[0:1]->", list_sink)
assert result4[0][0]["rating"] in ["simple", "complex"]
assert result4[0][0]["rating"] in ["simple", "bad", "complex"]

# use absolute path syntax
result5 = await cli.execute_cli_command(
Expand Down
Loading