From 887fafcf8ca2c3a09df7c5092022406dbb0b4ec4 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Tue, 26 Nov 2024 16:46:38 +0000 Subject: [PATCH] Fix/core save api (#172) * Acknowledge messaages from Pulsar, doh! * Change API to deliver a boolean e if value is an entity * Change loaders to use new API * Changes, entity-aware API is complete --- trustgraph-base/trustgraph/api/api.py | 35 +++++---- .../trustgraph/knowledge/__init__.py | 1 + trustgraph-base/trustgraph/knowledge/defs.py | 8 +++ .../trustgraph/knowledge/document.py | 12 +++- .../trustgraph/knowledge/organization.py | 12 +++- .../trustgraph/knowledge/publication.py | 12 +++- trustgraph-cli/scripts/tg-load-pdf | 11 ++- trustgraph-cli/scripts/tg-load-text | 11 ++- .../trustgraph/api/gateway/service.py | 72 +++++++------------ 9 files changed, 104 insertions(+), 70 deletions(-) diff --git a/trustgraph-base/trustgraph/api/api.py b/trustgraph-base/trustgraph/api/api.py index 818e42c3..7942e081 100644 --- a/trustgraph-base/trustgraph/api/api.py +++ b/trustgraph-base/trustgraph/api/api.py @@ -4,7 +4,7 @@ import dataclasses import base64 -from trustgraph.knowledge import hash +from trustgraph.knowledge import hash, Uri, Literal class ProtocolException(Exception): pass @@ -12,14 +12,6 @@ class ProtocolException(Exception): class ApplicationException(Exception): pass -class Uri(str): - def is_uri(self): return True - def is_literal(self): return False - -class Literal(str): - def is_uri(self): return False - def is_literal(self): return True - @dataclasses.dataclass class Triple: s : str @@ -213,9 +205,16 @@ def triples_query(self, s=None, p=None, o=None, limit=10000): "limit": limit } - if s: input["s"] = s - if p: input["p"] = p - if o: input["o"] = o + if not isinstance(s, Uri): + raise RuntimeError("s must be Uri") + if not isinstance(p, Uri): + raise RuntimeError("p must be Uri") + if not isinstance(o, Uri) and not isinstance(o, Literal): + raise RuntimeError("o must be Uri or Literal") + + if s: input["s"] = { "v": str(s), "e": isinstance(s, Uri), } + if p: input["p"] = { "v": str(p), "e": isinstance(p, Uri), } + if o: input["o"] = { "v": str(o), "e": isinstance(o, Uri), } url = f"{self.url}triples-query" @@ -273,9 +272,9 @@ def emit(t): if metadata: metadata.emit( lambda t: triples.append({ - "s": t.s.value, - "p": t.p.value, - "o": t.o.value + "s": { "v": t["s"], "e": isinstance(t["s"], Uri) }, + "p": { "v": t["p"], "e": isinstance(t["p"], Uri) }, + "o": { "v": t["o"], "e": isinstance(t["o"], Uri) } }) ) @@ -312,9 +311,9 @@ def load_text(self, text, id=None, metadata=None, charset="utf-8"): if metadata: metadata.emit( lambda t: triples.append({ - "s": t.s.value, - "p": t.p.value, - "o": t.o.value + "s": { "v": t["s"], "e": isinstance(t["s"], Uri) }, + "p": { "v": t["p"], "e": isinstance(t["p"], Uri) }, + "o": { "v": t["o"], "e": isinstance(t["o"], Uri) } }) ) diff --git a/trustgraph-base/trustgraph/knowledge/__init__.py b/trustgraph-base/trustgraph/knowledge/__init__.py index 0ab6b5db..8349abf0 100644 --- a/trustgraph-base/trustgraph/knowledge/__init__.py +++ b/trustgraph-base/trustgraph/knowledge/__init__.py @@ -1,4 +1,5 @@ +from . defs import * from . identifier import * from . publication import * from . document import * diff --git a/trustgraph-base/trustgraph/knowledge/defs.py b/trustgraph-base/trustgraph/knowledge/defs.py index b95863c6..d6290930 100644 --- a/trustgraph-base/trustgraph/knowledge/defs.py +++ b/trustgraph-base/trustgraph/knowledge/defs.py @@ -23,3 +23,11 @@ IDENTIFIER = 'https://schema.org/identifier' KEYWORD = 'https://schema.org/keywords' +class Uri(str): + def is_uri(self): return True + def is_literal(self): return False + +class Literal(str): + def is_uri(self): return False + def is_literal(self): return True + diff --git a/trustgraph-base/trustgraph/knowledge/document.py b/trustgraph-base/trustgraph/knowledge/document.py index dc2f43e3..99d06c72 100644 --- a/trustgraph-base/trustgraph/knowledge/document.py +++ b/trustgraph-base/trustgraph/knowledge/document.py @@ -1,6 +1,16 @@ from . defs import * -from .. schema import Triple, Value + +def Value(value, is_uri): + if is_uri: + return Uri(value) + else: + return Literal(value) + +def Triple(s, p, o): + return { + "s": s, "p": p, "o": o, + } class DigitalDocument: diff --git a/trustgraph-base/trustgraph/knowledge/organization.py b/trustgraph-base/trustgraph/knowledge/organization.py index 1129dd6c..5653aa97 100644 --- a/trustgraph-base/trustgraph/knowledge/organization.py +++ b/trustgraph-base/trustgraph/knowledge/organization.py @@ -1,6 +1,16 @@ from . defs import * -from .. schema import Triple, Value + +def Value(value, is_uri): + if is_uri: + return Uri(value) + else: + return Literal(value) + +def Triple(s, p, o): + return { + "s": s, "p": p, "o": o, + } class Organization: def __init__(self, id, name=None, description=None): diff --git a/trustgraph-base/trustgraph/knowledge/publication.py b/trustgraph-base/trustgraph/knowledge/publication.py index 3c9d41c8..d197df93 100644 --- a/trustgraph-base/trustgraph/knowledge/publication.py +++ b/trustgraph-base/trustgraph/knowledge/publication.py @@ -1,6 +1,16 @@ from . defs import * -from .. schema import Triple, Value + +def Value(value, is_uri): + if is_uri: + return Uri(value) + else: + return Literal(value) + +def Triple(s, p, o): + return { + "s": s, "p": p, "o": o, + } class PublicationEvent: def __init__( diff --git a/trustgraph-cli/scripts/tg-load-pdf b/trustgraph-cli/scripts/tg-load-pdf index 18ac57cb..0dc8ced6 100755 --- a/trustgraph-cli/scripts/tg-load-pdf +++ b/trustgraph-cli/scripts/tg-load-pdf @@ -14,7 +14,7 @@ import time import uuid from trustgraph.schema import Document, document_ingest_queue -from trustgraph.schema import Metadata +from trustgraph.schema import Metadata, Triple, Value from trustgraph.log_level import LogLevel from trustgraph.knowledge import hash, to_uri from trustgraph.knowledge import PREF_PUBEV, PREF_DOC, PREF_ORG @@ -79,7 +79,14 @@ class Loader: r = Document( metadata=Metadata( id=id, - metadata=triples, + metadata=[ + Triple( + s=Value(value=t["s"]["v"], is_uri=t["s"]["e"]), + p=Value(value=t["p"]["v"], is_uri=t["p"]["e"]), + o=Value(value=t["o"]["v"], is_uri=t["o"]["e"]) + ) + for t in triples + ], user=self.user, collection=self.collection, ), diff --git a/trustgraph-cli/scripts/tg-load-text b/trustgraph-cli/scripts/tg-load-text index e49ee7a9..6ff8d09a 100755 --- a/trustgraph-cli/scripts/tg-load-text +++ b/trustgraph-cli/scripts/tg-load-text @@ -13,7 +13,7 @@ import time import uuid from trustgraph.schema import TextDocument, text_ingest_queue -from trustgraph.schema import Metadata +from trustgraph.schema import Metadata, Triple, Value from trustgraph.log_level import LogLevel from trustgraph.knowledge import hash, to_uri from trustgraph.knowledge import PREF_PUBEV, PREF_DOC, PREF_ORG @@ -78,7 +78,14 @@ class Loader: r = TextDocument( metadata=Metadata( id=id, - metadata=triples, + metadata=[ + Triple( + s=Value(value=t["s"]["v"], is_uri=t["s"]["e"]), + p=Value(value=t["p"]["v"], is_uri=t["p"]["e"]), + o=Value(value=t["o"]["v"], is_uri=t["o"]["e"]) + ) + for t in triples + ], user=self.user, collection=self.collection, ), diff --git a/trustgraph-flow/trustgraph/api/gateway/service.py b/trustgraph-flow/trustgraph/api/gateway/service.py index 6d5f70ce..0ae01d3a 100755 --- a/trustgraph-flow/trustgraph/api/gateway/service.py +++ b/trustgraph-flow/trustgraph/api/gateway/service.py @@ -73,10 +73,7 @@ default_port = 8088 def to_value(x): - if x.startswith("http:") or x.startswith("https:"): - return Value(value=x, is_uri=True) - else: - return Value(value=x, is_uri=False) + return Value(value=x["v"], is_uri=x["e"]) def to_subgraph(x): return [ @@ -156,6 +153,9 @@ async def run(self): while True: msg = await consumer.receive() + # Acknowledge successful reception of the message + await consumer.acknowledge(msg) + try: id = msg.properties()["id"] except: @@ -192,43 +192,41 @@ async def unsubscribe_all(self, id): if id in self.full: del self.full[id] +def serialize_value(v): + return { + "v": v.value, + "e": v.is_uri, + } + +def serialize_triple(t): + return { + "s": serialize_value(t.s), + "p": serialize_value(t.p), + "o": serialize_value(t.o) + } + +def serialize_subgraph(sg): + return [ + serialize_triple(t) + for t in sg + ] + def serialize_triples(message): return { "metadata": { "id": message.metadata.id, - "metadata": [ - { - "s": t.s.value, - "p": t.p.value, - "o": t.o.value, - } - for t in message.metadata.metadata - ], + "metadata": serialize_subgraph(message.metadata.metadata), "user": message.metadata.user, "collection": message.metadata.collection, }, - "triples": [ - { - "s": t.s.value, - "p": t.p.value, - "o": t.o.value, - } - for t in message.triples - ] + "triples": serialize_subgraph(message.triples), } def serialize_graph_embeddings(message): return { "metadata": { "id": message.metadata.id, - "metadata": [ - { - "s": t.s.value, - "p": t.p.value, - "o": t.o.value, - } - for t in message.metadata.metadata - ], + "metadata": serialize_subgraph(message.metadata.metadata), "user": message.metadata.user, "collection": message.metadata.collection, }, @@ -560,23 +558,7 @@ async def triples_query(self, request): return web.json_response( { - "response": [ - { - "s": { - "v": t.s.value, - "e": t.s.is_uri, - }, - "p": { - "v": t.p.value, - "e": t.p.is_uri, - }, - "o": { - "v": t.o.value, - "e": t.o.is_uri, - } - } - for t in resp.triples - ] + "response": serialize_subgraph(resp.triples), } )