Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#201] Implement fetch() in the DAS API #207

Merged
merged 15 commits into from
Mar 28, 2024
2 changes: 1 addition & 1 deletion CHANGELOG
Original file line number Diff line number Diff line change
@@ -1 +1 @@

[#201] Implement fetch() in the DAS API
30 changes: 29 additions & 1 deletion hyperon_das/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,40 @@
from collections import deque
from itertools import product
from threading import Semaphore, Thread
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, TypeVar, Union

from hyperon_das_atomdb import WILDCARD
from hyperon_das_atomdb.adapters import InMemoryDB, RedisMongoDB

from hyperon_das.client import FunctionsClient
from hyperon_das.utils import Assignment, QueryAnswer

AdapterDBType = TypeVar("AdapterDBType", RedisMongoDB, InMemoryDB)


class CacheManager:
    """Thin wrapper around a local AtomDB adapter used as a cache for atoms
    fetched from a remote DAS server.

    Args:
        cache: AtomDB adapter instance (``RedisMongoDB`` or ``InMemoryDB``)
            used as the local storage backend.
        **kwargs: Currently unused; accepted for forward compatibility.
    """

    def __init__(self, cache: "AdapterDBType", **kwargs) -> None:
        self.cache = cache

    def fetch_data(
        self,
        query: Union[List[dict], dict],
        host: Optional[str] = None,
        port: Optional[int] = None,
        **kwargs,
    ) -> List[Dict[str, Any]]:
        """Fetch atoms matching ``query`` from a remote DAS server.

        Args:
            query: A pattern (a link, possibly with nested links) used to
                query the remote knowledge base.
            host: Remote server address; ignored when a pre-built client is
                passed via the ``server`` kwarg.
            port: Remote server port; ignored when ``server`` is passed.
            **kwargs: Optionally ``server`` (a pre-built FunctionsClient);
                all remaining kwargs are forwarded to ``server.fetch()``.

        Returns:
            The list of documents returned by the remote server.
        """
        try:
            if not (server := kwargs.pop('server', None)):
                server = FunctionsClient(host, port)
            return server.fetch(query=query, **kwargs)
        except Exception:
            # TODO: Map possible errors
            # Bare `raise` re-raises the active exception with its original
            # traceback; `raise e` would add a redundant frame.
            raise

    def bulk_insert(self, documents: Dict[str, Any]) -> None:
        """Insert statements in "bulk", not returning rows.

        Delegates to the backend adapter's own ``bulk_insert``.
        NOTE(review): the annotation says ``Dict`` but adapters' bulk insert
        usually takes a list of documents — confirm against the adapter API.
        """
        self.cache.bulk_insert(documents)


class QueryAnswerIterator(ABC):
def __init__(self, source: Any):
Expand Down
23 changes: 20 additions & 3 deletions hyperon_das/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@

from hyperon_das.exceptions import ConnectionError, HTTPError, RequestError, TimeoutError
from hyperon_das.logger import logger
from hyperon_das.utils import deserialize, serialize
from hyperon_das.utils import connect_to_server, deserialize, serialize


class FunctionsClient:
def __init__(self, url: str, server_count: int = 0, name: Optional[str] = None):
def __init__(self, host: str, port: int, server_count: int = 0, name: Optional[str] = None):
if not host:
raise ValueError('Host is required')

self.url = connect_to_server(host, port)

if not name:
self.name = f'server-{server_count}'
self.url = url

def _send_request(self, payload) -> Any:
try:
Expand Down Expand Up @@ -177,3 +181,16 @@ def custom_query(self, index_id: str, **kwargs) -> List[Dict[str, Any]]:
'input': {'index_id': index_id, 'kwargs': kwargs},
}
return self._send_request(payload)

def fetch(
    self,
    query: Union[List[dict], dict],
    host: Optional[str] = None,
    port: Optional[int] = None,
    **kwargs,
) -> Any:
    """Ask the remote server to run a fetch for `query` and return its raw response.

    The query, target host/port and any extra kwargs are packed into the
    request's 'input' field and dispatched via `_send_request`.
    """
    request_input = {
        'query': query,
        'host': host,
        'port': port,
        'kwargs': kwargs,
    }
    return self._send_request({'action': 'fetch', 'input': request_input})
76 changes: 63 additions & 13 deletions hyperon_das/das.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from hyperon_das_atomdb.adapters import InMemoryDB, RedisMongoDB
from hyperon_das_atomdb.exceptions import InvalidAtomDB

