From 5883e925d0aa2ab542e3b98487051198ede5608e Mon Sep 17 00:00:00 2001 From: marcocapozzoli Date: Tue, 26 Mar 2024 11:26:15 -0300 Subject: [PATCH 01/14] create cache manager --- hyperon_das/cache.py | 36 ++++++++- hyperon_das/client.py | 23 +++++- hyperon_das/das.py | 49 +++++++++++- hyperon_das/decorators.py | 5 +- hyperon_das/query_engines.py | 146 ++++++++++++++--------------------- hyperon_das/utils.py | 76 +++++++++++++++++- 6 files changed, 237 insertions(+), 98 deletions(-) diff --git a/hyperon_das/cache.py b/hyperon_das/cache.py index 38bec799..12171690 100644 --- a/hyperon_das/cache.py +++ b/hyperon_das/cache.py @@ -2,12 +2,42 @@ from collections import deque from itertools import product from threading import Semaphore, Thread -from typing import Any, Dict, List, Optional, Union - +from typing import Any, Dict, List, Optional, TypeVar, Union from hyperon_das_atomdb import WILDCARD - +from hyperon_das_atomdb.adapters import InMemoryDB, RedisMongoDB +from hyperon_das.client import FunctionsClient from hyperon_das.utils import Assignment, QueryAnswer +AdapterDBType = TypeVar("AdapterDBType", RedisMongoDB, InMemoryDB) + + +class CacheManager: + def __init__(self, cache: AdapterDBType, **kwargs): + self.cache = cache + + def fetch( + self, + host: Optional[str] = None, + port: Optional[int] = None, + query: Optional[Union[List[dict], dict]] = None, + **kwargs + ) -> bool: + try: + if not (server := kwargs.pop('server', None)): + server = FunctionsClient(host, port) + + documents = server.fetch(query=query, **kwargs) + + self._populate_cache(documents) + + return True + except Exception as e: + return False + + def _populate_cache(self, documents: List[Dict[str, Any]]) -> None: + [self.cache.add_link(document) for document in documents] + self.cache.commit() + class QueryAnswerIterator(ABC): def __init__(self, source: Any): diff --git a/hyperon_das/client.py b/hyperon_das/client.py index 2ab6fc8c..eb8749d1 100644 --- a/hyperon_das/client.py +++ b/hyperon_das/client.py @@ -7,14 +7,18 @@ from hyperon_das.exceptions import ConnectionError, HTTPError, RequestError, TimeoutError from hyperon_das.logger import logger -from hyperon_das.utils import deserialize, serialize +from hyperon_das.utils import connect_to_server, deserialize, serialize class FunctionsClient: - def __init__(self, url: str, server_count: int = 0, name: Optional[str] = None): + def __init__(self, host: str, port: int, server_count: int = 0, name: Optional[str] = None): + if not host: + raise ValueError('Host is required') + + self.url = connect_to_server(host, port) + if not name: self.name = f'server-{server_count}' - self.url = url def _send_request(self, payload) -> Any: try: @@ -177,3 +181,16 @@ def custom_query(self, index_id: str, **kwargs) -> List[Dict[str, Any]]: 'input': {'index_id': index_id, 'kwargs': kwargs}, } return self._send_request(payload) + + def fetch( + self, + query: Union[List[dict], dict], + host: Optional[str] = None, + port: Optional[int] = None, + **kwargs + ) -> bool: + payload = { + 'action': 'fetch', + 'input': {'query': query, 'host': host, 'port': port, 'kwargs': kwargs}, + } + return self._send_request(payload) diff --git a/hyperon_das/das.py b/hyperon_das/das.py index eb41b81a..338dac48 100644 --- a/hyperon_das/das.py +++ b/hyperon_das/das.py @@ -4,7 +4,7 @@ from hyperon_das_atomdb.adapters import InMemoryDB, RedisMongoDB from hyperon_das_atomdb.exceptions import InvalidAtomDB -from hyperon_das.cache import QueryAnswerIterator +from hyperon_das.cache import CacheManager, 
QueryAnswerIterator from hyperon_das.exceptions import ( GetTraversalCursorException, InvalidDASParameters, @@ -32,6 +32,8 @@ def __init__(self, **kwargs: Optional[Dict[str, Any]]) -> None: else: raise InvalidAtomDB(message="Invalid AtomDB type. Choose either 'ram' or 'redis_mongo'") + kwargs.update({'cache_manager': CacheManager(self.backend)}) + if query_engine_parameter == 'local': self.query_engine = LocalQueryEngine(self.backend, kwargs) logger().info('Initialized local Das') @@ -575,3 +577,48 @@ def create_field_index( return self.query_engine.create_field_index( atom_type, field, type=type, composite_type=composite_type ) + + def fetch( + self, + query: Union[List[dict], dict], + host: Optional[str] = None, + port: Optional[int] = None, + **kwargs + ) -> bool: + is_remote_das = isinstance(self.query_engine, RemoteQueryEngine) + + # if not is_remote_das and not host and not port: + # raise ValueError("The 'host' and 'port' parameters must be sent to DAS local") + + return self.query_engine.fetch(host, port, query, **kwargs) + + +if __name__ == '__main__': + remote_das_host = "45.63.85.59" + remote_das_port = 8080 + + das1 = DistributedAtomSpace(query_engine='remote', host=remote_das_host, port=remote_das_port) + responses1 = das1.query( + query={ + "atom_type": "link", + "type": "Expression", + "targets": [ + {"atom_type": "node", "type": "Symbol", "name": "Similarity"}, + {"atom_type": "node", "type": "Symbol", "name": '"human"'}, + {"atom_type": "variable", "name": "v1"} + ] + } + ) + + das2 = DistributedAtomSpace(atomdb='ram', query_engine='local') + response2 = das2.fetch( + query={ + "atom_type": "link", + "type": "Expression", + "targets": [ + {"atom_type": "node", "type": "Symbol", "name": "Similarity"}, + {"atom_type": "node", "type": "Symbol", "name": '"human"'}, + {"atom_type": "variable", "name": "v1"} + ] + } + ) \ No newline at end of file diff --git a/hyperon_das/decorators.py b/hyperon_das/decorators.py index 1804668f..549d38e2 100644 --- a/hyperon_das/decorators.py +++ b/hyperon_das/decorators.py @@ -1,5 +1,6 @@ import time from functools import wraps +from http import HTTPStatus # noqa: F401 from typing import Callable from hyperon_das.exceptions import ConnectionError @@ -17,9 +18,9 @@ def wrapper(*args, **kwargs): while retry_count < attempts and timer_count < timeout_seconds: try: start_time = time.time() - response = function(*args, **kwargs) + status, response = function(*args, **kwargs) end_time = time.time() - if response is not None: + if status == HTTPStatus.OK: logger().debug( f'{retry_count + 1} successful connection attempt at [host={args[1]}]' ) diff --git a/hyperon_das/query_engines.py b/hyperon_das/query_engines.py index bd9efa19..f4db16f0 100644 --- a/hyperon_das/query_engines.py +++ b/hyperon_das/query_engines.py @@ -1,18 +1,7 @@ -import json # noqa: F401 from abc import ABC, abstractmethod -from http import HTTPStatus # noqa: F401 from typing import Any, Dict, Iterator, List, Optional, Set, Tuple, Union - from hyperon_das_atomdb import WILDCARD from hyperon_das_atomdb.exceptions import AtomDoesNotExist, LinkDoesNotExist, NodeDoesNotExist -from requests import sessions -from requests.exceptions import ( # noqa: F401 - ConnectionError, - HTTPError, - JSONDecodeError, - RequestException, - Timeout, -) from hyperon_das.cache import ( AndEvaluator, @@ -24,16 +13,16 @@ QueryAnswerIterator, RemoteGetLinks, RemoteIncomingLinks, + CacheManager ) from hyperon_das.client import FunctionsClient -from hyperon_das.decorators import retry from 
hyperon_das.exceptions import ( InvalidDASParameters, QueryParametersException, UnexpectedQueryFormat, ) from hyperon_das.logger import logger -from hyperon_das.utils import Assignment, QueryAnswer, get_package_version, serialize # noqa: F401 +from hyperon_das.utils import Assignment, QueryAnswer class QueryEngine(ABC): @@ -88,10 +77,21 @@ def create_field_index( composite_type: Optional[List[Any]] = None, ) -> str: ... # pragma no cover + + @abstractmethod + def fetch( + self, + query: Union[List[dict], dict], + host: Optional[str] = None, + port: Optional[int] = None, + **kwargs + ) -> bool: + ... # pragma no cover class LocalQueryEngine(QueryEngine): def __init__(self, backend, kwargs: Optional[dict] = None) -> None: + self.cache_manager = kwargs.get('cache_manager') self.local_backend = backend def _error(self, exception: Exception): @@ -295,85 +295,44 @@ def create_field_index( ) -> str: return self.local_backend.create_field_index(atom_type, field, type, composite_type) + def fetch( + self, + query: Union[List[dict], dict], + host: Optional[str] = None, + port: Optional[int] = None, + **kwargs + ) -> bool: + if query['atom_type'] == 'node': + handle = self.local_backend.get_node_handle(query["type"], query["name"]) + return self.local_backend.get_atom(handle) + elif query['atom_type'] == 'link': + matched_targets = [] + for target in query["targets"]: + if target["atom_type"] == "node" or target["atom_type"] == "link": + if matched := self.fetch(target, **kwargs): + matched_targets.append(matched) + elif target["atom_type"] == "variable": + matched_targets.append('*') + else: + self._error( + UnexpectedQueryFormat( + message="Query processing reached an unexpected state", + details=f'link: {str(query)} link target: {str(query)}', + ) + ) + else: + raise ValueError('Invalid atom type') + class RemoteQueryEngine(QueryEngine): def __init__(self, backend, kwargs): + self.cache_manager: CacheManager = kwargs.get('cache_manager') self.local_query_engine = LocalQueryEngine(backend, kwargs) - host = kwargs.get('host') - port = kwargs.get('port') - if not host: + self.host = kwargs.get('host') + self.port = kwargs.get('port') + if not self.host: raise InvalidDASParameters(message='Send `host` parameter to connect in a remote DAS') - url = self._connect_server(host, port) - self.remote_das = FunctionsClient(url) - - @retry(attempts=5, timeout_seconds=120) - def _connect_server(self, host: str, port: Optional[str] = None): - port = port or '8081' - openfaas_uri = f'http://{host}:{port}/function/query-engine' - aws_lambda_uri = f'http://{host}/prod/query-engine' - url = None - if self._is_server_connect(openfaas_uri): - url = openfaas_uri - elif self._is_server_connect(aws_lambda_uri): - url = aws_lambda_uri - return url - - # TODO: Use this method when version checking is running on the server - # def _is_server_connect(self, url: str) -> bool: - # logger().debug(f'connecting to remote Das {url}') - # das_version = get_package_version('hyperon_das') - - # try: - # with sessions.Session() as session: - # response = session.request( - # method='POST', - # url=url, - # data=json.dumps( - # { - # 'action': 'handshake', - # 'input': { - # 'das_version': das_version, - # 'atomdb_version': get_package_version('hyperon_das_atomdb'), - # }, - # } - # ), - # timeout=10, - # ) - # if response.status_code == HTTPStatus.CONFLICT: - # try: - # remote_das_version = response.json().get('das').get('version') - # except JSONDecodeError as e: - # raise Exception(str(e)) - # logger().error( - # f'Package version 
conflict error when connecting to remote DAS - Local DAS: `{das_version}` - Remote DAS: `{remote_das_version}`'
-    #             )
-    #             raise Exception(
-    #                 f'The version sent by the local DAS is {das_version}, but the expected version on the server is {remote_das_version}'
-    #             )
-    #         elif response.status_code == HTTPStatus.OK:
-    #             return True
-    #         else:
-    #             response.raise_for_status()
-    #         return False
-    #     except (ConnectionError, Timeout, HTTPError, RequestException):
-    #         return False
-
-    def _is_server_connect(self, url: str) -> bool:
-        logger().debug(f'connecting to remote Das {url}')
-        try:
-            with sessions.Session() as session:
-                response = session.request(
-                    method='POST',
-                    url=url,
-                    data=serialize({"action": "ping", "input": {}}),
-                    headers={'Content-Type': 'application/octet-stream'},
-                    timeout=10,
-                )
-        except Exception:
-            return False
-        if response.status_code == 200:
-            return True
-        return False
+        self.remote_das = FunctionsClient(self.host, self.port)
 
     def get_atom(self, handle: str, **kwargs) -> Dict[str, Any]:
         try:
@@ -500,3 +459,16 @@ def create_field_index(
         composite_type: Optional[List[Any]] = None,
     ) -> str:
         return self.remote_das.create_field_index(atom_type, field, type, composite_type)
+
+    def fetch(
+        self,
+        query: Union[List[dict], dict],
+        host: Optional[str] = None,
+        port: Optional[int] = None,
+        **kwargs
+    ) -> bool:
+        if not host and not port:
+            host = self.query_engine.host
+            port = self.query_engine.port
+
+        return self.cache_manager.fetch(query=query, server=self.remote_das)
\ No newline at end of file
diff --git a/hyperon_das/utils.py b/hyperon_das/utils.py
index 800d0f6a..4bdd3eba 100644
--- a/hyperon_das/utils.py
+++ b/hyperon_das/utils.py
@@ -1,9 +1,21 @@
 import pickle
 from dataclasses import dataclass
+from http import HTTPStatus  # noqa: F401
 from importlib import import_module
-from typing import Any, Dict, FrozenSet, List, Optional, Set, Union
-
+from typing import Any, Dict, FrozenSet, List, Optional, Set, Tuple, Union
+
+from requests import sessions
+from requests.exceptions import (  # noqa: F401
+    ConnectionError,
+    HTTPError,
+    JSONDecodeError,
+    RequestException,
+    Timeout,
+)
+
+from hyperon_das.decorators import retry
 from hyperon_das.exceptions import InvalidAssignment
+from hyperon_das.logger import logger
 
 
 class Assignment:
@@ -110,3 +122,63 @@ def serialize(payload: Any) -> bytes:
 
 def deserialize(payload: bytes) -> Any:
     return pickle.loads(payload)
+
+
+@retry(attempts=5, timeout_seconds=120)
+def connect_to_server(host: str, port: int) -> Tuple[int, str]:
+    """Connect to the server and return the connection status and the server URL."""
+    port = port or '8081'
+    openfaas_uri = f'http://{host}:{port}/function/query-engine'
+    aws_lambda_uri = f'http://{host}/prod/query-engine'
+
+    for uri in [openfaas_uri, aws_lambda_uri]:
+        status_code, message = check_server_connection(uri)
+        if status_code == HTTPStatus.OK:
+            url = uri
+            break
+
+    return status_code, url
+
+
+def check_server_connection(url: str) -> Tuple[int, str]:
+    logger().debug(f'connecting to remote Das {url}')
+
+    try:
+        das_version = get_package_version('hyperon_das')
+
+        with sessions.Session() as session:
+            payload = {
+                'action': 'handshake',
+                'input': {
+                    'das_version': das_version,
+                    'atomdb_version': get_package_version('hyperon_das_atomdb'),
+                },
+            }
+            response = session.request(
+                method='POST',
+                url=url,
+                data=serialize(payload),
+                headers={'Content-Type': 'application/octet-stream'},
+                timeout=10,
+            )
+            if response.status_code == HTTPStatus.CONFLICT:
+                try:
+                    response = deserialize(response.content)
+                    remote_das_version = response.get('das').get('version')
+                except JSONDecodeError as e:
+                    raise Exception(str(e))
+                logger().error(
+                    f'Package version conflict error when connecting to remote DAS - Local DAS: `{das_version}` - Remote DAS: `{remote_das_version}`'
+                )
+                raise Exception(
+                    f'The version sent by the local DAS is {das_version}, but the expected version on the server is {remote_das_version}'
+                )
+            if response.status_code == HTTPStatus.OK:
+                message = "Successful connection"
+            else:
+                response.raise_for_status()
+                message = "Unsuccessful connection"
+    except (ConnectionError, Timeout, HTTPError, RequestException) as e:
+        message = str(e)
+
+    return response.status_code, message
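
A note on the retry contract this patch introduces: the decorator in decorators.py now unpacks the wrapped function's return value as a (status, response) pair and keeps retrying until the status equals HTTPStatus.OK, after which only the response part is handed back to the caller. A minimal sketch of that contract, under the assumption that the decorator returns the bare response on success (ping_server and its return URL below are illustrative stand-ins, not functions from the patch):

    from http import HTTPStatus

    from hyperon_das.decorators import retry

    @retry(attempts=3, timeout_seconds=10)
    def ping_server(host: str, port: int):
        # A real implementation would issue an HTTP request here; this stub
        # only illustrates the (status, response) tuple the decorator expects.
        return HTTPStatus.OK, f'http://{host}:{port}/function/query-engine'

    # On success the decorator strips the status and returns just the
    # response, so the caller receives the bare URL string.
    url = ping_server('localhost', 8080)

This is the same contract connect_to_server() above relies on: it returns (status_code, url), and callers such as FunctionsClient end up holding only the URL once the handshake succeeds.
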
From c049f2dd283fdc7f631450f230717ddde684f2a3 Mon Sep 17 00:00:00 2001
From: marcocapozzoli
Date: Tue, 26 Mar 2024 11:48:27 -0300
Subject: [PATCH 02/14] refactor return

---
 hyperon_das/das.py | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/hyperon_das/das.py b/hyperon_das/das.py
index 338dac48..a3902bb4 100644
--- a/hyperon_das/das.py
+++ b/hyperon_das/das.py
@@ -590,7 +590,7 @@ def fetch(
         # if not is_remote_das and not host and not port:
         #     raise ValueError("The 'host' and 'port' parameters must be sent to DAS local")
 
-        return self.query_engine.fetch(host, port, query, **kwargs)
+        return self.query_engine.fetch(query, host, port, **kwargs)
 
 
 if __name__ == '__main__':
@@ -610,7 +610,18 @@ def fetch(
         }
     )
 
-    das2 = DistributedAtomSpace(atomdb='ram', query_engine='local')
+    das2 = das = DistributedAtomSpace(
+        query_engine='local',
+        atomdb='redis_mongo',
+        mongo_hostname='45.63.85.59',
+        mongo_port=28100,
+        mongo_username='dbadmin',
+        mongo_password='dassecret',
+        redis_hostname='45.63.85.59',
+        redis_port=29100,
+        redis_cluster=False,
+        redis_ssl=False,
+    )
     response2 = das2.fetch(
         query={

From b09e1e77ae7ecdf364d58c63364df70dae8506be Mon Sep 17 00:00:00 2001
From: marcocapozzoli
Date: Tue, 26 Mar 2024 22:05:13 -0300
Subject: [PATCH 03/14] refactor fetch method

---
 hyperon_das/__init__.py      |   2 +-
 hyperon_das/cache.py         |  26 ++++-----
 hyperon_das/client.py        |   2 +-
 hyperon_das/das.py           |  46 +--------------
 hyperon_das/query_engines.py | 108 ++++++++++++++++++++++-----------
 5 files changed, 91 insertions(+), 93 deletions(-)

diff --git a/hyperon_das/__init__.py b/hyperon_das/__init__.py
index 203f9baa..2e407ec6 100644
--- a/hyperon_das/__init__.py
+++ b/hyperon_das/__init__.py
@@ -2,4 +2,4 @@
 
 __all__ = ['DistributedAtomSpace']
 
-__version__ = '0.5.6'
+__version__ = '0.7.0'
diff --git a/hyperon_das/cache.py b/hyperon_das/cache.py
index 12171690..3c0893ac 100644
--- a/hyperon_das/cache.py
+++ b/hyperon_das/cache.py
@@ -3,8 +3,10 @@
 from itertools import product
 from threading import Semaphore, Thread
 from typing import Any, Dict, List, Optional, TypeVar, Union
+
 from hyperon_das_atomdb import WILDCARD
 from hyperon_das_atomdb.adapters import InMemoryDB, RedisMongoDB
+
 from hyperon_das.client import FunctionsClient
 from hyperon_das.utils import Assignment, QueryAnswer
 
@@ -15,28 +17,24 @@ class CacheManager:
     def __init__(self, cache: AdapterDBType, **kwargs):
         self.cache = cache
 
-    def fetch(
+    def fetch_data(
         self,
+        query: Union[List[dict], dict],
         host: Optional[str] = None,
         port: Optional[int] = None,
-        query: Optional[Union[List[dict], dict]] = None,
-        **kwargs
+        **kwargs,
     ) -> bool:
         try:
             if not (server := kwargs.pop('server', None)):
                 server = FunctionsClient(host, port)
-
-            documents = server.fetch(query=query, **kwargs)
-
-            
self._populate_cache(documents) - - return True + return server.fetch(query=query, **kwargs) except Exception as e: - return False - - def _populate_cache(self, documents: List[Dict[str, Any]]) -> None: - [self.cache.add_link(document) for document in documents] - self.cache.commit() + # TODO: Map possible errors + raise e + + def bulk_insert(self) -> None: + """Batched INSERT statements in "bulk", not returning rows""" + self.cache.insert(self.documents) class QueryAnswerIterator(ABC): diff --git a/hyperon_das/client.py b/hyperon_das/client.py index eb8749d1..c4a9d813 100644 --- a/hyperon_das/client.py +++ b/hyperon_das/client.py @@ -187,7 +187,7 @@ def fetch( query: Union[List[dict], dict], host: Optional[str] = None, port: Optional[int] = None, - **kwargs + **kwargs, ) -> bool: payload = { 'action': 'fetch', diff --git a/hyperon_das/das.py b/hyperon_das/das.py index a3902bb4..521f9781 100644 --- a/hyperon_das/das.py +++ b/hyperon_das/das.py @@ -583,53 +583,11 @@ def fetch( query: Union[List[dict], dict], host: Optional[str] = None, port: Optional[int] = None, - **kwargs + **kwargs, ) -> bool: - is_remote_das = isinstance(self.query_engine, RemoteQueryEngine) + # is_remote_das = isinstance(self.query_engine, RemoteQueryEngine) # if not is_remote_das and not host and not port: # raise ValueError("The 'host' and 'port' parameters must be sent to DAS local") return self.query_engine.fetch(query, host, port, **kwargs) - - -if __name__ == '__main__': - remote_das_host = "45.63.85.59" - remote_das_port = 8080 - - das1 = DistributedAtomSpace(query_engine='remote', host=remote_das_host, port=remote_das_port) - responses1 = das1.query( - query={ - "atom_type": "link", - "type": "Expression", - "targets": [ - {"atom_type": "node", "type": "Symbol", "name": "Similarity"}, - {"atom_type": "node", "type": "Symbol", "name": '"human"'}, - {"atom_type": "variable", "name": "v1"} - ] - } - ) - - das2 = das = DistributedAtomSpace( - query_engine='local', - atomdb='redis_mongo', - mongo_hostname='45.63.85.59', - mongo_port=28100, - mongo_username='dbadmin', - mongo_password='dassecret', - redis_hostname='45.63.85.59', - redis_port=29100, - redis_cluster=False, - redis_ssl=False, - ) - response2 = das2.fetch( - query={ - "atom_type": "link", - "type": "Expression", - "targets": [ - {"atom_type": "node", "type": "Symbol", "name": "Similarity"}, - {"atom_type": "node", "type": "Symbol", "name": '"human"'}, - {"atom_type": "variable", "name": "v1"} - ] - } - ) \ No newline at end of file diff --git a/hyperon_das/query_engines.py b/hyperon_das/query_engines.py index f4db16f0..8f188e09 100644 --- a/hyperon_das/query_engines.py +++ b/hyperon_das/query_engines.py @@ -1,10 +1,13 @@ +import re from abc import ABC, abstractmethod from typing import Any, Dict, Iterator, List, Optional, Set, Tuple, Union -from hyperon_das_atomdb import WILDCARD + +from hyperon_das_atomdb import WILDCARD, AtomDB from hyperon_das_atomdb.exceptions import AtomDoesNotExist, LinkDoesNotExist, NodeDoesNotExist from hyperon_das.cache import ( AndEvaluator, + CacheManager, CustomQuery, LazyQueryEvaluator, ListIterator, @@ -13,7 +16,6 @@ QueryAnswerIterator, RemoteGetLinks, RemoteIncomingLinks, - CacheManager ) from hyperon_das.client import FunctionsClient from hyperon_das.exceptions import ( @@ -77,21 +79,21 @@ def create_field_index( composite_type: Optional[List[Any]] = None, ) -> str: ... 
# pragma no cover - - @abstractmethod + + @abstractmethod def fetch( self, query: Union[List[dict], dict], host: Optional[str] = None, port: Optional[int] = None, - **kwargs + **kwargs, ) -> bool: ... # pragma no cover class LocalQueryEngine(QueryEngine): def __init__(self, backend, kwargs: Optional[dict] = None) -> None: - self.cache_manager = kwargs.get('cache_manager') + self.cache_manager: CacheManager = kwargs.get('cache_manager') self.local_backend = backend def _error(self, exception: Exception): @@ -112,9 +114,7 @@ def _recursive_query( elif query["atom_type"] == "node": try: atom_handle = self.local_backend.get_node_handle(query["type"], query["name"]) - return ListIterator( - [QueryAnswer(self.local_backend.get_atom_as_dict(atom_handle), None)] - ) + return ListIterator([QueryAnswer(self.local_backend.get_atom(atom_handle), None)]) except NodeDoesNotExist: return ListIterator([]) elif query["atom_type"] == "link": @@ -149,7 +149,7 @@ def _to_link_dict_list(self, db_answer: Union[List[str], List[Dict]]) -> List[Di answer = [] for atom in db_answer: handle = atom if flat_handle else atom[0] - answer.append(self.local_backend.get_atom_as_dict(handle)) + answer.append(self.local_backend.get_atom(handle)) return answer def _get_related_links( @@ -300,28 +300,68 @@ def fetch( query: Union[List[dict], dict], host: Optional[str] = None, port: Optional[int] = None, - **kwargs + **kwargs, ) -> bool: - if query['atom_type'] == 'node': - handle = self.local_backend.get_node_handle(query["type"], query["name"]) - return self.local_backend.get_atom(handle) - elif query['atom_type'] == 'link': - matched_targets = [] - for target in query["targets"]: - if target["atom_type"] == "node" or target["atom_type"] == "link": - if matched := self.fetch(target, **kwargs): - matched_targets.append(matched) - elif target["atom_type"] == "variable": - matched_targets.append('*') - else: - self._error( - UnexpectedQueryFormat( - message="Query processing reached an unexpected state", - details=f'link: {str(query)} link target: {str(query)}', - ) - ) + if not kwargs.get('running_on_server'): # Local + kwargs['running_on_server'] = True + documents = self.cache_manager.fetch_data(query=query, host=host, port=port, **kwargs) + self.cache_manager.bulk_insert(documents) else: - raise ValueError('Invalid atom type') + + def _generate_target_handles(targets: Dict[str, Any]) -> list: + targets_hash = [] + for target in targets: + if target["atom_type"] == "node": + handle = self.local_backend.node_handle(target["type"], target["name"]) + elif target["atom_type"] == "link": + handle = self._build_handles(target) + elif target["atom_type"] == "variable": + handle = '*' + targets_hash.append(handle) + return targets_hash + + def _target_to_atoms(target: Dict[str, Any]) -> List[dict]: + atom = self.local_backend.get_atom(target, no_convert=True) + if 'name' in atom: # node + return atom + else: + answer = [atom] + for key, value in atom.items(): + if re.search(AtomDB.key_pattern, key): + answer.append(_target_to_atoms(value)) + return answer + + if query['atom_type'] == 'node': + handle = self.local_backend.node_handle(query["type"], query["name"]) + return [self.local_backend.get_atom(handle, no_convert=True)] + elif query['atom_type'] == 'link': + target_handles = _generate_target_handles(query['targets']) + matched_links = self.local_backend.get_matched_links( + link_type=query["type"], target_handles=target_handles + ) + _handles = set() + atoms = [] + for link in matched_links: + link_handle = link[0] + 
link_targets = link[1] + if link_handle not in _handles: + _handles.add(link_handle) + atoms.append(self.local_backend.get_atom(link_handle, no_convert=True)) + for target in link_targets: + resp = _target_to_atoms(target) + if isinstance(resp, list): + for r in resp: + if r['_id'] not in _handles: + _handles.add(r['_id']) + atoms.append(r) + else: + if resp['_id'] not in _handles: + _handles.add(resp['_id']) + atoms.append(resp) + + return atoms + else: + raise ValueError('Invalid atom type') class RemoteQueryEngine(QueryEngine): @@ -465,10 +505,12 @@ def fetch( query: Union[List[dict], dict], host: Optional[str] = None, port: Optional[int] = None, - **kwargs + **kwargs, ) -> bool: if not host and not port: host = self.query_engine.host port = self.query_engine.port - - return self.cache_manager.fetch(query=query, server=self.remote_das) \ No newline at end of file + kwargs['running_on_server'] = True + return self.cache_manager.fetch_data( + query=query, host=host, port=port, server=self.remote_das + ) From c3a99c84b5ef2a0053f411ae09c2c0636fe7bc1c Mon Sep 17 00:00:00 2001 From: marcocapozzoli Date: Wed, 27 Mar 2024 08:59:47 -0300 Subject: [PATCH 04/14] fix method in localqueryengine --- hyperon_das/cache.py | 8 +- hyperon_das/client.py | 2 +- hyperon_das/das.py | 2 +- hyperon_das/query_engines.py | 62 ++++++----- tests/integration/test_remote_das.py | 150 +++++++++++++++------------ 5 files changed, 124 insertions(+), 100 deletions(-) diff --git a/hyperon_das/cache.py b/hyperon_das/cache.py index 3c0893ac..33217729 100644 --- a/hyperon_das/cache.py +++ b/hyperon_das/cache.py @@ -23,7 +23,7 @@ def fetch_data( host: Optional[str] = None, port: Optional[int] = None, **kwargs, - ) -> bool: + ) -> List[Dict[str, Any]]: try: if not (server := kwargs.pop('server', None)): server = FunctionsClient(host, port) @@ -32,9 +32,9 @@ def fetch_data( # TODO: Map possible errors raise e - def bulk_insert(self) -> None: - """Batched INSERT statements in "bulk", not returning rows""" - self.cache.insert(self.documents) + def bulk_insert(self, documents: Dict[str, Any]) -> None: + """insert statements in "bulk", not returning rows""" + self.cache.bulk_insert(documents) class QueryAnswerIterator(ABC): diff --git a/hyperon_das/client.py b/hyperon_das/client.py index c4a9d813..fa0439ff 100644 --- a/hyperon_das/client.py +++ b/hyperon_das/client.py @@ -188,7 +188,7 @@ def fetch( host: Optional[str] = None, port: Optional[int] = None, **kwargs, - ) -> bool: + ) -> Any: payload = { 'action': 'fetch', 'input': {'query': query, 'host': host, 'port': port, 'kwargs': kwargs}, diff --git a/hyperon_das/das.py b/hyperon_das/das.py index 521f9781..d5b97e03 100644 --- a/hyperon_das/das.py +++ b/hyperon_das/das.py @@ -584,7 +584,7 @@ def fetch( host: Optional[str] = None, port: Optional[int] = None, **kwargs, - ) -> bool: + ) -> Any: # is_remote_das = isinstance(self.query_engine, RemoteQueryEngine) # if not is_remote_das and not host and not port: diff --git a/hyperon_das/query_engines.py b/hyperon_das/query_engines.py index 8f188e09..8aaaabc6 100644 --- a/hyperon_das/query_engines.py +++ b/hyperon_das/query_engines.py @@ -87,7 +87,7 @@ def fetch( host: Optional[str] = None, port: Optional[int] = None, **kwargs, - ) -> bool: + ) -> Any: ... 
# pragma no cover @@ -301,7 +301,7 @@ def fetch( host: Optional[str] = None, port: Optional[int] = None, **kwargs, - ) -> bool: + ) -> Any: if not kwargs.get('running_on_server'): # Local kwargs['running_on_server'] = True documents = self.cache_manager.fetch_data(query=query, host=host, port=port, **kwargs) @@ -314,52 +314,61 @@ def _generate_target_handles(targets: Dict[str, Any]) -> list: if target["atom_type"] == "node": handle = self.local_backend.node_handle(target["type"], target["name"]) elif target["atom_type"] == "link": - handle = self._build_handles(target) + handle = self._generate_target_handles(target) elif target["atom_type"] == "variable": - handle = '*' + handle = WILDCARD targets_hash.append(handle) return targets_hash - def _target_to_atoms(target: Dict[str, Any]) -> List[dict]: - atom = self.local_backend.get_atom(target, no_convert=True) + def _handle_to_atoms(handle: str) -> List[dict]: + try: + atom = self.local_backend.get_atom(handle, no_convert=True) + except AtomDoesNotExist: + return [] if 'name' in atom: # node return atom else: answer = [atom] for key, value in atom.items(): if re.search(AtomDB.key_pattern, key): - answer.append(_target_to_atoms(value)) + answer.append(_handle_to_atoms(value)) return answer if query['atom_type'] == 'node': - handle = self.local_backend.node_handle(query["type"], query["name"]) - return [self.local_backend.get_atom(handle, no_convert=True)] + try: + handle = self.local_backend.node_handle(query["type"], query["name"]) + return [self.local_backend.get_atom(handle, no_convert=True)] + except NodeDoesNotExist: + return [] elif query['atom_type'] == 'link': target_handles = _generate_target_handles(query['targets']) matched_links = self.local_backend.get_matched_links( link_type=query["type"], target_handles=target_handles ) _handles = set() - atoms = [] + answer = [] for link in matched_links: - link_handle = link[0] - link_targets = link[1] + if len(matched_links) > 1: + link_handle = link[0] + link_targets = link[1:] + else: + link_handle = link + link_targets = target_handles if link_handle not in _handles: _handles.add(link_handle) - atoms.append(self.local_backend.get_atom(link_handle, no_convert=True)) + answer.append(self.local_backend.get_atom(link_handle, no_convert=True)) for target in link_targets: - resp = _target_to_atoms(target) - if isinstance(resp, list): - for r in resp: - if r['_id'] not in _handles: - _handles.add(r['_id']) - atoms.append(r) + atoms = _handle_to_atoms(target) + if isinstance(atoms, list): + for atom in atoms: + if atom['_id'] not in _handles: + _handles.add(atom['_id']) + answer.append(atom) else: - if resp['_id'] not in _handles: - _handles.add(resp['_id']) - atoms.append(resp) - - return atoms + if atoms['_id'] not in _handles: + _handles.add(atoms['_id']) + answer.append(atoms) + return answer else: raise ValueError('Invalid atom type') @@ -506,11 +515,12 @@ def fetch( host: Optional[str] = None, port: Optional[int] = None, **kwargs, - ) -> bool: + ) -> Any: if not host and not port: host = self.query_engine.host port = self.query_engine.port kwargs['running_on_server'] = True - return self.cache_manager.fetch_data( + documents = self.cache_manager.fetch_data( query=query, host=host, port=port, server=self.remote_das ) + self.cache_manager.bulk_insert(documents) diff --git a/tests/integration/test_remote_das.py b/tests/integration/test_remote_das.py index 2684adff..4858638c 100644 --- a/tests/integration/test_remote_das.py +++ b/tests/integration/test_remote_das.py @@ -10,6 +10,7 @@ from 
.helpers import metta_animal_base_handles from .remote_das_info import remote_das_host, remote_das_port + def _check_docs(actual, expected): assert len(actual) == len(expected) for dict1, dict2 in zip(actual, expected): @@ -17,6 +18,7 @@ def _check_docs(actual, expected): assert dict1[key] == dict2[key] return True + class TestRemoteDistributedAtomSpace: """Integration tests with OpenFaas function on the Vultr server. Using the Animal Knowledge Base""" @@ -174,77 +176,89 @@ def test_query(self, remote_das: DistributedAtomSpace): for _, link in answer: assert link['handle'] in all_inheritance_mammal if link['handle'] == metta_animal_base_handles.inheritance_chimp_mammal: - assert _check_docs(link['targets'], [ - { - 'handle': metta_animal_base_handles.Inheritance, - 'type': 'Symbol', - 'name': "Inheritance", - }, - { - 'handle': metta_animal_base_handles.chimp, - 'type': 'Symbol', - 'name': '"chimp"', - }, - { - 'handle': metta_animal_base_handles.mammal, - 'type': 'Symbol', - 'name': '"mammal"', - }, - ]) + assert _check_docs( + link['targets'], + [ + { + 'handle': metta_animal_base_handles.Inheritance, + 'type': 'Symbol', + 'name': "Inheritance", + }, + { + 'handle': metta_animal_base_handles.chimp, + 'type': 'Symbol', + 'name': '"chimp"', + }, + { + 'handle': metta_animal_base_handles.mammal, + 'type': 'Symbol', + 'name': '"mammal"', + }, + ], + ) elif link['handle'] == metta_animal_base_handles.inheritance_human_mammal: - assert _check_docs(link['targets'], [ - { - 'handle': metta_animal_base_handles.Inheritance, - 'type': 'Symbol', - 'name': "Inheritance", - }, - { - 'handle': metta_animal_base_handles.human, - 'type': 'Symbol', - 'name': '"human"', - }, - { - 'handle': metta_animal_base_handles.mammal, - 'type': 'Symbol', - 'name': '"mammal"', - }, - ]) + assert _check_docs( + link['targets'], + [ + { + 'handle': metta_animal_base_handles.Inheritance, + 'type': 'Symbol', + 'name': "Inheritance", + }, + { + 'handle': metta_animal_base_handles.human, + 'type': 'Symbol', + 'name': '"human"', + }, + { + 'handle': metta_animal_base_handles.mammal, + 'type': 'Symbol', + 'name': '"mammal"', + }, + ], + ) elif link['handle'] == metta_animal_base_handles.inheritance_monkey_mammal: - assert _check_docs(link['targets'], [ - { - 'handle': metta_animal_base_handles.Inheritance, - 'type': 'Symbol', - 'name': "Inheritance", - }, - { - 'handle': metta_animal_base_handles.monkey, - 'type': 'Symbol', - 'name': '"monkey"', - }, - { - 'handle': metta_animal_base_handles.mammal, - 'type': 'Symbol', - 'name': '"mammal"', - }, - ]) + assert _check_docs( + link['targets'], + [ + { + 'handle': metta_animal_base_handles.Inheritance, + 'type': 'Symbol', + 'name': "Inheritance", + }, + { + 'handle': metta_animal_base_handles.monkey, + 'type': 'Symbol', + 'name': '"monkey"', + }, + { + 'handle': metta_animal_base_handles.mammal, + 'type': 'Symbol', + 'name': '"mammal"', + }, + ], + ) elif link['handle'] == metta_animal_base_handles.inheritance_rhino_mammal: - assert _check_docs(link['targets'], [ - { - 'handle': metta_animal_base_handles.Inheritance, - 'type': 'Symbol', - 'name': "Inheritance", - }, - { - 'handle': metta_animal_base_handles.rhino, - 'type': 'Symbol', - 'name': '"rhino"', - }, - { - 'handle': metta_animal_base_handles.mammal, - 'type': 'Symbol', - 'name': '"mammal"', - }, - ]) + assert _check_docs( + link['targets'], + [ + { + 'handle': metta_animal_base_handles.Inheritance, + 'type': 'Symbol', + 'name': "Inheritance", + }, + { + 'handle': metta_animal_base_handles.rhino, + 'type': 'Symbol', + 
'name': '"rhino"', + }, + { + 'handle': metta_animal_base_handles.mammal, + 'type': 'Symbol', + 'name': '"mammal"', + }, + ], + ) def test_get_traversal_cursor(self, remote_das: DistributedAtomSpace): cursor = remote_das.get_traversal_cursor(metta_animal_base_handles.human) From dec20df15ca4475ed43c9f56c3ad78830901081e Mon Sep 17 00:00:00 2001 From: marcocapozzoli Date: Wed, 27 Mar 2024 10:10:33 -0300 Subject: [PATCH 05/14] Fix unit tests --- hyperon_das/query_engines.py | 140 +++++++++++++++++-------------- tests/integration/test_client.py | 4 +- tests/unit/test_client.py | 50 +++++------ tests/unit/test_das.py | 8 +- tests/unit/test_decorators.py | 6 +- 5 files changed, 104 insertions(+), 104 deletions(-) diff --git a/hyperon_das/query_engines.py b/hyperon_das/query_engines.py index 8aaaabc6..51816144 100644 --- a/hyperon_das/query_engines.py +++ b/hyperon_das/query_engines.py @@ -92,7 +92,7 @@ def fetch( class LocalQueryEngine(QueryEngine): - def __init__(self, backend, kwargs: Optional[dict] = None) -> None: + def __init__(self, backend, kwargs: Optional[dict] = {}) -> None: self.cache_manager: CacheManager = kwargs.get('cache_manager') self.local_backend = backend @@ -173,6 +173,73 @@ def _get_related_links( else: self._error(ValueError("Invalid parameters")) + def _process_node(self, query: dict) -> List[dict]: + try: + handle = self.local_backend.node_handle(query["type"], query["name"]) + return [self.local_backend.get_atom(handle, no_convert=True)] + except AtomDoesNotExist: + return [] + + def _process_link(self, query: dict) -> List[dict]: + target_handles = self._generate_target_handles(query['targets']) + matched_links = self.local_backend.get_matched_links( + link_type=query["type"], target_handles=target_handles + ) + unique_handles = set() + result = [] + + for link in matched_links: + if isinstance(link, str): # single link + link_handle = link + link_targets = target_handles + else: + link_handle, *link_targets = link + + if link_handle not in unique_handles: + unique_handles.add(link_handle) + result.append(self.local_backend.get_atom(link_handle, no_convert=True)) + + for target in link_targets: + atoms = self._handle_to_atoms(target) + if isinstance(atoms, list): + for atom in atoms: + if atom['_id'] not in unique_handles: + unique_handles.add(atom['_id']) + result.append(atom) + else: + if atoms['_id'] not in unique_handles: + unique_handles.add(atoms['_id']) + result.append(atoms) + + return result + + def _generate_target_handles(self, targets: List[Dict[str, Any]]) -> List[str]: + targets_hash = [] + for target in targets: + if target["atom_type"] == "node": + handle = self.local_backend.node_handle(target["type"], target["name"]) + elif target["atom_type"] == "link": + handle = self._generate_target_handles(target) + elif target["atom_type"] == "variable": + handle = WILDCARD + targets_hash.append(handle) + return targets_hash + + def _handle_to_atoms(self, handle: str) -> Union[List[dict], dict]: + try: + atom = self.local_backend.get_atom(handle, no_convert=True) + except AtomDoesNotExist: + return [] + + if 'name' in atom: # node + return atom + else: # link + answer = [atom] + for key, value in atom.items(): + if re.search(AtomDB.key_pattern, key): + answer.append(self._handle_to_atoms(value)) + return answer + def get_atom(self, handle: str, **kwargs) -> Union[Dict[str, Any], None]: try: return self.local_backend.get_atom(handle, **kwargs) @@ -307,74 +374,21 @@ def fetch( documents = self.cache_manager.fetch_data(query=query, host=host, port=port, **kwargs) 
self.cache_manager.bulk_insert(documents) else: + if 'atom_type' not in query: + raise ValueError('Invalid query: missing atom_type') - def _generate_target_handles(targets: Dict[str, Any]) -> list: - targets_hash = [] - for target in targets: - if target["atom_type"] == "node": - handle = self.local_backend.node_handle(target["type"], target["name"]) - elif target["atom_type"] == "link": - handle = self._generate_target_handles(target) - elif target["atom_type"] == "variable": - handle = WILDCARD - targets_hash.append(handle) - return targets_hash - - def _handle_to_atoms(handle: str) -> List[dict]: - try: - atom = self.local_backend.get_atom(handle, no_convert=True) - except AtomDoesNotExist: - return [] - if 'name' in atom: # node - return atom - else: - answer = [atom] - for key, value in atom.items(): - if re.search(AtomDB.key_pattern, key): - answer.append(_handle_to_atoms(value)) - return answer - - if query['atom_type'] == 'node': - try: - handle = self.local_backend.node_handle(query["type"], query["name"]) - return [self.local_backend.get_atom(handle, no_convert=True)] - except NodeDoesNotExist: - return [] - elif query['atom_type'] == 'link': - target_handles = _generate_target_handles(query['targets']) - matched_links = self.local_backend.get_matched_links( - link_type=query["type"], target_handles=target_handles - ) - _handles = set() - answer = [] - for link in matched_links: - if len(matched_links) > 1: - link_handle = link[0] - link_targets = link[1:] - else: - link_handle = link - link_targets = target_handles - if link_handle not in _handles: - _handles.add(link_handle) - answer.append(self.local_backend.get_atom(link_handle, no_convert=True)) - for target in link_targets: - atoms = _handle_to_atoms(target) - if isinstance(atoms, list): - for atom in atoms: - if atom['_id'] not in _handles: - _handles.add(atom['_id']) - answer.append(atom) - else: - if atoms['_id'] not in _handles: - _handles.add(atoms['_id']) - answer.append(atoms) - return answer + atom_type = query['atom_type'] + + if atom_type == 'node': + return self._process_node(query) + elif atom_type == 'link': + return self._process_link(query) else: raise ValueError('Invalid atom type') class RemoteQueryEngine(QueryEngine): - def __init__(self, backend, kwargs): + def __init__(self, backend, kwargs: Optional[dict] = {}): self.cache_manager: CacheManager = kwargs.get('cache_manager') self.local_query_engine = LocalQueryEngine(backend, kwargs) self.host = kwargs.get('host') diff --git a/tests/integration/test_client.py b/tests/integration/test_client.py index 8dc1ba21..0d478190 100644 --- a/tests/integration/test_client.py +++ b/tests/integration/test_client.py @@ -9,9 +9,7 @@ class TestVultrClientIntegration: @pytest.fixture() def server(self): - return FunctionsClient( - url=f'http://{remote_das_host}:{remote_das_port}/function/query-engine' - ) + return FunctionsClient(host=remote_das_host, port=remote_das_port) def test_get_atom(self, server: FunctionsClient): result = server.get_atom(handle=metta_animal_base_handles.human) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 71a474df..c41cfdf5 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -15,7 +15,12 @@ def mock_request(self): with patch('requests.sessions.Session.request') as mock_request: yield mock_request - def test_get_atom_success(self, mock_request): + @pytest.fixture + def client(self): + with patch('hyperon_das.utils.check_server_connection', return_value=(200, 'OK')): + return 
FunctionsClient(host='0.0.0.0', port=1000) + + def test_get_atom_success(self, mock_request, client): expected_request_data = {"action": "get_atom", "input": {"handle": "123"}} expected_response = { "handle": "af12f10f9ae2002a1607ba0b47ba8407", @@ -27,19 +32,18 @@ def test_get_atom_success(self, mock_request): mock_request.return_value.status_code = 200 mock_request.return_value.content = serialize(expected_response) - client = FunctionsClient(url='http://example.com') result = client.get_atom(handle='123') mock_request.assert_called_with( method='POST', - url='http://example.com', + url='http://0.0.0.0:1000/function/query-engine', data=serialize(expected_request_data), headers={'Content-Type': 'application/octet-stream'}, ) assert result == expected_response - def test_get_node_success(self, mock_request): + def test_get_node_success(self, mock_request, client): expected_request_data = { "action": "get_node", "input": {"node_type": "Concept", "node_name": "human"}, @@ -54,19 +58,18 @@ def test_get_node_success(self, mock_request): mock_request.return_value.status_code = 200 mock_request.return_value.content = serialize(expected_response) - client = FunctionsClient(url='http://example.com') result = client.get_node(node_type='Concept', node_name='human') mock_request.assert_called_with( method='POST', - url='http://example.com', + url='http://0.0.0.0:1000/function/query-engine', data=serialize(expected_request_data), headers={'Content-Type': 'application/octet-stream'}, ) assert result == expected_response - def test_get_link_success(self, mock_request): + def test_get_link_success(self, mock_request, client): expected_request_data = { "action": "get_link", "input": { @@ -94,7 +97,6 @@ def test_get_link_success(self, mock_request): mock_request.return_value.status_code = 200 mock_request.return_value.content = serialize(expected_response) - client = FunctionsClient(url='http://example.com') result = client.get_link( link_type='Similarity', link_targets=['af12f10f9ae2002a1607ba0b47ba8407', '1cdffc6b0b89ff41d68bec237481d1e1'], @@ -102,14 +104,14 @@ def test_get_link_success(self, mock_request): mock_request.assert_called_with( method='POST', - url='http://example.com', + url='http://0.0.0.0:1000/function/query-engine', data=serialize(expected_request_data), headers={'Content-Type': 'application/octet-stream'}, ) assert result == expected_response - def test_get_links_success(self, mock_request): + def test_get_links_success(self, mock_request, client): expected_request_data = { "action": "get_links", "input": { @@ -133,7 +135,6 @@ def test_get_links_success(self, mock_request): mock_request.return_value.status_code = 200 mock_request.return_value.content = serialize(expected_response) - client = FunctionsClient(url='http://example.com') result = client.get_links( link_type='Inheritance', link_targets=['4e8e26e3276af8a5c2ac2cc2dc95c6d2', '80aff30094874e75028033a38ce677bb'], @@ -141,14 +142,14 @@ def test_get_links_success(self, mock_request): mock_request.assert_called_with( method='POST', - url='http://example.com', + url='http://0.0.0.0:1000/function/query-engine', data=serialize(expected_request_data), headers={'Content-Type': 'application/octet-stream'}, ) assert result == expected_response - def test_query_success(self, mock_request): + def test_query_success(self, mock_request, client): expected_request_data = { "action": "query", "input": { @@ -186,7 +187,6 @@ def test_query_success(self, mock_request): mock_request.return_value.status_code = 200 mock_request.return_value.content = 
serialize(expected_response) - client = FunctionsClient(url='http://example.com') query = { "atom_type": "link", "type": "Similarity", @@ -200,33 +200,31 @@ def test_query_success(self, mock_request): mock_request.assert_called_with( method='POST', - url='http://example.com', + url='http://0.0.0.0:1000/function/query-engine', data=serialize(expected_request_data), headers={'Content-Type': 'application/octet-stream'}, ) assert result == expected_response - def test_count_atoms_success(self, mock_request): + def test_count_atoms_success(self, mock_request, client): expected_request_data = {"action": "count_atoms", "input": {}} expected_response = (14, 26) mock_request.return_value.status_code = 200 mock_request.return_value.content = serialize(expected_response) - - client = FunctionsClient(url='http://example.com') result = client.count_atoms() mock_request.assert_called_once_with( method='POST', - url='http://example.com', + url='http://0.0.0.0:1000/function/query-engine', data=serialize(expected_request_data), headers={'Content-Type': 'application/octet-stream'}, ) assert result == expected_response - def test_send_request_success(self, mock_request): + def test_send_request_success(self, mock_request, client): payload = {"action": "get_atom", "input": {"handle": "123"}} expected_response = { "handle": "af12f10f9ae2002a1607ba0b47ba8407", @@ -238,40 +236,36 @@ def test_send_request_success(self, mock_request): mock_request.return_value.status_code = 200 mock_request.return_value.content = serialize(expected_response) - client = FunctionsClient(url='http://example.com') result = client._send_request(payload) mock_request.assert_called_with( method='POST', - url='http://example.com', + url='http://0.0.0.0:1000/function/query-engine', data=serialize(payload), headers={'Content-Type': 'application/octet-stream'}, ) assert result == expected_response - def test_send_request_connection_error(self, mock_request): + def test_send_request_connection_error(self, mock_request, client): mock_request.side_effect = exceptions.ConnectionError() - client = FunctionsClient(url='http://example.com') payload = {"action": "get_atom", "input": {"handle": "123"}} with pytest.raises(ConnectionError): client._send_request(payload) - def test_send_request_timeout_error(self, mock_request): + def test_send_request_timeout_error(self, mock_request, client): mock_request.side_effect = exceptions.Timeout() - client = FunctionsClient(url='http://example.com') payload = {"action": "get_atom", "input": {"handle": "123"}} with pytest.raises(TimeoutError): client._send_request(payload) - def test_send_request_request_exception(self, mock_request): + def test_send_request_request_exception(self, mock_request, client): mock_request.side_effect = exceptions.RequestException() - client = FunctionsClient(url='http://example.com') payload = {"action": "get_atom", "input": {"handle": "123"}} with pytest.raises(RequestError): diff --git a/tests/unit/test_das.py b/tests/unit/test_das.py index 96e944fc..3d8d3e84 100644 --- a/tests/unit/test_das.py +++ b/tests/unit/test_das.py @@ -17,9 +17,7 @@ def test_create_das(self): assert isinstance(das.backend, InMemoryDB) assert isinstance(das.query_engine, LocalQueryEngine) - with mock.patch( - 'hyperon_das.das.RemoteQueryEngine._connect_server', return_value='url-test' - ): + with mock.patch('hyperon_das.utils.check_server_connection', return_value=(200, 'OK')): das = DistributedAtomSpace(query_engine='remote', host='0.0.0.0', port=1234) assert isinstance(das.backend, InMemoryDB) assert 
isinstance(das.query_engine, RemoteQueryEngine) @@ -41,9 +39,7 @@ def test_get_incoming_links(self): links = das.get_incoming_links('') assert len(links) == 7 - with mock.patch( - 'hyperon_das.query_engines.RemoteQueryEngine._connect_server', return_value='fake' - ): + with mock.patch('hyperon_das.utils.check_server_connection', return_value=(200, 'OK')): das_remote = DistributedAtomSpaceMock('remote', host='test') with mock.patch( diff --git a/tests/unit/test_decorators.py b/tests/unit/test_decorators.py index 5ece6637..ac73d005 100644 --- a/tests/unit/test_decorators.py +++ b/tests/unit/test_decorators.py @@ -10,15 +10,13 @@ @patch('hyperon_das.logger') def test_retry_successful_connection(logger_mock): - expected_output = "Success" - @retry(attempts=3, timeout_seconds=5) def successful_function(self, host, port): - return expected_output + return 200, "Success" result = successful_function({}, 'localhost', 80) - assert result == expected_output + assert result == 'Success' @patch('hyperon_das.logger') From 6a565cdf62c3c81ab4b2c67c1eb82d15655b9f56 Mon Sep 17 00:00:00 2001 From: marcocapozzoli Date: Wed, 27 Mar 2024 10:58:16 -0300 Subject: [PATCH 06/14] Fix integration test --- hyperon_das/decorators.py | 4 ++-- hyperon_das/utils.py | 13 +++++------ tests/integration/test_remote_das.py | 32 ++++++++++++---------------- 3 files changed, 21 insertions(+), 28 deletions(-) diff --git a/hyperon_das/decorators.py b/hyperon_das/decorators.py index 549d38e2..faa008b8 100644 --- a/hyperon_das/decorators.py +++ b/hyperon_das/decorators.py @@ -35,9 +35,9 @@ def wrapper(*args, **kwargs): time.sleep(waiting_time_seconds) retry_count += 1 timer_count += end_time - start_time - port = f':{args[2]}' if args[2] else '' + port = f':{args[1]}' if len(args) > 1 else '' message = ( - f'Failed to connect to remote Das {args[1]}' + f'Failed to connect to remote Das {args[0]}' + port + f' - attempts:{retry_count} - time_attempted: {timer_count}' ) diff --git a/hyperon_das/utils.py b/hyperon_das/utils.py index 4bdd3eba..6b73f726 100644 --- a/hyperon_das/utils.py +++ b/hyperon_das/utils.py @@ -134,10 +134,9 @@ def connect_to_server(host: str, port: int) -> Tuple[int, str]: for uri in [openfaas_uri, aws_lambda_uri]: status_code, message = check_server_connection(uri) if status_code == HTTPStatus.OK: - url = uri break - return status_code, url + return status_code, uri def check_server_connection(url: str) -> Tuple[int, str]: @@ -174,11 +173,9 @@ def check_server_connection(url: str) -> Tuple[int, str]: f'The version sent by the local DAS is {das_version}, but the expected version on the server is {remote_das_version}' ) if response.status_code == HTTPStatus.OK: - message = "Successful connection" + return response.status_code, "Successful connection" else: + print(f'Response: {deserialize(response.content)}') response.raise_for_status() - message = "Unsuccessful connection" - except (ConnectionError, Timeout, HTTPError, RequestException) as e: - message = str(e) - - return response.status_code, message + except (ConnectionError, Timeout, HTTPError, RequestException, Exception) as e: + return 400, str(e) diff --git a/tests/integration/test_remote_das.py b/tests/integration/test_remote_das.py index 4858638c..4f7e290f 100644 --- a/tests/integration/test_remote_das.py +++ b/tests/integration/test_remote_das.py @@ -24,10 +24,7 @@ class TestRemoteDistributedAtomSpace: @pytest.fixture def remote_das(self): - with mock.patch( - 'hyperon_das.query_engines.RemoteQueryEngine._connect_server', - 
return_value=f'http://{remote_das_host}:{remote_das_port}/function/query-engine', - ): + with mock.patch('hyperon_das.utils.check_server_connection', return_value=(200, 'OK')): return DistributedAtomSpace( query_engine='remote', host=remote_das_host, port=remote_das_port ) # vultr @@ -35,20 +32,19 @@ def remote_das(self): def traversal(self, das: DistributedAtomSpace, handle: str): return das.get_traversal_cursor(handle) - # TODO: uncomment the test after the handshake method in servless-function is working - # def test_server_connection(self): - # try: - # das = DistributedAtomSpace( - # query_engine='remote', host=remote_das_host, port=remote_das_port - # ) - # except Exception as e: - # pytest.fail(f'Connection with OpenFaaS server fail, Details: {str(e)}') - # if not das.query_engine.remote_das.url: - # pytest.fail('Connection with server fail') - # assert ( - # das.query_engine.remote_das.url - # == f'http://{remote_das_host}:{remote_das_port}/function/query-engine' - # ) + def test_server_connection(self): + try: + das = DistributedAtomSpace( + query_engine='remote', host=remote_das_host, port=remote_das_port + ) + except Exception as e: + pytest.fail(f'Connection with OpenFaaS server fail, Details: {str(e)}') + if not das.query_engine.remote_das.url: + pytest.fail('Connection with server fail') + assert ( + das.query_engine.remote_das.url + == f'http://{remote_das_host}:{remote_das_port}/function/query-engine' + ) def test_get_atom(self, remote_das: DistributedAtomSpace): result = remote_das.get_atom(handle=metta_animal_base_handles.human) From 86919f537942289c9234c46c3c4b471c0242cc0e Mon Sep 17 00:00:00 2001 From: marcocapozzoli Date: Wed, 27 Mar 2024 12:14:25 -0300 Subject: [PATCH 07/14] create integration test --- hyperon_das/das.py | 6 +-- ...local_redis_mongo.py => test_local_das.py} | 52 +++++++++++++++++++ tests/integration/test_remote_das.py | 16 ++++++ 3 files changed, 70 insertions(+), 4 deletions(-) rename tests/integration/{test_local_redis_mongo.py => test_local_das.py} (54%) diff --git a/hyperon_das/das.py b/hyperon_das/das.py index d5b97e03..98dd6fc8 100644 --- a/hyperon_das/das.py +++ b/hyperon_das/das.py @@ -585,9 +585,7 @@ def fetch( port: Optional[int] = None, **kwargs, ) -> Any: - # is_remote_das = isinstance(self.query_engine, RemoteQueryEngine) - - # if not is_remote_das and not host and not port: - # raise ValueError("The 'host' and 'port' parameters must be sent to DAS local") + if not kwargs.get('running_on_server') and not host and not port: + raise ValueError("The 'host' and 'port' parameters must be sent to DAS local") return self.query_engine.fetch(query, host, port, **kwargs) diff --git a/tests/integration/test_local_redis_mongo.py b/tests/integration/test_local_das.py similarity index 54% rename from tests/integration/test_local_redis_mongo.py rename to tests/integration/test_local_das.py index 1b93ccd3..4419704b 100644 --- a/tests/integration/test_local_redis_mongo.py +++ b/tests/integration/test_local_das.py @@ -1,6 +1,7 @@ import pytest from hyperon_das import DistributedAtomSpace +from tests.integration.remote_das_info import remote_das_host, remote_das_port from tests.utils import load_animals_base from .helpers import _db_down, _db_up, cleanup, mongo_port, redis_port @@ -75,3 +76,54 @@ def test_add_atom_persistence(self): assert das2.count_atoms() == (15, 27) _db_down() + + @pytest.mark.skip(reason="Disable. 
See: das-serverless-functions#100")
+    def test_fetch_atoms(self):
+        _db_up()
+        das = DistributedAtomSpace(
+            query_engine='local',
+            atomdb='redis_mongo',
+            mongo_port=mongo_port,
+            mongo_username='dbadmin',
+            mongo_password='dassecret',
+            redis_port=redis_port,
+            redis_cluster=False,
+            redis_ssl=False,
+        )
+        assert das.count_atoms() == (0, 0)
+        das.fetch(
+            query={
+                "atom_type": "link",
+                "type": "Expression",
+                "targets": [
+                    {"atom_type": "node", "type": "Symbol", "name": "Inheritance"},
+                    {"atom_type": "variable", "name": "v1"},
+                    {"atom_type": "node", "type": "Symbol", "name": '"mammal"'},
+                ],
+            },
+            host=remote_das_host,
+            port=remote_das_port,
+        )
+        assert das.count_atoms() == (5, 4)
+        _db_down()
+
+
+class TestLocalDASRamOnly:
+    @pytest.mark.skip(reason="Disable. See: das-serverless-functions#100")
+    def test_fetch_atoms_local_das_ram_only(self):
+        das = DistributedAtomSpace()
+        assert das.count_atoms() == (0, 0)
+        das.fetch(
+            query={
+                "atom_type": "link",
+                "type": "Expression",
+                "targets": [
+                    {"atom_type": "node", "type": "Symbol", "name": "Inheritance"},
+                    {"atom_type": "variable", "name": "v1"},
+                    {"atom_type": "node", "type": "Symbol", "name": '"mammal"'},
+                ],
+            },
+            host=remote_das_host,
+            port=remote_das_port,
+        )
+        assert das.count_atoms() == (5, 4)
diff --git a/tests/integration/test_remote_das.py b/tests/integration/test_remote_das.py
index 4f7e290f..10a5f1bc 100644
--- a/tests/integration/test_remote_das.py
+++ b/tests/integration/test_remote_das.py
@@ -298,3 +298,19 @@ def is_literal(atom: dict):
 
         cursor.goto(metta_animal_base_handles.human)
         assert cursor.get()['handle'] == metta_animal_base_handles.human
+
+    @pytest.mark.skip(reason="Disable. See: das-serverless-functions#100")
+    def test_fetch_atoms(self, remote_das):
+        assert remote_das.backend.count_atoms() == (0, 0)
+        remote_das.fetch(
+            query={
+                "atom_type": "link",
+                "type": "Expression",
+                "targets": [
+                    {"atom_type": "node", "type": "Symbol", "name": "Inheritance"},
+                    {"atom_type": "variable", "name": "v1"},
+                    {"atom_type": "node", "type": "Symbol", "name": '"mammal"'},
+                ],
+            }
+        )
+        assert remote_das.backend.count_atoms() == (5, 4)

From ec77412c111c5e8300e7dc9acc1c761c8aaeb007 Mon Sep 17 00:00:00 2001
From: marcocapozzoli
Date: Wed, 27 Mar 2024 14:13:57 -0300
Subject: [PATCH 08/14] Add docstring and comment

---
 CHANGELOG          |  2 +-
 hyperon_das/das.py | 29 +++++++++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG b/CHANGELOG
index 8b137891..ce736767 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1 +1 @@
-
+[#201] Implement fetch() in the DAS API
\ No newline at end of file
diff --git a/hyperon_das/das.py b/hyperon_das/das.py
index 98dd6fc8..75dfd751 100644
--- a/hyperon_das/das.py
+++ b/hyperon_das/das.py
@@ -585,6 +585,35 @@ def fetch(
         port: Optional[int] = None,
         **kwargs,
     ) -> Any:
+        """Fetch data from a remote server using a query as input and load it locally.
+        If the DAS is local, the host and port of the remote server must be provided.
+
+        The input dict is a link, used as a pattern to make the query.
+        Variables can be used as link targets as well as nodes. Nested links are
+        allowed as well.
+
+        Args:
+            query (Union[List[dict], dict]): A pattern described as a link (possibly with nested links)
+                with nodes and variables used to query the knowledge base.
+            host (Optional[str], optional): Address to remote server. Defaults to None.
+            port (Optional[int], optional): Port to remote server. Defaults to None.
+
+        Raises:
+            ValueError: If the 'host' and 'port' parameters are not provided for a local DAS
+
+        Examples:
+            >>> query = {
+                    "atom_type": "link",
+                    "type": "Expression",
+                    "targets": [
+                        {"atom_type": "node", "type": "Symbol", "name": "Inheritance"},
+                        {"atom_type": "variable", "name": "v1"},
+                        {"atom_type": "node", "type": "Symbol", "name": '"mammal"'},
+                    ],
+                }
+            das = DistributedAtomSpace()
+            das.fetch(query, host='123.4.5.6', port=8080)
+        """
         if not kwargs.get('running_on_server') and not host and not port:
             raise ValueError("The 'host' and 'port' parameters must be sent to DAS local")
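
The docstring added above documents the local-DAS path, where host and port name the remote server to pull atoms from. On a remote DAS the two parameters can be omitted, since the query engine falls back to its own host and port. A short sketch of both call patterns, mirroring the integration tests in patch 07 and assuming a reachable DAS server (the 1.2.3.4 address is a placeholder, not a real deployment):

    from hyperon_das import DistributedAtomSpace

    query = {
        "atom_type": "link",
        "type": "Expression",
        "targets": [
            {"atom_type": "node", "type": "Symbol", "name": "Inheritance"},
            {"atom_type": "variable", "name": "v1"},
            {"atom_type": "node", "type": "Symbol", "name": '"mammal"'},
        ],
    }

    # Local DAS: host/port are mandatory and name the server to fetch from.
    local_das = DistributedAtomSpace()
    local_das.fetch(query, host='1.2.3.4', port=8080)

    # Remote DAS: fetch() reuses the engine's own host/port when none are given.
    remote_das = DistributedAtomSpace(query_engine='remote', host='1.2.3.4', port=8080)
    remote_das.fetch(query)
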

From ec77412c111c5e8300e7dc9acc1c761c8aaeb007 Mon Sep 17 00:00:00 2001
From: marcocapozzoli
Date: Wed, 27 Mar 2024 14:13:57 -0300
Subject: [PATCH 08/14] Add docstring and comment

---
 CHANGELOG          |  2 +-
 hyperon_das/das.py | 29 +++++++++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG b/CHANGELOG
index 8b137891..ce736767 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1 +1 @@
-
+[#201] Implement fetch() in the DAS API
\ No newline at end of file
diff --git a/hyperon_das/das.py b/hyperon_das/das.py
index 98dd6fc8..75dfd751 100644
--- a/hyperon_das/das.py
+++ b/hyperon_das/das.py
@@ -585,6 +585,35 @@ def fetch(
         port: Optional[int] = None,
         **kwargs,
     ) -> Any:
+        """Fetch data from the remote server using a query as input and load it locally.
+        If the DAS is local, the 'host' and 'port' parameters must be provided.
+
+        The input dict is a link, used as a pattern to make the query.
+        Variables can be used as link targets as well as nodes. Nested links are
+        allowed as well.
+
+        Args:
+            query (Union[List[dict], dict]): A pattern described as a link (possibly with nested links)
+                with nodes and variables used to query the knowledge base.
+            host (Optional[str], optional): Address of the remote server. Defaults to None.
+            port (Optional[int], optional): Port of the remote server. Defaults to None.
+
+        Raises:
+            ValueError: If 'host' and 'port' are not provided for a local DAS.
+
+        Examples:
+            >>> query = {
+                    "atom_type": "link",
+                    "type": "Expression",
+                    "targets": [
+                        {"atom_type": "node", "type": "Symbol", "name": "Inheritance"},
+                        {"atom_type": "variable", "name": "v1"},
+                        {"atom_type": "node", "type": "Symbol", "name": '"mammal"'},
+                    ],
+                }
+            >>> das = DistributedAtomSpace()
+            >>> das.fetch(query, host='123.4.5.6', port=8080)
+        """
         if not kwargs.get('running_on_server') and not host and not port:
             raise ValueError("The 'host' and 'port' parameters must be sent to DAS local")
 
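
For a remote DAS the address on the call itself is optional. A sketch, assuming
a reachable server configured as in tests/integration/remote_das_info (the
engine falls back to its own host/port, as the next patch's context shows):

    from hyperon_das import DistributedAtomSpace
    from tests.integration.remote_das_info import remote_das_host, remote_das_port

    remote_das = DistributedAtomSpace(
        query_engine='remote', host=remote_das_host, port=remote_das_port
    )
    # No host/port here: fetch() reuses the engine's own address. The pattern
    # below is illustrative; any link with variables works as a query.
    remote_das.fetch(
        query={
            "atom_type": "link",
            "type": "Expression",
            "targets": [
                {"atom_type": "node", "type": "Symbol", "name": "Similarity"},
                {"atom_type": "variable", "name": "v1"},
                {"atom_type": "variable", "name": "v2"},
            ],
        }
    )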

From fbf303c321a0f7bf1d19d5aaeb3c05233293258e Mon Sep 17 00:00:00 2001
From: marcocapozzoli
Date: Wed, 27 Mar 2024 14:23:04 -0300
Subject: [PATCH 09/14] add kwargs in fetch method

---
 hyperon_das/query_engines.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/hyperon_das/query_engines.py b/hyperon_das/query_engines.py
index 51816144..bcdae8db 100644
--- a/hyperon_das/query_engines.py
+++ b/hyperon_das/query_engines.py
@@ -533,8 +533,11 @@ def fetch(
         if not host and not port:
             host = self.query_engine.host
             port = self.query_engine.port
-        kwargs['running_on_server'] = True
-        documents = self.cache_manager.fetch_data(
-            query=query, host=host, port=port, server=self.remote_das
+        kwargs.update(
+            {
+                'running_on_server': True,
+                'server': self.remote_das,
+            }
         )
+        documents = self.cache_manager.fetch_data(query=query, host=host, port=port, **kwargs)
         self.cache_manager.bulk_insert(documents)

From cd7709d1e9620527c1b3109f4df0a72d9134f49e Mon Sep 17 00:00:00 2001
From: marcocapozzoli
Date: Wed, 27 Mar 2024 15:40:13 -0300
Subject: [PATCH 10/14] add bulk_insert in Mock class

---
 tests/unit/mock.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/tests/unit/mock.py b/tests/unit/mock.py
index 8f7b5bd9..7568364a 100644
--- a/tests/unit/mock.py
+++ b/tests/unit/mock.py
@@ -327,6 +327,9 @@ def create_field_index(
     ) -> str:
         pass
 
+    def bulk_insert(self, documents: List[Dict[str, Any]]) -> None:
+        pass
+
 
 class DatabaseAnimals(DatabaseMock):
     def __init__(self):

From 18bde9d8e33ecc8df585a88b44b0808a5ee3a33b Mon Sep 17 00:00:00 2001
From: marcocapozzoli
Date: Wed, 27 Mar 2024 18:10:42 -0300
Subject: [PATCH 11/14] add system_parameters

---
 hyperon_das/das.py           | 35 +++++++++++++++++++++--------------
 hyperon_das/query_engines.py | 18 ++++++++----------
 tests/unit/mock.py           |  4 ++--
 3 files changed, 31 insertions(+), 26 deletions(-)

diff --git a/hyperon_das/das.py b/hyperon_das/das.py
index 75dfd751..be6b7b89 100644
--- a/hyperon_das/das.py
+++ b/hyperon_das/das.py
@@ -17,15 +17,18 @@
 
 
 class DistributedAtomSpace:
-    def __init__(self, **kwargs: Optional[Dict[str, Any]]) -> None:
-        atomdb_parameter = kwargs.get('atomdb', 'ram')
-        query_engine_parameter = kwargs.get('query_engine', 'local')
-
-        if atomdb_parameter == "ram":
+    def __init__(self, system_parameters: Dict[str, Any] = {}, **kwargs) -> None:
+        if not system_parameters.get('running_on_server'):
+            system_parameters['running_on_server'] = False
+        self.system_parameters = system_parameters
+        atomdb = kwargs.get('atomdb', 'ram')
+        query_engine = kwargs.get('query_engine', 'local')
+
+        if atomdb == "ram":
             self.backend = InMemoryDB()
-        elif atomdb_parameter == "redis_mongo":
+        elif atomdb == "redis_mongo":
             self.backend = RedisMongoDB(**kwargs)
-            if query_engine_parameter != "local":
+            if query_engine != "local":
                 raise InvalidDASParameters(
                     message="'redis_mongo' backend requires local query engine ('query_engine=local')"
                 )
@@ -34,16 +37,18 @@ def __init__(self, **kwargs: Optional[Dict[str, Any]]) -> None:
 
         kwargs.update({'cache_manager': CacheManager(self.backend)})
 
-        if query_engine_parameter == 'local':
-            self.query_engine = LocalQueryEngine(self.backend, kwargs)
+        if query_engine == 'local':
+            self._das_type = 'local_ram_only' if atomdb == 'ram' else 'local_redis_mongo'
+            self.query_engine = LocalQueryEngine(self.backend, self.system_parameters, kwargs)
             logger().info('Initialized local Das')
-        elif query_engine_parameter == "remote":
-            self.query_engine = RemoteQueryEngine(self.backend, kwargs)
+        elif query_engine == "remote":
+            self._das_type = 'remote'
+            self.query_engine = RemoteQueryEngine(self.backend, self.system_parameters, kwargs)
             logger().info('Initialized remote Das')
         else:
             raise InvalidQueryEngine(
                 message='The possible values are: `local` or `remote`',
-                details=f'query_engine={query_engine_parameter}',
+                details=f'query_engine={query_engine}',
             )
@@ -614,7 +619,9 @@ def fetch(
             das = DistributedAtomSpace()
             das.fetch(query, host='123.4.5.6', port=8080)
         """
-        if not kwargs.get('running_on_server') and not host and not port:
-            raise ValueError("The 'host' and 'port' parameters must be sent to DAS local")
+
+        if not self.system_parameters.get('running_on_server'):
+            if self._das_type != 'remote' and (not host or not port):
+                raise ValueError("The 'host' and 'port' parameters must be sent to DAS local")
 
         return self.query_engine.fetch(query, host, port, **kwargs)
diff --git a/hyperon_das/query_engines.py b/hyperon_das/query_engines.py
index bcdae8db..9c24b510 100644
--- a/hyperon_das/query_engines.py
+++ b/hyperon_das/query_engines.py
@@ -92,7 +92,10 @@
 
 
 class LocalQueryEngine(QueryEngine):
-    def __init__(self, backend, kwargs: Optional[dict] = {}) -> None:
+    def __init__(
+        self, backend, system_parameters: Dict[str, Any], kwargs: Optional[dict] = {}
+    ) -> None:
+        self.system_parameters = system_parameters
         self.cache_manager: CacheManager = kwargs.get('cache_manager')
         self.local_backend = backend
 
@@ -369,8 +372,7 @@ def fetch(
         port: Optional[int] = None,
         **kwargs,
     ) -> Any:
-        if not kwargs.get('running_on_server'):  # Local
-            kwargs['running_on_server'] = True
+        if not self.system_parameters.get('running_on_server'):  # Local
             documents = self.cache_manager.fetch_data(query=query, host=host, port=port, **kwargs)
             self.cache_manager.bulk_insert(documents)
         else:
@@ -388,7 +390,8 @@
 
 
 class RemoteQueryEngine(QueryEngine):
-    def __init__(self, backend, kwargs: Optional[dict] = {}):
+    def __init__(self, backend, system_parameters: Dict[str, Any], kwargs: Optional[dict] = {}):
+        self.system_parameters = system_parameters
         self.cache_manager: CacheManager = kwargs.get('cache_manager')
-        self.local_query_engine = LocalQueryEngine(backend, kwargs)
+        self.local_query_engine = LocalQueryEngine(backend, system_parameters, kwargs)
         self.host = kwargs.get('host')
@@ -533,11 +536,6 @@ def fetch(
         if not host and not port:
             host = self.query_engine.host
             port = self.query_engine.port
-        kwargs.update(
-            {
-                'running_on_server': True,
-                'server': self.remote_das,
-            }
-        )
+        kwargs.update({'server': self.remote_das})
         documents = self.cache_manager.fetch_data(query=query, host=host, port=port, **kwargs)
         self.cache_manager.bulk_insert(documents)
diff --git a/tests/unit/mock.py b/tests/unit/mock.py
index 7568364a..e1f23512 100644
--- a/tests/unit/mock.py
+++ b/tests/unit/mock.py
@@ -26,9 +26,9 @@ class DistributedAtomSpaceMock(DistributedAtomSpace):
     def __init__(self, query_engine: Optional[str] = 'local', **kwargs) -> None:
         self.backend = DatabaseAnimals()
         if query_engine == 'remote':
-            self.query_engine = RemoteQueryEngine(self.backend, kwargs)
+            self.query_engine = RemoteQueryEngine(self.backend, {}, kwargs)
         else:
-            self.query_engine = LocalQueryEngine(self.backend)
+            self.query_engine = LocalQueryEngine(self.backend, {}, kwargs)
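
The effect of threading system_parameters through the constructors is easier to
see outside the diff. A hedged sketch of the two construction paths, assuming
the classes as patched above:

    from hyperon_das import DistributedAtomSpace

    # Client side (default): running_on_server is filled in as False, so
    # fetch() enforces an explicit host/port for a local DAS.
    client_das = DistributedAtomSpace(atomdb='ram', query_engine='local')

    # Server side: the serverless function would construct the DAS with the
    # flag already set, so fetch() skips the host/port validation.
    server_das = DistributedAtomSpace(
        system_parameters={'running_on_server': True},
        atomdb='ram',
        query_engine='local',
    )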

From 81ea33c8903086499568417b4aff0ff608701a44 Mon Sep 17 00:00:00 2001
From: marcocapozzoli
Date: Wed, 27 Mar 2024 18:32:27 -0300
Subject: [PATCH 12/14] change assert

---
 tests/integration/test_local_das.py  | 4 ++--
 tests/integration/test_remote_das.py | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/integration/test_local_das.py b/tests/integration/test_local_das.py
index 4419704b..75b0cc18 100644
--- a/tests/integration/test_local_das.py
+++ b/tests/integration/test_local_das.py
@@ -104,7 +104,7 @@ def test_fetch_atoms(self):
             host=remote_das_host,
             port=remote_das_port,
         )
-        assert das.count_atoms() == (5, 4)
+        assert das.count_atoms() == (6, 4)
         _db_down()
 
 
@@ -126,4 +126,4 @@ def test_fetch_atoms_local_das_ram_only(self):
             host=remote_das_host,
             port=remote_das_port,
         )
-        assert das.count_atoms() == (5, 4)
+        assert das.count_atoms() == (6, 4)
diff --git a/tests/integration/test_remote_das.py b/tests/integration/test_remote_das.py
index 10a5f1bc..4a603887 100644
--- a/tests/integration/test_remote_das.py
+++ b/tests/integration/test_remote_das.py
@@ -313,4 +313,4 @@ def test_fetch_atoms(self, remote_das):
                 ],
             }
         )
-        assert remote_das.backend.count_atoms() == (5, 4)
+        assert remote_das.backend.count_atoms() == (6, 4)

From 4f4c2d98d59ddd8fbc3e986624fdbb4f648cf2b6 Mon Sep 17 00:00:00 2001
From: marcocapozzoli
Date: Thu, 28 Mar 2024 07:11:58 -0300
Subject: [PATCH 13/14] change the parameter name

---
 hyperon_das/query_engines.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/hyperon_das/query_engines.py b/hyperon_das/query_engines.py
index 9c24b510..d8b27963 100644
--- a/hyperon_das/query_engines.py
+++ b/hyperon_das/query_engines.py
@@ -179,7 +179,7 @@ def _get_related_links(
     def _process_node(self, query: dict) -> List[dict]:
         try:
             handle = self.local_backend.node_handle(query["type"], query["name"])
-            return [self.local_backend.get_atom(handle, no_convert=True)]
+            return [self.local_backend.get_atom(handle, no_target_format=True)]
         except AtomDoesNotExist:
             return []
 
@@ -200,7 +200,7 @@ def _process_link(self, query: dict) -> List[dict]:
 
             if link_handle not in unique_handles:
                 unique_handles.add(link_handle)
-                result.append(self.local_backend.get_atom(link_handle, no_convert=True))
+                result.append(self.local_backend.get_atom(link_handle, no_target_format=True))
 
             for target in link_targets:
                 atoms = self._handle_to_atoms(target)
@@ -230,7 +230,7 @@ def _generate_target_handles(self, targets: List[Dict[str, Any]]) -> List[str]:
 
     def _handle_to_atoms(self, handle: str) -> Union[List[dict], dict]:
         try:
-            atom = self.local_backend.get_atom(handle, no_convert=True)
+            atom = self.local_backend.get_atom(handle, no_target_format=True)
         except AtomDoesNotExist:
             return []
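
The rename is mechanical, but worth pinning down at a call site. A hypothetical
helper (the keyword's semantics live in hyperon_das_atomdb; the name suggests
the stored document is returned without formatting its targets):

    from typing import Any, Dict

    def get_raw_atom(backend: Any, handle: str) -> Dict[str, Any]:
        # Mirrors the call sites patched above: ask the atomdb backend for the
        # document as stored, skipping target formatting (behavior inferred
        # from the parameter name).
        return backend.get_atom(handle, no_target_format=True)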

From f6e850fac4a5cca826f9353eb8e2bfbea1797f16 Mon Sep 17 00:00:00 2001
From: marcocapozzoli
Date: Thu, 28 Mar 2024 09:55:52 -0300
Subject: [PATCH 14/14] add private method

---
 hyperon_das/das.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/hyperon_das/das.py b/hyperon_das/das.py
index be6b7b89..4f3aae80 100644
--- a/hyperon_das/das.py
+++ b/hyperon_das/das.py
@@ -18,9 +18,8 @@
 
 class DistributedAtomSpace:
     def __init__(self, system_parameters: Dict[str, Any] = {}, **kwargs) -> None:
-        if not system_parameters.get('running_on_server'):
-            system_parameters['running_on_server'] = False
         self.system_parameters = system_parameters
+        self._set_default_system_parameters()
         atomdb = kwargs.get('atomdb', 'ram')
         query_engine = kwargs.get('query_engine', 'local')
@@ -51,6 +50,10 @@ def __init__(self, system_parameters: Dict[str, Any] = {}, **kwargs) -> None:
                 details=f'query_engine={query_engine}',
             )
 
+    def _set_default_system_parameters(self) -> None:
+        if not self.system_parameters.get('running_on_server'):
+            self.system_parameters['running_on_server'] = False
+
     @staticmethod
     def about() -> dict:
         return {