[#201] Implement fetch() in the DAS API #207

Merged · 15 commits · Mar 28, 2024
2 changes: 1 addition & 1 deletion CHANGELOG
@@ -1 +1 @@

[#201] Implement fetch() in the DAS API
30 changes: 29 additions & 1 deletion hyperon_das/cache.py
@@ -2,12 +2,40 @@
from collections import deque
from itertools import product
from threading import Semaphore, Thread
-from typing import Any, Dict, List, Optional, Union
+from typing import Any, Dict, List, Optional, TypeVar, Union

from hyperon_das_atomdb import WILDCARD
from hyperon_das_atomdb.adapters import InMemoryDB, RedisMongoDB

from hyperon_das.client import FunctionsClient
from hyperon_das.utils import Assignment, QueryAnswer

AdapterDBType = TypeVar("AdapterDBType", RedisMongoDB, InMemoryDB)


class CacheManager:
def __init__(self, cache: AdapterDBType, **kwargs):
self.cache = cache

def fetch_data(
self,
query: Union[List[dict], dict],
host: Optional[str] = None,
port: Optional[int] = None,
**kwargs,
) -> List[Dict[str, Any]]:
try:
if not (server := kwargs.pop('server', None)):
server = FunctionsClient(host, port)
return server.fetch(query=query, **kwargs)
except Exception as e:
# TODO: Map possible errors
raise e

def bulk_insert(self, documents: Dict[str, Any]) -> None:
"""insert statements in "bulk", not returning rows"""
self.cache.bulk_insert(documents)


class QueryAnswerIterator(ABC):
def __init__(self, source: Any):
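Taken together, the new class supports a fetch-then-insert flow. A minimal usage sketch, assuming the adapter and client behave as shown in this diff (host and port values are placeholders); note that fetch_data is annotated to return a list while bulk_insert is annotated as taking a dict, so the sketch simply passes the fetched payload straight through:

from hyperon_das_atomdb.adapters import InMemoryDB

from hyperon_das.cache import CacheManager

# Back the cache with a local in-memory AtomDB.
manager = CacheManager(cache=InMemoryDB())

# Fetch atoms matching a query from a remote DAS (placeholder address)...
documents = manager.fetch_data(
    query={"atom_type": "node", "type": "Symbol", "name": '"mammal"'},
    host="123.4.5.6",
    port=8080,
)

# ...and mirror them into the local backend in one bulk write.
manager.bulk_insert(documents)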
23 changes: 20 additions & 3 deletions hyperon_das/client.py
@@ -7,14 +7,18 @@

from hyperon_das.exceptions import ConnectionError, HTTPError, RequestError, TimeoutError
from hyperon_das.logger import logger
-from hyperon_das.utils import deserialize, serialize
+from hyperon_das.utils import connect_to_server, deserialize, serialize


class FunctionsClient:
-    def __init__(self, url: str, server_count: int = 0, name: Optional[str] = None):
+    def __init__(self, host: str, port: int, server_count: int = 0, name: Optional[str] = None):
+        if not host:
+            raise ValueError('Host is required')
+
+        self.url = connect_to_server(host, port)
+
        if not name:
            self.name = f'server-{server_count}'
-        self.url = url

def _send_request(self, payload) -> Any:
try:
@@ -177,3 +181,16 @@ def custom_query(self, index_id: str, **kwargs) -> List[Dict[str, Any]]:
'input': {'index_id': index_id, 'kwargs': kwargs},
}
return self._send_request(payload)

def fetch(
self,
query: Union[List[dict], dict],
host: Optional[str] = None,
port: Optional[int] = None,
**kwargs,
) -> Any:
payload = {
'action': 'fetch',
'input': {'query': query, 'host': host, 'port': port, 'kwargs': kwargs},
}
return self._send_request(payload)
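For illustration, a sketch of calling the new endpoint with the same pattern used in the das.fetch docstring below; the server address is a placeholder, and this assumes the constructor change above:

from hyperon_das.client import FunctionsClient

client = FunctionsClient(host="123.4.5.6", port=8080)

# Sends {'action': 'fetch', 'input': {...}} to the server via _send_request.
answers = client.fetch(
    query={
        "atom_type": "link",
        "type": "Expression",
        "targets": [
            {"atom_type": "node", "type": "Symbol", "name": "Inheritance"},
            {"atom_type": "variable", "name": "v1"},
            {"atom_type": "node", "type": "Symbol", "name": '"mammal"'},
        ],
    }
)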
45 changes: 44 additions & 1 deletion hyperon_das/das.py
@@ -4,7 +4,7 @@
from hyperon_das_atomdb.adapters import InMemoryDB, RedisMongoDB
from hyperon_das_atomdb.exceptions import InvalidAtomDB

-from hyperon_das.cache import QueryAnswerIterator
+from hyperon_das.cache import CacheManager, QueryAnswerIterator
from hyperon_das.exceptions import (
GetTraversalCursorException,
InvalidDASParameters,
@@ -32,6 +32,8 @@ def __init__(self, **kwargs: Optional[Dict[str, Any]]) -> None:
else:
raise InvalidAtomDB(message="Invalid AtomDB type. Choose either 'ram' or 'redis_mongo'")

kwargs.update({'cache_manager': CacheManager(self.backend)})

if query_engine_parameter == 'local':
self.query_engine = LocalQueryEngine(self.backend, kwargs)
logger().info('Initialized local Das')
@@ -575,3 +577,44 @@ def create_field_index(
return self.query_engine.create_field_index(
atom_type, field, type=type, composite_type=composite_type
)

def fetch(
self,
query: Union[List[dict], dict],
host: Optional[str] = None,
port: Optional[int] = None,
**kwargs,
) -> Any:
"""Fetch data from the remote server using the a query as input and load it locally.
If it is a local DAS, the host and port must be sent.

The input dict is a link, used as a pattern to make the query.
Variables can be used as link targets as well as nodes. Nested links are
allowed as well.

Args:
query (Union[List[dict], dict]): A pattern described as a link (possibly with
    nested links) with nodes and variables, used to query the knowledge base.
host (Optional[str], optional): Address of the remote server. Defaults to None.
port (Optional[int], optional): Port of the remote server. Defaults to None.

Raises:
    ValueError: If 'host' and 'port' are not provided for a local DAS.

Examples:
>>> query = {
"atom_type": "link",
"type": "Expression",
"targets": [
{"atom_type": "node", "type": "Symbol", "name": "Inheritance"},
{"atom_type": "variable", "name": "v1"},
{"atom_type": "node", "type": "Symbol", "name": '"mammal"'},
],
}
das = DistributedAtomSpace()
das.fetch(query, host='123.4.5.6', port=8080)
"""
if not kwargs.get('running_on_server') and not host and not port:
    raise ValueError("The 'host' and 'port' parameters must be sent to DAS local")

Contributor review comment on the check above:
This check seems to be wrong. If it's not running on a server, then it's running in a local DAS, and a local DAS needs both host and port to fetch. However, this check will not raise if one of them is None while the other is set.

Contributor review comment:
You shouldn't need a KW parameter to check whether it's a local or a remote DAS; there should be a more elegant way to identify that. This is a method in the DAS public API, so you're expecting the user to pass this parameter. This seems like bad UX.

return self.query_engine.fetch(query, host, port, **kwargs)
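A minimal sketch of the stricter check the first review comment asks for — hypothetical, not part of this PR: a local DAS needs both host and port, so missing either one should raise.

from typing import Optional


def validate_fetch_target(host: Optional[str], port: Optional[int], running_on_server: bool) -> None:
    # Hypothetical helper: raise if *either* host or port is missing for a
    # local DAS, instead of only when both are missing.
    if not running_on_server and (host is None or port is None):
        raise ValueError("The 'host' and 'port' parameters must be sent to DAS local")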
9 changes: 5 additions & 4 deletions hyperon_das/decorators.py
@@ -1,5 +1,6 @@
import time
from functools import wraps
+from http import HTTPStatus  # noqa: F401
from typing import Callable

from hyperon_das.exceptions import ConnectionError
@@ -17,9 +18,9 @@ def wrapper(*args, **kwargs):
while retry_count < attempts and timer_count < timeout_seconds:
try:
start_time = time.time()
-response = function(*args, **kwargs)
+status, response = function(*args, **kwargs)
end_time = time.time()
-if response is not None:
+if status == HTTPStatus.OK:
logger().debug(
f'{retry_count + 1} successful connection attempt at [host={args[1]}]'
)
Expand All @@ -34,9 +35,9 @@ def wrapper(*args, **kwargs):
time.sleep(waiting_time_seconds)
retry_count += 1
timer_count += end_time - start_time
-port = f':{args[2]}' if args[2] else ''
+port = f':{args[1]}' if len(args) > 1 else ''
message = (
-f'Failed to connect to remote Das {args[1]}'
+f'Failed to connect to remote Das {args[0]}'
+ port
+ f' - attempts:{retry_count} - time_attempted: {timer_count}'
)
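A self-contained sketch of the retry pattern after this change, assuming the wrapped function now returns a (status, response) tuple as in the diff above. Names and timings are simplified for illustration, not the project's exact implementation; the builtin ConnectionError stands in for hyperon_das.exceptions.ConnectionError.

import time
from functools import wraps
from http import HTTPStatus
from typing import Callable, Tuple


def retry(attempts: int, timeout_seconds: float, waiting_time_seconds: float = 1.0) -> Callable:
    def decorator(function: Callable[..., Tuple[int, object]]) -> Callable:
        @wraps(function)
        def wrapper(host: str, port: int, *args, **kwargs):
            deadline = time.time() + timeout_seconds
            attempt = 0
            for attempt in range(1, attempts + 1):
                # The wrapped function reports (status, response); retry until 200 OK.
                status, response = function(host, port, *args, **kwargs)
                if status == HTTPStatus.OK:
                    return response
                if time.time() >= deadline:
                    break
                time.sleep(waiting_time_seconds)
            # Builtin ConnectionError used here as a stand-in for the project's own.
            raise ConnectionError(f'Failed to connect to remote Das {host}:{port} - attempts:{attempt}')
        return wrapper
    return decorator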