from hyperon_das.cache import QueryAnswerIterator
from hyperon_das.cache import CacheManager, QueryAnswerIterator
from hyperon_das.exceptions import (
GetTraversalCursorException,
InvalidDASParameters,
Expand All @@ -17,31 +17,38 @@


class DistributedAtomSpace:
def __init__(self, **kwargs: Optional[Dict[str, Any]]) -> None:
atomdb_parameter = kwargs.get('atomdb', 'ram')
query_engine_parameter = kwargs.get('query_engine', 'local')

if atomdb_parameter == "ram":
def __init__(self, system_parameters: Dict[str, Any] = {}, **kwargs) -> None:
if not system_parameters.get('running_on_server'):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make an explicit private method _set_default_system_parameters() to setup default parameters as we'll probably have other cases where we'll want to do the same. I believe the algorithm should be as you did:

def __init__(self, system_parameters...):
   ...
   self.system_parameters = system_parameters
   self._set_default_system_parameters()
   ...
def _set_default_system_parameters(self):
    if not self.system_parameters.get('running_on_server'):
        self.system_parameters['running_on_server'] = False

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative design would be to use a dataclass SystemParameters instead of a dict. This way defaults would be put in the dataclass' constructor. This option has the advantage of ensuring the use of the right parameter names across the code, which otherwise would be detected only at runtime. I'll let you decide.

system_parameters['running_on_server'] = False
self.system_parameters = system_parameters
atomdb = kwargs.get('atomdb', 'ram')
query_engine = kwargs.get('query_engine', 'local')

if atomdb == "ram":
self.backend = InMemoryDB()
elif atomdb_parameter == "redis_mongo":
elif atomdb == "redis_mongo":
self.backend = RedisMongoDB(**kwargs)
if query_engine_parameter != "local":
if query_engine != "local":
raise InvalidDASParameters(
message="'redis_mongo' backend requires local query engine ('query_engine=local')"
)
else:
raise InvalidAtomDB(message="Invalid AtomDB type. Choose either 'ram' or 'redis_mongo'")

if query_engine_parameter == 'local':
self.query_engine = LocalQueryEngine(self.backend, kwargs)
kwargs.update({'cache_manager': CacheManager(self.backend)})

if query_engine == 'local':
self._das_type = 'local_ram_only' if atomdb == 'ram' else 'local_redis_mongo'
self.query_engine = LocalQueryEngine(self.backend, self.system_parameters, kwargs)
logger().info('Initialized local Das')
elif query_engine_parameter == "remote":
self.query_engine = RemoteQueryEngine(self.backend, kwargs)
elif query_engine == "remote":
self._das_type = 'remote'
self.query_engine = RemoteQueryEngine(self.backend, self.system_parameters, kwargs)
logger().info('Initialized remote Das')
else:
raise InvalidQueryEngine(
message='The possible values are: `local` or `remote`',
details=f'query_engine={query_engine_parameter}',
details=f'query_engine={query_engine}',
)

@staticmethod
Expand Down Expand Up @@ -575,3 +582,46 @@ def create_field_index(
return self.query_engine.create_field_index(
atom_type, field, type=type, composite_type=composite_type
)

def fetch(
    self,
    query: Union[List[dict], dict],
    host: Optional[str] = None,
    port: Optional[int] = None,
    **kwargs,
) -> Any:
    """Fetch data from the remote server using a query as input and load it locally.
    If it is a local DAS, the host and port must be sent.

    The input dict is a link, used as a pattern to make the query.
    Variables can be used as link targets as well as nodes. Nested links are
    allowed as well.

    Args:
        query (Union[List[dict], dict]): A pattern described as a link (possibly with nested links)
            with nodes and variables used to query the knowledge base.
        host (Optional[str], optional): Address to remote server. Defaults to None.
        port (Optional[int], optional): Port to remote server. Defaults to None.

    Raises:
        ValueError: If the 'host' and 'port' parameters are not sent to DAS local

    Examples:
        >>> query = {
                "atom_type": "link",
                "type": "Expression",
                "targets": [
                    {"atom_type": "node", "type": "Symbol", "name": "Inheritance"},
                    {"atom_type": "variable", "name": "v1"},
                    {"atom_type": "node", "type": "Symbol", "name": '"mammal"'},
                ],
            }
            das = DistributedAtomSpace()
            das.fetch(query, host='123.4.5.6', port=8080)
    """

    if not self.system_parameters.get('running_on_server'):
        # Require BOTH host and port, but only for a non-remote (local) DAS.
        # The previous form `a and not host or not port` parsed as
        # `(a and not host) or (not port)` due to operator precedence, so a
        # remote DAS with no port would wrongly raise here.
        if self._das_type != 'remote' and (not host or not port):
            raise ValueError("The 'host' and 'port' parameters must be sent to DAS local")

    return self.query_engine.fetch(query, host, port, **kwargs)
9 changes: 5 additions & 4 deletions hyperon_das/decorators.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import time
from functools import wraps
from http import HTTPStatus # noqa: F401
from typing import Callable

from hyperon_das.exceptions import ConnectionError
Expand All @@ -17,9 +18,9 @@ def wrapper(*args, **kwargs):
while retry_count < attempts and timer_count < timeout_seconds:
try:
start_time = time.time()
response = function(*args, **kwargs)
status, response = function(*args, **kwargs)
end_time = time.time()
if response is not None:
if status == HTTPStatus.OK:
logger().debug(
f'{retry_count + 1} successful connection attempt at [host={args[1]}]'
)
Expand All @@ -34,9 +35,9 @@ def wrapper(*args, **kwargs):
time.sleep(waiting_time_seconds)
retry_count += 1
timer_count += end_time - start_time
port = f':{args[2]}' if args[2] else ''
port = f':{args[1]}' if len(args) > 1 else ''
message = (
f'Failed to connect to remote Das {args[1]}'
f'Failed to connect to remote Das {args[0]}'
+ port
+ f' - attempts:{retry_count} - time_attempted: {timer_count}'
)
Expand Down
Loading
Loading