From a29390ee57bccb7223960f364fe76958a9118b60 Mon Sep 17 00:00:00 2001 From: marcocapozzoli Date: Wed, 20 Mar 2024 18:57:17 -0300 Subject: [PATCH 1/3] Refactor collection in RedisMongoDB --- CHANGELOG | 3 +- hyperon_das_atomdb/adapters/redis_mongo_db.py | 316 ++++++-------- .../integration/adapters/test_redis_mongo.py | 20 +- tests/unit/adapters/test_redis_mongo_db.py | 396 +++++++----------- 4 files changed, 270 insertions(+), 465 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 051feb1d..f82f785f 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,2 +1,3 @@ [#112] Fix return of the functions get_matched_links(), get_incoming_links(), get_matched_type_template(), get_matched_type() from set to list -[#114] Add create_field_index() to RedisMongoDB adapter \ No newline at end of file +[#114] Add create_field_index() to RedisMongoDB adapter +[#120] Refactor Collections in RedisMongoDB adapter \ No newline at end of file diff --git a/hyperon_das_atomdb/adapters/redis_mongo_db.py b/hyperon_das_atomdb/adapters/redis_mongo_db.py index c9077108..756aaf05 100644 --- a/hyperon_das_atomdb/adapters/redis_mongo_db.py +++ b/hyperon_das_atomdb/adapters/redis_mongo_db.py @@ -28,11 +28,8 @@ def _build_redis_key(prefix, key): class MongoCollectionNames(str, Enum): - NODES = 'nodes' + ATOMS = 'atoms' ATOM_TYPES = 'atom_types' - LINKS_ARITY_1 = 'links_1' - LINKS_ARITY_2 = 'links_2' - LINKS_ARITY_N = 'links_n' DAS_CONFIG = 'das_config' CUSTOM_INDEXES = 'custom_indexes' @@ -130,30 +127,13 @@ def __init__(self, **kwargs: Optional[Dict[str, Any]]) -> None: self.database_name = 'das' self._setup_databases(**kwargs) self.use_metta_mapping = kwargs.get("use_metta_mapping", True) - self.mongo_link_collection = { - "2": self.mongo_db.get_collection(MongoCollectionNames.LINKS_ARITY_2), - "1": self.mongo_db.get_collection(MongoCollectionNames.LINKS_ARITY_1), - "N": self.mongo_db.get_collection(MongoCollectionNames.LINKS_ARITY_N), - } - self.mongo_nodes_collection = self.mongo_db.get_collection(MongoCollectionNames.NODES) + self.mongo_atoms_collection = self.mongo_db.get_collection(MongoCollectionNames.ATOMS) self.mongo_types_collection = self.mongo_db.get_collection(MongoCollectionNames.ATOM_TYPES) self.mongo_custom_indexes_collection = self.mongo_db.get_collection( MongoCollectionNames.CUSTOM_INDEXES ) self.all_mongo_collections = [ - ( - MongoCollectionNames.LINKS_ARITY_1, - self.mongo_link_collection["1"], - ), - ( - MongoCollectionNames.LINKS_ARITY_2, - self.mongo_link_collection["2"], - ), - ( - MongoCollectionNames.LINKS_ARITY_N, - self.mongo_link_collection["N"], - ), - (MongoCollectionNames.NODES, self.mongo_nodes_collection), + (MongoCollectionNames.ATOMS, self.mongo_atoms_collection), (MongoCollectionNames.ATOM_TYPES, self.mongo_types_collection), ] self.mongo_das_config_collection = None @@ -301,45 +281,13 @@ def _get_atom_type_hash(self, atom_type): self.named_type_hash[atom_type] = named_type_hash return named_type_hash - def _retrieve_mongo_document(self, handle: str, arity=-1) -> dict: + def _retrieve_mongo_document(self, handle: str) -> dict: mongo_filter = {MongoFieldNames.ID_HASH: handle} - document = None - if arity >= 0: - if arity == 0: - return self.mongo_nodes_collection.find_one(mongo_filter) - else: - if self.use_metta_mapping: - document = self.mongo_link_collection["N"].find_one(mongo_filter) - if document: - document["targets"] = self._get_mongo_document_keys(document) - return document - else: - if arity == 2: - document = self.mongo_link_collection["2"].find_one(mongo_filter) - if document: 
-                        document["targets"] = self._get_mongo_document_keys(document)
-                        return document
-                elif arity == 1:
-                    document = self.mongo_link_collection["1"].find_one(mongo_filter)
-                    if document:
-                        document["targets"] = self._get_mongo_document_keys(document)
-                        return document
-                else:
-                    document = self.mongo_link_collection["N"].find_one(mongo_filter)
-                    if document:
-                        document["targets"] = self._get_mongo_document_keys(document)
-                        return document
-
-        # The order of keys in search is important. Greater to smallest
-        # probability of proper arity
-        document = self.mongo_nodes_collection.find_one(mongo_filter)
-        if document:
-            return document
-        for collection in [self.mongo_link_collection[key] for key in ["2", "1", "N"]]:
-            document = collection.find_one(mongo_filter)
-            if document:
+        document = self.mongo_atoms_collection.find_one(mongo_filter)
+        if document:
+            if self._is_document_link(document):
                 document["targets"] = self._get_mongo_document_keys(document)
-                return document
+            return document
         return None
 
     def _build_named_type_hash_template(self, template: Union[str, List[Any]]) -> List[Any]:
@@ -373,14 +321,14 @@ def _filter_non_toplevel(self, matches: list) -> list:
         # matches = matches[0]
         for match in matches:
             link_handle = match[0]
-            link = self._retrieve_mongo_document(link_handle, len(match[-1]))
+            link = self._retrieve_mongo_document(link_handle)
             if link['is_toplevel']:
                 matches_toplevel_only.append(match)
 
         return matches_toplevel_only
 
     def get_node_handle(self, node_type: str, node_name: str) -> str:
         node_handle = self.node_handle(node_type, node_name)
-        document = self._retrieve_mongo_document(node_handle, 0)
+        document = self._retrieve_mongo_document(node_handle)
         if document is not None:
             return document[MongoFieldNames.ID_HASH]
         else:
@@ -413,36 +361,35 @@ def get_matched_node_name(self, node_type: str, substring: str) -> str:
         }
 
         return [
             document[MongoFieldNames.ID_HASH]
-            for document in self.mongo_nodes_collection.find(mongo_filter)
+            for document in self.mongo_atoms_collection.find(mongo_filter)
         ]
 
     def get_all_nodes(self, node_type: str, names: bool = False) -> List[str]:
         if names:
             return [
                 document[MongoFieldNames.NODE_NAME]
-                for document in self.mongo_nodes_collection.find(
+                for document in self.mongo_atoms_collection.find(
                     {MongoFieldNames.TYPE_NAME: node_type}
                 )
             ]
         else:
             return [
                 document[MongoFieldNames.ID_HASH]
-                for document in self.mongo_nodes_collection.find(
+                for document in self.mongo_atoms_collection.find(
                     {MongoFieldNames.TYPE_NAME: node_type}
                 )
             ]
 
     def get_all_links(self, link_type: str) -> List[str]:
         links_handle = []
-        for collection in [self.mongo_link_collection[key] for key in ["2", "1", "N"]]:
-            documents = collection.find({MongoFieldNames.TYPE_NAME: link_type})
-            for document in documents:
-                links_handle.append(document[MongoFieldNames.ID_HASH])
+        documents = self.mongo_atoms_collection.find({MongoFieldNames.TYPE_NAME: link_type})
+        for document in documents:
+            links_handle.append(document[MongoFieldNames.ID_HASH])
         return links_handle
 
     def get_link_handle(self, link_type: str, target_handles: List[str]) -> str:
         link_handle = self.link_handle(link_type, target_handles)
-        document = self._retrieve_mongo_document(link_handle, len(target_handles))
+        document = self._retrieve_mongo_document(link_handle)
         if document is not None:
             return document[MongoFieldNames.ID_HASH]
         else:
@@ -502,8 +449,7 @@ def get_matched_links(
         pattern_hash = ExpressionHasher.composite_hash([link_type_hash, *target_handles])
 
         cursor, patterns_matched = 
self._retrieve_pattern(pattern_hash, **kwargs) toplevel_only = kwargs.get('toplevel_only', False) - r = self._process_matched_results(patterns_matched, cursor, toplevel_only) - return r + return self._process_matched_results(patterns_matched, cursor, toplevel_only) def get_incoming_links( self, atom_handle: str, **kwargs @@ -545,8 +491,7 @@ def get_link_type(self, link_handle: str) -> str: def get_atom(self, handle: str, **kwargs) -> Dict[str, Any]: document = self._retrieve_mongo_document(handle) if document: - atom = self._convert_atom_format(document, **kwargs) - return atom + return self._convert_atom_format(document, **kwargs) else: logger().error( f'Failed to retrieve atom for handle: {handle}. This link may not exist. - Details: {kwargs}' @@ -561,9 +506,9 @@ def get_atom_type(self, handle: str) -> str: if atom is not None: return atom['named_type'] - def get_atom_as_dict(self, handle, arity=-1) -> dict: + def get_atom_as_dict(self, handle) -> dict: answer = {} - document = self._retrieve_mongo_document(handle, arity) + document = self._retrieve_mongo_document(handle) if document: answer["handle"] = document[MongoFieldNames.ID_HASH] answer["type"] = document[MongoFieldNames.TYPE_NAME] @@ -574,11 +519,12 @@ def get_atom_as_dict(self, handle, arity=-1) -> dict: return answer def count_atoms(self) -> Tuple[int, int]: - node_count = self.mongo_nodes_collection.estimated_document_count() - link_count = 0 - for collection in self.mongo_link_collection.values(): - link_count += collection.estimated_document_count() - return (node_count, link_count) + atoms_count = self.mongo_atoms_collection.estimated_document_count() + nodes_count = self.mongo_atoms_collection.count_documents( + {MongoFieldNames.COMPOSITE_TYPE: {'$exists': False}} + ) + links_count = atoms_count - nodes_count + return (nodes_count, links_count) def clear_database(self) -> None: """ @@ -598,25 +544,23 @@ def commit(self) -> None: id_tag = MongoFieldNames.ID_HASH for key, (collection, buffer) in self.mongo_bulk_insertion_buffer.items(): if buffer: + if key == MongoCollectionNames.ATOM_TYPES: + logger().error('Failed to commit Atom Types. This operation is not allowed') + raise InvalidOperationException + documents = [d.base for d in buffer] for document in documents: collection.replace_one({id_tag: document[id_tag]}, document, upsert=True) - if key == MongoCollectionNames.NODES: - self._update_node_index(documents) - elif key == MongoCollectionNames.ATOM_TYPES: - logger().error('Failed to commit Atom Types. 
This operation is not allowed') - raise InvalidOperationException - else: - self._update_link_index(documents) + self._update_atom_indexes(documents) buffer.clear() def add_node(self, node_params: Dict[str, Any]) -> Dict[str, Any]: handle, node = self._add_node(node_params) if sys.getsizeof(node_params['name']) < self.max_mongo_db_document_size: - _, buffer = self.mongo_bulk_insertion_buffer[MongoCollectionNames.NODES] + _, buffer = self.mongo_bulk_insertion_buffer[MongoCollectionNames.ATOMS] buffer.add(_HashableDocument(node)) if len(buffer) >= self.mongo_bulk_insertion_limit: self.commit() @@ -626,17 +570,7 @@ def add_node(self, node_params: Dict[str, Any]) -> Dict[str, Any]: def add_link(self, link_params: Dict[str, Any], toplevel: bool = True) -> Dict[str, Any]: handle, link, targets = self._add_link(link_params, toplevel) - if self.use_metta_mapping: - collection_name = MongoCollectionNames.LINKS_ARITY_N - else: - arity = len(targets) - if arity == 1: - collection_name = MongoCollectionNames.LINKS_ARITY_1 - elif arity == 2: - collection_name = MongoCollectionNames.LINKS_ARITY_2 - else: - collection_name = MongoCollectionNames.LINKS_ARITY_N - _, buffer = self.mongo_bulk_insertion_buffer[collection_name] + _, buffer = self.mongo_bulk_insertion_buffer[MongoCollectionNames.ATOMS] buffer.add(_HashableDocument(link)) if len(buffer) >= self.mongo_bulk_insertion_limit: self.commit() @@ -645,9 +579,8 @@ def add_link(self, link_params: Dict[str, Any], toplevel: bool = True) -> Dict[s def _get_and_delete_links_by_handles(self, handles: List[str]) -> Dict[str, Any]: documents = [] for handle in handles: - if any( - (document := collection.find_one_and_delete({MongoFieldNames.ID_HASH: handle})) - for collection in self.mongo_link_collection.values() + if document := self.mongo_atoms_collection.find_one_and_delete( + {MongoFieldNames.ID_HASH: handle} ): documents.append(document) return documents @@ -751,73 +684,82 @@ def _get_redis_members(self, key, **kwargs) -> Tuple[int, list]: return cursor, members - def _update_node_index(self, documents: Iterable[Dict[str, any]], **kwargs) -> None: + def _update_atom_indexes(self, documents: Iterable[Dict[str, any]], **kwargs) -> None: for document in documents: - handle = document[MongoFieldNames.ID_HASH] - node_name = document[MongoFieldNames.NODE_NAME] - key = _build_redis_key(KeyPrefix.NAMED_ENTITIES, handle) - if kwargs.get('delete_atom', False): - self.redis.delete(key) + if self._is_document_link(document): + self._update_link_index(document, **kwargs) else: - self.redis.set(key, node_name) + self._update_node_index(document, **kwargs) - def _update_link_index(self, documents: Iterable[Dict[str, any]], **kwargs) -> None: - incoming_buffer = {} - for document in documents: - handle = document[MongoFieldNames.ID_HASH] - targets = self._get_mongo_document_keys(document) - targets_str = "".join(targets) - arity = len(targets) - named_type = document[MongoFieldNames.TYPE_NAME] - named_type_hash = document[MongoFieldNames.TYPE_NAME_HASH] - value = f"{handle}{targets_str}" - - if self.pattern_index_templates: - index_templates = self.pattern_index_templates.get(named_type, []) - else: - index_templates = self.default_pattern_index_templates + def _update_node_index(self, document: Iterable[Dict[str, Any]], **kwargs) -> None: + handle = document[MongoFieldNames.ID_HASH] + node_name = document[MongoFieldNames.NODE_NAME] + key = _build_redis_key(KeyPrefix.NAMED_ENTITIES, handle) + if kwargs.get('delete_atom', False): + self.redis.delete(key) + if links_handle 
:= self._retrieve_and_delete_incoming_set(handle): + documents = self._get_and_delete_links_by_handles(links_handle) + for document in documents: + self._update_link_index(document, delete_atom=True) + else: + self.redis.set(key, node_name) + + def _update_link_index(self, document: Dict[str, Any], **kwargs) -> None: + handle = document[MongoFieldNames.ID_HASH] + targets = self._get_mongo_document_keys(document) + targets_str = "".join(targets) + arity = len(targets) + named_type = document[MongoFieldNames.TYPE_NAME] + named_type_hash = document[MongoFieldNames.TYPE_NAME_HASH] + value = f"{handle}{targets_str}" + + if self.pattern_index_templates: + index_templates = self.pattern_index_templates.get(named_type, []) + else: + index_templates = self.default_pattern_index_templates - if kwargs.get('delete_atom', False): - links_handle = self._retrieve_and_delete_incoming_set(handle) + if kwargs.get('delete_atom', False): + links_handle = self._retrieve_and_delete_incoming_set(handle) - if links_handle: - documents = self._get_and_delete_links_by_handles(links_handle) - if documents: - self._update_link_index(documents, delete_atom=True) + if links_handle: + docs = self._get_and_delete_links_by_handles(links_handle) + for doc in docs: + self._update_link_index(doc, delete_atom=True) - outgoing_atoms = self._retrieve_outgoing_set(handle, delete=True) + outgoing_atoms = self._retrieve_outgoing_set(handle, delete=True) - for atom_handle in outgoing_atoms: - self._delete_smember_incoming_set(atom_handle, handle) + for atom_handle in outgoing_atoms: + self._delete_smember_incoming_set(atom_handle, handle) - for type_hash in [MongoFieldNames.TYPE, MongoFieldNames.TYPE_NAME_HASH]: - self._delete_smember_template(document[type_hash], value) + for type_hash in [MongoFieldNames.TYPE, MongoFieldNames.TYPE_NAME_HASH]: + self._delete_smember_template(document[type_hash], value) - for template in index_templates: - key = self._apply_index_template(template, named_type_hash, targets, arity) - self.redis.srem(key, value) - else: - key = _build_redis_key(KeyPrefix.OUTGOING_SET, handle) - self.redis.set(key, targets_str) + for template in index_templates: + key = self._apply_index_template(template, named_type_hash, targets, arity) + self.redis.srem(key, value) + else: + incoming_buffer = {} + key = _build_redis_key(KeyPrefix.OUTGOING_SET, handle) + self.redis.set(key, targets_str) - for target in targets: - buffer = incoming_buffer.get(target, None) - if buffer is None: - buffer = [] - incoming_buffer[target] = buffer - buffer.append(handle) + for target in targets: + buffer = incoming_buffer.get(target, None) + if buffer is None: + buffer = [] + incoming_buffer[target] = buffer + buffer.append(handle) - for type_hash in [MongoFieldNames.TYPE, MongoFieldNames.TYPE_NAME_HASH]: - key = _build_redis_key(KeyPrefix.TEMPLATES, document[type_hash]) - self.redis.sadd(key, value) + for type_hash in [MongoFieldNames.TYPE, MongoFieldNames.TYPE_NAME_HASH]: + key = _build_redis_key(KeyPrefix.TEMPLATES, document[type_hash]) + self.redis.sadd(key, value) - for template in index_templates: - key = self._apply_index_template(template, named_type_hash, targets, arity) - self.redis.sadd(key, value) + for template in index_templates: + key = self._apply_index_template(template, named_type_hash, targets, arity) + self.redis.sadd(key, value) - for handle in incoming_buffer: - key = _build_redis_key(KeyPrefix.INCOMING_SET, handle) - self.redis.sadd(key, *incoming_buffer[handle]) + for handle in incoming_buffer: + key = 
_build_redis_key(KeyPrefix.INCOMING_SET, handle) + self.redis.sadd(key, *incoming_buffer[handle]) def _process_matched_results( self, matched: list, cursor: int = None, toplevel_only: bool = False @@ -832,63 +774,45 @@ def _process_matched_results( else: return answer + def _is_document_link(self, document: Dict[str, Any]) -> bool: + return True if MongoFieldNames.COMPOSITE_TYPE in document else False + def reindex(self, pattern_index_templates: Optional[Dict[str, Dict[str, Any]]] = None): if pattern_index_templates is not None: self.pattern_index_templates = deepcopy(pattern_index_templates) self.redis.flushall() - for collection in self.mongo_link_collection.values(): - self._update_link_index(collection.find({})) + self._update_atom_indexes(self.mongo_atoms_collection.find({})) def delete_atom(self, handle: str, **kwargs) -> None: self.commit() mongo_filter = {MongoFieldNames.ID_HASH: handle} - node = self.mongo_nodes_collection.find_one_and_delete(mongo_filter) - - if node: - self._update_node_index([node], delete_atom=True) - - links_handle = self._retrieve_and_delete_incoming_set(handle) + document = self.mongo_atoms_collection.find_one_and_delete(mongo_filter) - if links_handle: - documents = self._get_and_delete_links_by_handles(links_handle) - self._update_link_index(documents, delete_atom=True) - else: - for collection in self.mongo_link_collection.values(): - document = collection.find_one_and_delete(mongo_filter) - if document: - break - else: - logger().error( - f'Failed to delete atom for handle: {handle}. This atom may not exist. - Details: {kwargs}' - ) - raise AtomDoesNotExist( - message='This atom does not exist', - details=f'handle: {handle}', - ) - - self._update_link_index([document], delete_atom=True) + if not document: + logger().error( + f'Failed to delete atom for handle: {handle}. This atom may not exist. 
- Details: {kwargs}' + ) + raise AtomDoesNotExist( + message='This atom does not exist', + details=f'handle: {handle}', + ) + self._update_atom_indexes([document], delete_atom=True) def create_field_index(self, atom_type: str, field: str, type: Optional[str] = None) -> str: - if atom_type == 'node': - collections = [self.mongo_nodes_collection] - elif atom_type == 'link': - collections = self.mongo_link_collection.values() - else: - raise ValueError("Invalid atom_type") + collection = self.mongo_atoms_collection index_id = "" try: exc = "" - for collection in collections: - index_id, conditionals = MongoDBIndex(collection).create(field, named_type=type) - self.mongo_custom_indexes_collection.update_one( - filter={'_id': index_id}, - update={'$set': {'_id': index_id, 'conditionals': conditionals}}, - upsert=True, - ) + index_id, conditionals = MongoDBIndex(collection).create(field, named_type=type) + self.mongo_custom_indexes_collection.update_one( + filter={'_id': index_id}, + update={'$set': {'_id': index_id, 'conditionals': conditionals}}, + upsert=True, + ) except pymongo_errors.OperationFailure as e: exc = e logger().error(f"Error creating index in collection '{collection}': {str(e)}") diff --git a/tests/integration/adapters/test_redis_mongo.py b/tests/integration/adapters/test_redis_mongo.py index d8343b77..5612997e 100644 --- a/tests/integration/adapters/test_redis_mongo.py +++ b/tests/integration/adapters/test_redis_mongo.py @@ -949,13 +949,9 @@ def test_create_field_index(self): ) db.commit() - link_collections = list(db.mongo_link_collection.values()) + collection = db.mongo_atoms_collection - links_2 = link_collections[0] - links_1 = link_collections[1] - links_n = link_collections[2] - - response = links_n.find({'named_type': 'Similarity', 'tag': 'DAS'}).explain() + response = collection.find({'named_type': 'Similarity', 'tag': 'DAS'}).explain() with pytest.raises(KeyError): response['queryPlanner']['winningPlan']['inputStage']['indexName'] @@ -963,21 +959,17 @@ def test_create_field_index(self): # Create the index my_index = db.create_field_index(atom_type='link', field='tag', type='Similarity') - links_2_index_names = [idx.get('name') for idx in links_2.list_indexes()] - links_1_index_names = [idx.get('name') for idx in links_1.list_indexes()] - links_n_index_names = [idx.get('name') for idx in links_n.list_indexes()] + collection_index_names = [idx.get('name') for idx in collection.list_indexes()] - assert my_index in links_2_index_names - assert my_index in links_1_index_names - assert my_index in links_n_index_names + assert my_index in collection_index_names # Using the index - response = links_n.find({'named_type': 'Similarity', 'tag': 'DAS'}).explain() + response = collection.find({'named_type': 'Similarity', 'tag': 'DAS'}).explain() assert my_index == response['queryPlanner']['winningPlan']['inputStage']['indexName'] # Retrieve the document using the index - doc = db.retrieve_mongo_document_by_index(links_n, my_index, tag='DAS') + doc = db.retrieve_mongo_document_by_index(collection, my_index, tag='DAS') assert doc[0]['_id'] == ExpressionHasher.expression_hash( ExpressionHasher.named_type_hash("Similarity"), [human, monkey] ) diff --git a/tests/unit/adapters/test_redis_mongo_db.py b/tests/unit/adapters/test_redis_mongo_db.py index 5ce7df17..6eeca989 100644 --- a/tests/unit/adapters/test_redis_mongo_db.py +++ b/tests/unit/adapters/test_redis_mongo_db.py @@ -18,94 +18,6 @@ ) from hyperon_das_atomdb.utils.expression_hasher import ExpressionHasher -node_collection_mock_data 
= [ - { - '_id': 'af12f10f9ae2002a1607ba0b47ba8407', - 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', - 'name': 'human', - 'named_type': 'Concept', - }, - { - '_id': '1cdffc6b0b89ff41d68bec237481d1e1', - 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', - 'name': 'monkey', - 'named_type': 'Concept', - }, - { - '_id': '5b34c54bee150c04f9fa584b899dc030', - 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', - 'name': 'chimp', - 'named_type': 'Concept', - }, - { - '_id': 'c1db9b517073e51eb7ef6fed608ec204', - 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', - 'name': 'snake', - 'named_type': 'Concept', - }, - { - '_id': 'bb34ce95f161a6b37ff54b3d4c817857', - 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', - 'name': 'earthworm', - 'named_type': 'Concept', - }, - { - '_id': '99d18c702e813b07260baf577c60c455', - 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', - 'name': 'rhino', - 'named_type': 'Concept', - }, - { - '_id': 'd03e59654221c1e8fcda404fd5c8d6cb', - 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', - 'name': 'triceratops', - 'named_type': 'Concept', - }, - { - '_id': 'b94941d8cd1c0ee4ad3dd3dcab52b964', - 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', - 'name': 'vine', - 'named_type': 'Concept', - }, - { - '_id': '4e8e26e3276af8a5c2ac2cc2dc95c6d2', - 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', - 'name': 'ent', - 'named_type': 'Concept', - }, - { - '_id': 'bdfe4e7a431f73386f37c6448afe5840', - 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', - 'name': 'mammal', - 'named_type': 'Concept', - }, - { - '_id': '0a32b476852eeb954979b87f5f6cb7af', - 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', - 'name': 'animal', - 'named_type': 'Concept', - }, - { - '_id': 'b99ae727c787f1b13b452fd4c9ce1b9a', - 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', - 'name': 'reptile', - 'named_type': 'Concept', - }, - { - '_id': '08126b066d32ee37743e255a2558cccd', - 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', - 'name': 'dinosaur', - 'named_type': 'Concept', - }, - { - '_id': '80aff30094874e75028033a38ce677bb', - 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', - 'name': 'plant', - 'named_type': 'Concept', - }, -] -added_nodes = [] - type_collection_mock_data = [ { "_id": "3e419e4d468bdac682103ea2615d0902", @@ -217,7 +129,91 @@ }, ] -arity_2_collection_mock_data = [ +atom_collection_mock_data = [ + { + '_id': 'af12f10f9ae2002a1607ba0b47ba8407', + 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', + 'name': 'human', + 'named_type': 'Concept', + }, + { + '_id': '1cdffc6b0b89ff41d68bec237481d1e1', + 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', + 'name': 'monkey', + 'named_type': 'Concept', + }, + { + '_id': '5b34c54bee150c04f9fa584b899dc030', + 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', + 'name': 'chimp', + 'named_type': 'Concept', + }, + { + '_id': 'c1db9b517073e51eb7ef6fed608ec204', + 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', + 'name': 'snake', + 'named_type': 'Concept', + }, + { + '_id': 'bb34ce95f161a6b37ff54b3d4c817857', + 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', + 'name': 'earthworm', + 'named_type': 'Concept', + }, + { + '_id': '99d18c702e813b07260baf577c60c455', + 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', + 'name': 'rhino', + 'named_type': 'Concept', + }, + { + '_id': 'd03e59654221c1e8fcda404fd5c8d6cb', + 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', + 'name': 
'triceratops', + 'named_type': 'Concept', + }, + { + '_id': 'b94941d8cd1c0ee4ad3dd3dcab52b964', + 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', + 'name': 'vine', + 'named_type': 'Concept', + }, + { + '_id': '4e8e26e3276af8a5c2ac2cc2dc95c6d2', + 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', + 'name': 'ent', + 'named_type': 'Concept', + }, + { + '_id': 'bdfe4e7a431f73386f37c6448afe5840', + 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', + 'name': 'mammal', + 'named_type': 'Concept', + }, + { + '_id': '0a32b476852eeb954979b87f5f6cb7af', + 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', + 'name': 'animal', + 'named_type': 'Concept', + }, + { + '_id': 'b99ae727c787f1b13b452fd4c9ce1b9a', + 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', + 'name': 'reptile', + 'named_type': 'Concept', + }, + { + '_id': '08126b066d32ee37743e255a2558cccd', + 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', + 'name': 'dinosaur', + 'named_type': 'Concept', + }, + { + '_id': '80aff30094874e75028033a38ce677bb', + 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', + 'name': 'plant', + 'named_type': 'Concept', + }, { '_id': '2d7abd27644a9c08a7ca2c8d68338579', 'composite_type_hash': 'ed73ea081d170e1d89fc950820ce1cee', @@ -615,7 +611,7 @@ 'named_type_hash': 'b74a43dbb36287ea86eb5b0c7b86e8e8', }, ] -added_links_arity_2 = [] +added_atoms = [] outgoing_set_redis_mock_data = [ { @@ -1375,56 +1371,65 @@ def commit(): return redis_db @pytest.fixture() - def mongo_nodes_collection(self, mongo_db): + def mongo_atoms_collection(self, mongo_db): collection = mock.MagicMock( - spec=Collection, database=mongo_db, name=MongoCollectionNames.NODES + spec=Collection, database=mongo_db, name=MongoCollectionNames.ATOMS ) def insert_many(documents: List[Dict[str, Any]], ordered: bool): - added_nodes.extend(documents) + added_atoms.extend(documents) def replace_one(handle: Dict[str, Any], document: Dict[str, Any], upsert: bool): - for cursor in range(len(node_collection_mock_data)): - if node_collection_mock_data[cursor]['_id'] == handle['_id']: - node_collection_mock_data.pop(cursor) + for cursor in range(len(atom_collection_mock_data)): + if atom_collection_mock_data[cursor]['_id'] == handle['_id']: + atom_collection_mock_data.pop(cursor) break - for cursor in range(len(added_nodes)): - if added_nodes[cursor]['_id'] == handle['_id']: - added_nodes.pop(cursor) + for cursor in range(len(added_atoms)): + if added_atoms[cursor]['_id'] == handle['_id']: + added_atoms.pop(cursor) break - added_nodes.append(document) + added_atoms.append(document) def find_one(handle: dict): - for data in node_collection_mock_data + added_nodes: + for data in atom_collection_mock_data + added_atoms: if data['_id'] == handle['_id']: return data def find(_filter: Optional[Any] = None): if _filter is None: - return node_collection_mock_data + added_nodes + return atom_collection_mock_data + added_atoms else: ret = [] - for node in node_collection_mock_data + added_nodes: + for atom in atom_collection_mock_data + added_atoms: if MongoFieldNames.TYPE_NAME in _filter: - if _filter[MongoFieldNames.TYPE_NAME] == node[MongoFieldNames.TYPE_NAME]: - ret.append(node) + if _filter[MongoFieldNames.TYPE_NAME] == atom[MongoFieldNames.TYPE_NAME]: + ret.append(atom) else: if ( - _filter[MongoFieldNames.TYPE] == node[MongoFieldNames.TYPE] + _filter[MongoFieldNames.TYPE] == atom[MongoFieldNames.TYPE] and _filter[MongoFieldNames.NODE_NAME]['$regex'] - in node[MongoFieldNames.NODE_NAME] + in 
atom[MongoFieldNames.NODE_NAME] ): - ret.append(node) + ret.append(atom) return ret def estimated_document_count(): - return len(node_collection_mock_data) + len(added_nodes) + return len(atom_collection_mock_data) + len(added_atoms) + + def count_documents(filter: Dict[str, Any]) -> int: + atoms = atom_collection_mock_data + added_atoms + counter = 0 + for atom in atoms: + if list(filter.keys())[0] not in atom: + counter += 1 + return counter collection.insert_many = mock.Mock(side_effect=insert_many) collection.replace_one = mock.Mock(side_effect=replace_one) collection.find_one = mock.Mock(side_effect=find_one) collection.find = mock.Mock(side_effect=find) collection.estimated_document_count = mock.Mock(side_effect=estimated_document_count) + collection.count_documents = mock.Mock(side_effect=count_documents) return collection @pytest.fixture() @@ -1443,104 +1448,12 @@ def find(_filter: Optional[Any] = None): collection.find = mock.Mock(side_effect=find) return collection - @pytest.fixture() - def mongo_arity_1_collection(self, mongo_db): - collection = mock.MagicMock( - spec=Collection, - database=mongo_db, - name=MongoCollectionNames.LINKS_ARITY_1, - ) - - def find(_filter: Optional[Any] = None): - if _filter is None: - return [] - return [] - - def estimated_document_count(): - return len([]) - - collection.find = mock.Mock(side_effect=find) - collection.estimated_document_count = mock.Mock(side_effect=estimated_document_count) - return collection - - @pytest.fixture() - def mongo_arity_2_collection(self, mongo_db): - collection = mock.MagicMock( - spec=Collection, - database=mongo_db, - name=MongoCollectionNames.LINKS_ARITY_2, - ) - - def find_one(_filter: dict): - for data in arity_2_collection_mock_data: - if data['_id'] == _filter['_id']: - return data - - def find(_filter: Optional[Any] = None): - if _filter is None: - return arity_2_collection_mock_data - else: - ret = [] - for link in arity_2_collection_mock_data + added_links_arity_2: - if MongoFieldNames.TYPE_NAME in _filter: - if _filter[MongoFieldNames.TYPE_NAME] == link[MongoFieldNames.TYPE_NAME]: - ret.append(link) - return ret - - def insert_many(documents: List[Dict[str, Any]], ordered: bool): - added_links_arity_2.extend(documents) - - def replace_one(handle: Dict[str, Any], document: Dict[str, Any], upsert: bool): - for cursor in range(len(arity_2_collection_mock_data)): - if arity_2_collection_mock_data[cursor]['_id'] == handle['_id']: - arity_2_collection_mock_data.pop(cursor) - break - for cursor in range(len(added_links_arity_2)): - if added_links_arity_2[cursor]['_id'] == handle['_id']: - added_links_arity_2.pop(cursor) - break - added_links_arity_2.append(document) - - def estimated_document_count(): - return len(arity_2_collection_mock_data) + len(added_links_arity_2) - - collection.insert_many = mock.Mock(side_effect=insert_many) - collection.replace_one = mock.Mock(side_effect=replace_one) - collection.find_one = mock.Mock(side_effect=find_one) - collection.find = mock.Mock(side_effect=find) - collection.estimated_document_count = mock.Mock(side_effect=estimated_document_count) - - return collection - - @pytest.fixture() - def mongo_arity_n_collection(self, mongo_db): - collection = mock.MagicMock( - spec=Collection, - database=mongo_db, - name=MongoCollectionNames.LINKS_ARITY_N, - ) - - def find(_filter: Optional[Any] = None): - if _filter is None: - return [] - return [] - - def estimated_document_count(): - return len([]) - - collection.find = mock.Mock(side_effect=find) - 
collection.estimated_document_count = mock.Mock(side_effect=estimated_document_count) - return collection - @pytest.fixture() def database( self, mongo_db, redis_db, - mongo_arity_1_collection, - mongo_arity_2_collection, - mongo_arity_n_collection, - mongo_nodes_collection, + mongo_atoms_collection, mongo_types_collection, ): with mock.patch( @@ -1551,34 +1464,14 @@ def database( return_value=redis_db, ): db = RedisMongoDB() - db.mongo_link_collection = { - '1': mongo_arity_1_collection, - '2': mongo_arity_2_collection, - 'N': mongo_arity_2_collection, # now we use only 'N' collection - } - db.mongo_nodes_collection = mongo_nodes_collection + db.mongo_atoms_collection = mongo_atoms_collection db.mongo_types_collection = mongo_types_collection db.all_mongo_collections = [ - ( - MongoCollectionNames.LINKS_ARITY_1, - db.mongo_link_collection['1'], - ), - ( - MongoCollectionNames.LINKS_ARITY_2, - db.mongo_link_collection['2'], - ), - ( - MongoCollectionNames.LINKS_ARITY_N, - db.mongo_link_collection['N'], - ), - (MongoCollectionNames.NODES, db.mongo_nodes_collection), + (MongoCollectionNames.ATOMS, db.mongo_atoms_collection), (MongoCollectionNames.ATOM_TYPES, db.mongo_types_collection), ] db.mongo_bulk_insertion_buffer = { - MongoCollectionNames.LINKS_ARITY_1: tuple([db.mongo_link_collection['1'], set()]), - MongoCollectionNames.LINKS_ARITY_2: tuple([db.mongo_link_collection['2'], set()]), - MongoCollectionNames.LINKS_ARITY_N: tuple([db.mongo_link_collection['N'], set()]), - MongoCollectionNames.NODES: tuple([db.mongo_nodes_collection, set()]), + MongoCollectionNames.ATOMS: tuple([db.mongo_atoms_collection, set()]), MongoCollectionNames.ATOM_TYPES: tuple([db.mongo_types_collection, set()]), } return db @@ -1850,15 +1743,12 @@ def test_get_link_type_without_cache(self, database): def test_atom_count(self, database): node_count, link_count = database.count_atoms() - link_count = ( - link_count // 2 - ) # because we're counting links_N and links_2 but they are the same assert node_count == 14 assert link_count == 28 def test_add_node(self, database): - added_nodes.clear() - assert (14, 56) == database.count_atoms() + added_atoms.clear() + assert (14, 28) == database.count_atoms() all_nodes_before = database.get_all_nodes('Concept') database.add_node( { @@ -1870,7 +1760,7 @@ def test_add_node(self, database): all_nodes_after = database.get_all_nodes('Concept') assert len(all_nodes_before) == 14 assert len(all_nodes_after) == 15 - assert (15, 56) == database.count_atoms() + assert (15, 28) == database.count_atoms() new_node_handle = database.get_node_handle('Concept', 'lion') assert new_node_handle == ExpressionHasher.terminal_hash('Concept', 'lion') assert new_node_handle not in all_nodes_before @@ -1879,12 +1769,13 @@ def test_add_node(self, database): assert new_node['handle'] == new_node_handle assert new_node['named_type'] == 'Concept' assert new_node['name'] == 'lion' - added_nodes.clear() + added_atoms.clear() + database.count_atoms() def test_add_link(self, database): - added_nodes.clear() - added_links_arity_2.clear() - assert (14, 56) == database.count_atoms() + added_atoms.clear() + + assert (14, 28) == database.count_atoms() all_nodes_before = database.get_all_nodes('Concept') all_links_before = ( @@ -1911,9 +1802,9 @@ def test_add_link(self, database): assert len(all_nodes_before) == 14 assert len(all_nodes_after) == 16 - assert len(all_links_before) == 56 - assert len(all_links_after) == 58 - assert (16, 58) == database.count_atoms() + assert len(all_links_before) == 28 + assert 
len(all_links_after) == 29 + assert (16, 29) == database.count_atoms() new_node_handle = database.get_node_handle('Concept', 'lion') assert new_node_handle == ExpressionHasher.terminal_hash('Concept', 'lion') @@ -1933,8 +1824,7 @@ def test_add_link(self, database): assert new_node['named_type'] == 'Concept' assert new_node['name'] == 'cat' - added_nodes.clear() - added_links_arity_2.clear() + added_atoms.clear() def test_add_node_and_link_with_reserved_parameters(self, database): with pytest.raises(AddNodeException) as exc: @@ -1996,9 +1886,8 @@ def test_get_atom_type(self, database): assert 'Inheritance' == database.get_atom_type(i) def test_create_field_index_node_collection(self, database): - database.mongo_nodes_collection = mock.Mock() - database.mongo_link_collection = {} - database.mongo_nodes_collection.create_index.return_value = 'name_index_asc' + database.mongo_atoms_collection = mock.Mock() + database.mongo_atoms_collection.create_index.return_value = 'name_index_asc' with mock.patch( 'hyperon_das_atomdb.index.Index.generate_index_id', return_value='name_index_asc', @@ -2006,16 +1895,15 @@ def test_create_field_index_node_collection(self, database): result = database.create_field_index('node', 'name', 'Type') assert result == 'name_index_asc' - database.mongo_nodes_collection.create_index.assert_called_once_with( + database.mongo_atoms_collection.create_index.assert_called_once_with( [('name', 1)], name='name_index_asc', partialFilterExpression={'named_type': {'$eq': 'Type'}}, ) def test_create_field_index_link_collection(self, database): - database.mongo_nodes_collection = mock.Mock() - database.mongo_link_collection = {'link1': mock.Mock()} - database.mongo_link_collection['link1'].create_index.return_value = 'link_index_asc' + database.mongo_atoms_collection = mock.Mock() + database.mongo_atoms_collection.create_index.return_value = 'link_index_asc' with mock.patch( 'hyperon_das_atomdb.index.Index.generate_index_id', return_value='link_index_asc', @@ -2023,19 +1911,20 @@ def test_create_field_index_link_collection(self, database): result = database.create_field_index('link', 'field', 'Type') assert result == 'link_index_asc' - database.mongo_link_collection['link1'].create_index.assert_called_once_with( + database.mongo_atoms_collection.create_index.assert_called_once_with( [('field', 1)], name='link_index_asc', partialFilterExpression={'named_type': {'$eq': 'Type'}}, ) + @pytest.mark.skip(reason="Maybe change the way to handle this test") def test_create_field_index_invalid_collection(self, database): with pytest.raises(ValueError): database.create_field_index('invalid_atom_type', 'field', 'type') def test_create_field_index_operation_failure(self, database): - database.mongo_nodes_collection = mock.Mock() - database.mongo_nodes_collection.create_index.side_effect = OperationFailure( + database.mongo_atoms_collection = mock.Mock() + database.mongo_atoms_collection.create_index.side_effect = OperationFailure( 'Index creation failed' ) result = database.create_field_index('node', 'field', 'Type') @@ -2043,10 +1932,9 @@ def test_create_field_index_operation_failure(self, database): assert result == 'Index creation failed, Details: Index creation failed' def test_create_field_index_already_exists(self, database): - database.mongo_nodes_collection = mock.Mock() - database.mongo_link_collection = {} - database.mongo_nodes_collection.list_indexes.return_value = [] - database.mongo_nodes_collection.create_index.return_value = 'name_index_asc' + database.mongo_atoms_collection = 
mock.Mock() + database.mongo_atoms_collection.list_indexes.return_value = [] + database.mongo_atoms_collection.create_index.return_value = 'name_index_asc' with mock.patch( 'hyperon_das_atomdb.index.Index.generate_index_id', return_value='name_index_asc', From 471f1d5c707f6392e1b92344d1daaa77ebc6c023 Mon Sep 17 00:00:00 2001 From: marcocapozzoli Date: Wed, 20 Mar 2024 19:01:51 -0300 Subject: [PATCH 2/3] change type hint --- hyperon_das_atomdb/adapters/redis_mongo_db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hyperon_das_atomdb/adapters/redis_mongo_db.py b/hyperon_das_atomdb/adapters/redis_mongo_db.py index 756aaf05..80c63e74 100644 --- a/hyperon_das_atomdb/adapters/redis_mongo_db.py +++ b/hyperon_das_atomdb/adapters/redis_mongo_db.py @@ -691,7 +691,7 @@ def _update_atom_indexes(self, documents: Iterable[Dict[str, any]], **kwargs) -> else: self._update_node_index(document, **kwargs) - def _update_node_index(self, document: Iterable[Dict[str, Any]], **kwargs) -> None: + def _update_node_index(self, document: Dict[str, Any], **kwargs) -> None: handle = document[MongoFieldNames.ID_HASH] node_name = document[MongoFieldNames.NODE_NAME] key = _build_redis_key(KeyPrefix.NAMED_ENTITIES, handle) From 0c473fb9e55ee92133565a99b69194cb11d6a9fc Mon Sep 17 00:00:00 2001 From: marcocapozzoli Date: Thu, 21 Mar 2024 09:47:18 -0300 Subject: [PATCH 3/3] remove unused variable --- hyperon_das_atomdb/adapters/redis_mongo_db.py | 1 - 1 file changed, 1 deletion(-) diff --git a/hyperon_das_atomdb/adapters/redis_mongo_db.py b/hyperon_das_atomdb/adapters/redis_mongo_db.py index 80c63e74..20097646 100644 --- a/hyperon_das_atomdb/adapters/redis_mongo_db.py +++ b/hyperon_das_atomdb/adapters/redis_mongo_db.py @@ -126,7 +126,6 @@ def __init__(self, **kwargs: Optional[Dict[str, Any]]) -> None: """ self.database_name = 'das' self._setup_databases(**kwargs) - self.use_metta_mapping = kwargs.get("use_metta_mapping", True) self.mongo_atoms_collection = self.mongo_db.get_collection(MongoCollectionNames.ATOMS) self.mongo_types_collection = self.mongo_db.get_collection(MongoCollectionNames.ATOM_TYPES) self.mongo_custom_indexes_collection = self.mongo_db.get_collection(
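
Note: the following is a minimal, standalone sketch of the single-collection
layout this series introduces, for readers who want to try the new document
shape outside the adapter. It assumes a local MongoDB instance; the
'das'/'atoms' names mirror the adapter's defaults, but the sample documents,
handles, and the 'key_0' target field are illustrative assumptions, not
output of this patch.

    from pymongo import MongoClient

    client = MongoClient('localhost', 27017)
    atoms = client['das']['atoms']

    # Nodes and links now share one collection; links are the documents
    # that carry a 'composite_type' field (the same discriminator used by
    # RedisMongoDB._is_document_link()).
    atoms.replace_one(
        {'_id': 'node-1'},
        {'_id': 'node-1', 'named_type': 'Concept', 'name': 'human'},
        upsert=True,
    )
    atoms.replace_one(
        {'_id': 'link-1'},
        {
            '_id': 'link-1',
            'named_type': 'Similarity',
            'composite_type': ['t0', 't1', 't2'],
            'key_0': 'node-1',  # hypothetical target field, for illustration
        },
        upsert=True,
    )

    def is_link(document: dict) -> bool:
        # Mirrors RedisMongoDB._is_document_link()
        return 'composite_type' in document

    # count_atoms()-style arithmetic: nodes are the documents without
    # 'composite_type'; links are everything else.
    total = atoms.estimated_document_count()
    nodes = atoms.count_documents({'composite_type': {'$exists': False}})
    links = total - nodes
    print(nodes, links)

One caveat worth noting about the new count_atoms(): estimated_document_count()
reads collection metadata while count_documents() performs an exact count, so
the subtraction can be momentarily inconsistent on a live server; the unit
tests above sidestep this by mocking both calls over the same in-memory lists.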