Skip to content

Commit

Permalink
[core][feat] Allow edge properties (#2182)
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias authored Sep 10, 2024
1 parent f11d871 commit f7d3dac
Show file tree
Hide file tree
Showing 22 changed files with 350 additions and 230 deletions.
34 changes: 18 additions & 16 deletions fixcore/fixcore/action_handlers/merge_deferred_edge_handler.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
from attr import frozen

from fixcore.db.model import QueryModel
from fixcore.message_bus import MessageBus, Action
import logging
import asyncio
import logging
from asyncio import Task, Future
from typing import Optional, Tuple, List
from collections import defaultdict
from contextlib import suppress
from datetime import timedelta
from fixcore.model.graph_access import ByNodeId, NodeSelector
from fixcore.service import Service
from fixcore.task.model import Subscriber
from typing import Optional, Tuple, List, Dict

from attr import frozen

from fixcore.db.db_access import DbAccess
from fixcore.db.model import QueryModel
from fixcore.ids import NodeId, SubscriberId
from fixcore.task.task_handler import TaskHandlerService
from fixcore.ids import TaskId
from fixcore.task.subscribers import SubscriptionHandler
from fixcore.db.db_access import DbAccess
from fixcore.message_bus import MessageBus, Action
from fixcore.model.graph_access import ByNodeId, NodeSelector, DeferredEdge
from fixcore.model.model_handler import ModelHandler
from fixcore.query.query_parser import parse_query

from fixcore.service import Service
from fixcore.task.model import Subscriber
from fixcore.task.subscribers import SubscriptionHandler
from fixcore.task.task_handler import TaskHandlerService
from fixcore.types import EdgeType

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -83,21 +85,21 @@ async def find_node_id(selector: NodeSelector) -> Optional[NodeId]:
log.warning(f"task_id: {task_id}: Error {e} when finding node {selector}")
return None

edges: List[Tuple[NodeId, NodeId, str]] = []
edges: Dict[EdgeType, List[Tuple[NodeId, NodeId, DeferredEdge]]] = defaultdict(list)
for pending_edge in pending_edges:
for edge in pending_edge.edges:
from_id = await find_node_id(edge.from_node)
to_id = await find_node_id(edge.to_node)
processed += 1
if from_id and to_id:
edges.append((from_id, to_id, edge.edge_type))
edges[edge.edge_type].append((from_id, to_id, edge))

# apply edges in graph
updated, deleted = await graph_db.update_deferred_edges(edges, first.created_at)
# delete processed edge definitions
for task_id in task_ids:
await deferred_outer_edge_db.delete_for_task(task_id)
log.info(f"DeferredEdges: {len(edges)} edges: {updated} updated, {deleted} deleted. ({task_ids})")
log.info(f"DeferredEdges: {processed} edges: {updated} updated, {deleted} deleted. ({task_ids})")
return DeferredMergeResult(processed, updated, deleted)
else:
log.info(f"MergeOuterEdgesHandler: no pending edges found. ({task_ids})")
Expand Down
3 changes: 2 additions & 1 deletion fixcore/fixcore/cli/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -2781,7 +2781,8 @@ def fmt_json(elem: Json) -> JsonElement:
first = False
return result
elif is_edge(elem):
return f'{elem.get("from")} -> {elem.get("to")}: {elem.get("edge_type")}'
ers = to_str("", er) if isinstance(er := elem.get("reported"), dict) else ""
return f'{elem.get("from")} -{elem.get("edge_type")}-> {elem.get("to")}: {ers}'
else:
return elem

Expand Down
7 changes: 4 additions & 3 deletions fixcore/fixcore/db/arango_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -882,12 +882,13 @@ def inout(in_crsr: str, start: int, until: int, edge_type: str, direction: str)
in_c = ctx.next_crs("io_in")
out = ctx.next_crs("io_out")
out_crsr = ctx.next_crs("io_crs")
link = ctx.next_crs("io_link")
e = ctx.next_crs("io_link")
unique = "uniqueEdges: 'path'" if with_edges else "uniqueVertices: 'global'"
link_str = f", {link}" if with_edges else ""
link_str = f", {e}" if with_edges else ""
dir_bound = "OUTBOUND" if direction == Direction.outbound else "INBOUND"
inout_result = (
f"MERGE({out_crsr}, {{_from:{link}._from, _to:{link}._to, _link_id:{link}._id}})"
# merge edge and vertex properties - will be split in the output transformer
f"MERGE({out_crsr}, {{_from:{e}._from, _to:{e}._to, _link_id:{e}._id, _link_reported:{e}.reported}})"
if with_edges
else out_crsr
)
Expand Down
4 changes: 3 additions & 1 deletion fixcore/fixcore/db/async_arangodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@

from fixcore.async_extensions import run_async
from fixcore.error import QueryTookToLongError
from fixcore.util import identity
from fixcore.ids import GraphName
from fixcore.util import identity

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -124,6 +124,8 @@ async def next_filtered(self) -> Optional[Json]:
# example: vertex_name_default/edge_id -> default
"edge_type": re.sub("/.*$", "", link_id[self.vt_len :]), # noqa: E203
}
if reported := element.get("_link_reported"):
edge["reported"] = reported
# make sure that both nodes of the edge have been visited already
if from_id not in self.visited_node or to_id not in self.visited_node:
self.deferred_edges.append(edge)
Expand Down
2 changes: 1 addition & 1 deletion fixcore/fixcore/db/db_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from fixcore.db.arangodb_extensions import ArangoHTTPClient
from fixcore.db.async_arangodb import AsyncArangoDB, AsyncCursor
from fixcore.db.configdb import config_entity_db, config_validation_entity_db
from fixcore.db.deferredouteredgedb import deferred_outer_edge_db
from fixcore.db.deferrededgesdb import deferred_outer_edge_db
from fixcore.db.entitydb import EventEntityDb
from fixcore.db.graphdb import ArangoGraphDB, GraphDB, EventGraphDB
from fixcore.db.jobdb import job_db
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
from typing import List, cast
import logging

