Skip to content

Commit

Permalink
Merge pull request #116 from singnet/feature/issue-114-create-method-…
Browse files Browse the repository at this point in the history
…to-create-indexes

[#114] Add create_field_index() to RedisMongoDB adapter
  • Loading branch information
marcocapozzoli authored Mar 19, 2024
2 parents 9a921d6 + bfc0757 commit 34fdb39
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CHANGELOG
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
[#112] Fix return of the functions get_matched_links(), get_incoming_links(), get_matched_type_template(), get_matched_type() from set to list
[#112] Fix return of the functions get_matched_links(), get_incoming_links(), get_matched_type_template(), get_matched_type() from set to list
[#114] Add create_field_index() to RedisMongoDB adapter
3 changes: 3 additions & 0 deletions hyperon_das_atomdb/adapters/ram_only.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,3 +497,6 @@ def delete_atom(self, handle: str, **kwargs) -> None:
message='This atom does not exist',
details=f'handle: {handle}',
)

def create_field_index(self, atom_type: str, field: str, type: Optional[str] = None) -> str:
pass
82 changes: 82 additions & 0 deletions hyperon_das_atomdb/adapters/redis_mongo_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

from pymongo import MongoClient
from pymongo import errors as pymongo_errors
from pymongo.collection import Collection
from pymongo.database import Database
from redis import Redis
from redis.cluster import RedisCluster
Expand All @@ -16,6 +18,7 @@
LinkDoesNotExist,
NodeDoesNotExist,
)
from hyperon_das_atomdb.index import Index
from hyperon_das_atomdb.logger import logger
from hyperon_das_atomdb.utils.expression_hasher import ExpressionHasher

Expand All @@ -31,6 +34,7 @@ class MongoCollectionNames(str, Enum):
LINKS_ARITY_2 = 'links_2'
LINKS_ARITY_N = 'links_n'
DAS_CONFIG = 'das_config'
CUSTOM_INDEXES = 'custom_indexes'


class MongoFieldNames(str, Enum):
Expand Down Expand Up @@ -85,6 +89,34 @@ def __str__(self):
return str(self.base)


class MongoDBIndex(Index):
def __init__(self, collection: Collection) -> None:
self.collection = collection

def create(self, field: str, **kwargs) -> Tuple[str, Any]:
conditionals = None

for key, value in kwargs.items():
conditionals = {key: {"$eq": value}}
break # only one key-value pair

index_id = self.generate_index_id(field)

index_conditionals = {"name": index_id}

if conditionals is not None:
index_conditionals["partialFilterExpression"] = conditionals

index_list = [(field, 1)] # store the index in ascending order

return self.collection.create_index(index_list, **index_conditionals), conditionals

def index_exists(self, index_id: str) -> bool:
indexes = self.collection.list_indexes()
index_ids = [index.get('name') for index in indexes]
return True if index_id in index_ids else False


class RedisMongoDB(AtomDB):
"""A concrete implementation using Redis and Mongo database"""

Expand All @@ -105,6 +137,9 @@ def __init__(self, **kwargs: Optional[Dict[str, Any]]) -> None:
}
self.mongo_nodes_collection = self.mongo_db.get_collection(MongoCollectionNames.NODES)
self.mongo_types_collection = self.mongo_db.get_collection(MongoCollectionNames.ATOM_TYPES)
self.mongo_custom_indexes_collection = self.mongo_db.get_collection(
MongoCollectionNames.CUSTOM_INDEXES
)
self.all_mongo_collections = [
(
MongoCollectionNames.LINKS_ARITY_1,
Expand Down Expand Up @@ -834,3 +869,50 @@ def delete_atom(self, handle: str, **kwargs) -> None:
)

self._update_link_index([document], delete_atom=True)

def create_field_index(self, atom_type: str, field: str, type: Optional[str] = None) -> str:
if atom_type == 'node':
collections = [self.mongo_nodes_collection]
elif atom_type == 'link':
collections = self.mongo_link_collection.values()
else:
raise ValueError("Invalid atom_type")

index_id = ""

try:
exc = ""
for collection in collections:
index_id, conditionals = MongoDBIndex(collection).create(field, named_type=type)
self.mongo_custom_indexes_collection.update_one(
filter={'_id': index_id},
update={'$set': {'_id': index_id, 'conditionals': conditionals}},
upsert=True,
)
except pymongo_errors.OperationFailure as e:
exc = e
logger().error(f"Error creating index in collection '{collection}': {str(e)}")
except Exception as e:
exc = e
logger().error(f"Error: {str(e)}")
finally:
if not index_id:
return (
f"Index creation failed, Details: {str(exc)}"
if exc
else "Index creation failed"
)

return index_id

def retrieve_mongo_document_by_index(
self, collection: Collection, index_id: str, **kwargs
) -> List[Dict[str, Any]]:
if MongoDBIndex(collection).index_exists(index_id):
kwargs.update(
self.mongo_custom_indexes_collection.find_one({'_id': index_id})['conditionals']
)
pymongo_cursor = collection.find(kwargs).hint(
index_id
) # Using the hint() method is an additional measure to ensure its use
return [document for document in pymongo_cursor]
4 changes: 4 additions & 0 deletions hyperon_das_atomdb/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,3 +592,7 @@ def delete_atom(self, handle: str, **kwargs) -> None:
handle (str): Atom handle
"""
... # pragma no cover

@abstractmethod
def create_field_index(self, atom_type: str, field: str, type: Optional[str] = None) -> str:
... # pragma no cover
42 changes: 42 additions & 0 deletions hyperon_das_atomdb/index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from abc import ABC, abstractmethod
from typing import Any, Tuple

from hyperon_das_atomdb.utils.expression_hasher import ExpressionHasher


class Index(ABC):
@staticmethod
def generate_index_id(field: str) -> str:
"""Generates an index ID based on the field name.
Args:
field (str): The field name.
Returns:
str: The index ID.
"""
return f"index_{ExpressionHasher._compute_hash(field)}"

@abstractmethod
def create(self, field: str, **kwargs) -> Tuple[str, Any]:
"""Creates an index on the given field.
Args:
field (str): The field to create the index on.
Returns:
Tuple[str, Any]: The index ID.
"""
... # pragma: no cover

@abstractmethod
def index_exists(self, index_id: str) -> bool:
"""Checks if an index exists
Args:
index_id (str): The index ID.
Returns:
bool: True if the index exists, False otherwise.
"""
... # pragma: no cover
53 changes: 53 additions & 0 deletions tests/integration/adapters/test_redis_mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -932,3 +932,56 @@ def test_get_matched_with_pagination(self):
],
)
_db_down()

def test_create_field_index(self):
_db_up()
db = self._connect_db()
self._add_atoms(db)
db.add_link(
{
"type": "Similarity",
"targets": [
{"type": "Concept", "name": 'human'},
{"type": "Concept", "name": 'monkey'},
],
"tag": 'DAS',
}
)
db.commit()

link_collections = list(db.mongo_link_collection.values())

links_2 = link_collections[0]
links_1 = link_collections[1]
links_n = link_collections[2]

response = links_n.find({'named_type': 'Similarity', 'tag': 'DAS'}).explain()

with pytest.raises(KeyError):
response['queryPlanner']['winningPlan']['inputStage']['indexName']

# Create the index
my_index = db.create_field_index(atom_type='link', field='tag', type='Similarity')

links_2_index_names = [idx.get('name') for idx in links_2.list_indexes()]
links_1_index_names = [idx.get('name') for idx in links_1.list_indexes()]
links_n_index_names = [idx.get('name') for idx in links_n.list_indexes()]

assert my_index in links_2_index_names
assert my_index in links_1_index_names
assert my_index in links_n_index_names

# Using the index
response = links_n.find({'named_type': 'Similarity', 'tag': 'DAS'}).explain()

assert my_index == response['queryPlanner']['winningPlan']['inputStage']['indexName']

# Retrieve the document using the index
doc = db.retrieve_mongo_document_by_index(links_n, my_index, tag='DAS')
assert doc[0]['_id'] == ExpressionHasher.expression_hash(
ExpressionHasher.named_type_hash("Similarity"), [human, monkey]
)
assert doc[0]['key_0'] == human
assert doc[0]['key_1'] == monkey

_db_down()
60 changes: 60 additions & 0 deletions tests/unit/adapters/test_redis_mongo_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pymongo import MongoClient
from pymongo.collection import Collection
from pymongo.database import Database
from pymongo.errors import OperationFailure
from redis import Redis

from hyperon_das_atomdb.adapters import RedisMongoDB
Expand Down Expand Up @@ -1993,3 +1994,62 @@ def test_get_atom_type(self, database):
assert 'Concept' == database.get_atom_type(h)
assert 'Concept' == database.get_atom_type(m)
assert 'Inheritance' == database.get_atom_type(i)

def test_create_field_index_node_collection(self, database):
database.mongo_nodes_collection = mock.Mock()
database.mongo_link_collection = {}
database.mongo_nodes_collection.create_index.return_value = 'name_index_asc'
with mock.patch(
'hyperon_das_atomdb.index.Index.generate_index_id',
return_value='name_index_asc',
):
result = database.create_field_index('node', 'name', 'Type')

assert result == 'name_index_asc'
database.mongo_nodes_collection.create_index.assert_called_once_with(
[('name', 1)],
name='name_index_asc',
partialFilterExpression={'named_type': {'$eq': 'Type'}},
)

def test_create_field_index_link_collection(self, database):
database.mongo_nodes_collection = mock.Mock()
database.mongo_link_collection = {'link1': mock.Mock()}
database.mongo_link_collection['link1'].create_index.return_value = 'link_index_asc'
with mock.patch(
'hyperon_das_atomdb.index.Index.generate_index_id',
return_value='link_index_asc',
):
result = database.create_field_index('link', 'field', 'Type')

assert result == 'link_index_asc'
database.mongo_link_collection['link1'].create_index.assert_called_once_with(
[('field', 1)],
name='link_index_asc',
partialFilterExpression={'named_type': {'$eq': 'Type'}},
)

def test_create_field_index_invalid_collection(self, database):
with pytest.raises(ValueError):
database.create_field_index('invalid_atom_type', 'field', 'type')

def test_create_field_index_operation_failure(self, database):
database.mongo_nodes_collection = mock.Mock()
database.mongo_nodes_collection.create_index.side_effect = OperationFailure(
'Index creation failed'
)
result = database.create_field_index('node', 'field', 'Type')

assert result == 'Index creation failed, Details: Index creation failed'

def test_create_field_index_already_exists(self, database):
database.mongo_nodes_collection = mock.Mock()
database.mongo_link_collection = {}
database.mongo_nodes_collection.list_indexes.return_value = []
database.mongo_nodes_collection.create_index.return_value = 'name_index_asc'
with mock.patch(
'hyperon_das_atomdb.index.Index.generate_index_id',
return_value='name_index_asc',
):
database.create_field_index('node', 'name', 'Type')
assert database.create_field_index('node', 'name', 'Type') == 'name_index_asc'

0 comments on commit 34fdb39

Please sign in to comment.