from fixcore.model.typed_model import from_js
from fixcore.types import Json
from fixcore.ids import GraphName
from fixlib.json import from_json


@define
class DeferredOuterEdges:
class DeferredEdges:
id: str
change_id: str
task_id: TaskId
Expand All @@ -28,14 +28,14 @@ class DeferredOuterEdges:
log = logging.getLogger(__name__)


class DeferredOuterEdgeDb(ArangoEntityDb[str, DeferredOuterEdges]):
async def all_for_task(self, task_id: TaskId) -> List[DeferredOuterEdges]:
class DeferredEdgesDb(ArangoEntityDb[str, DeferredEdges]):
async def all_for_task(self, task_id: TaskId) -> List[DeferredEdges]:
result = []
async with await self.db.aql_cursor(
f"FOR e IN `{self.collection_name}` FILTER e.task_id == @task_id RETURN e", bind_vars={"task_id": task_id}
) as cursor:
async for doc in cursor:
edges = from_js(doc, DeferredOuterEdges)
edges = from_json(doc, DeferredEdges)
result.append(edges)
return result

Expand All @@ -53,8 +53,10 @@ async def create_update_schema(self) -> None:
collection = self.db.collection(self.collection_name)
if ttl_index_name not in {idx["name"] for idx in cast(List[Json], collection.indexes())}:
log.info(f"Add index {ttl_index_name} on {collection.name}")
collection.add_ttl_index(["created_at"], TWO_HOURS, "deferred_edges_expiration_index")
collection.add_index(
dict(type="ttl", fields=["created_at"], expireAfter=TWO_HOURS, name="deferred_edges_expiration_index")
)


def deferred_outer_edge_db(db: AsyncArangoDB, collection: str) -> DeferredOuterEdgeDb:
return DeferredOuterEdgeDb(db, collection, DeferredOuterEdges, lambda k: k.id)
def deferred_outer_edge_db(db: AsyncArangoDB, collection: str) -> DeferredEdgesDb:
return DeferredEdgesDb(db, collection, DeferredEdges, lambda k: k.id)
Loading

0 comments on commit f7d3dac

Please sign in to comment.