diff --git a/CHANGELOG b/CHANGELOG index f6a5d972..010d9150 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -6,3 +6,4 @@ [#124] Changed count_atoms() to return more accurate numbers [das-query-engine#197] Changed get_all_links() to return a tuple [#142] Changed add_link() and add_node() to work with get_atom returns +[das-query-engine#114] Changed commit() to receive buffer as a kwargs parameter diff --git a/hyperon_das_atomdb/adapters/ram_only.py b/hyperon_das_atomdb/adapters/ram_only.py index 34fad267..95073d61 100644 --- a/hyperon_das_atomdb/adapters/ram_only.py +++ b/hyperon_das_atomdb/adapters/ram_only.py @@ -535,3 +535,6 @@ def retrieve_all_atoms(self) -> List[Dict[str, Any]]: except Exception as e: logger().error(f"Error retrieving all atoms: {str(e)}") raise e + + def commit(self, **kwargs) -> None: + raise NotImplementedError() diff --git a/hyperon_das_atomdb/adapters/redis_mongo_db.py b/hyperon_das_atomdb/adapters/redis_mongo_db.py index dd934abf..6e200f36 100644 --- a/hyperon_das_atomdb/adapters/redis_mongo_db.py +++ b/hyperon_das_atomdb/adapters/redis_mongo_db.py @@ -558,22 +558,32 @@ def clear_database(self) -> None: self.redis.flushall() - def commit(self) -> None: + def commit(self, **kwargs) -> None: id_tag = MongoFieldNames.ID_HASH - for key, (collection, buffer) in self.mongo_bulk_insertion_buffer.items(): - if buffer: - if key == MongoCollectionNames.ATOM_TYPES: - logger().error('Failed to commit Atom Types. This operation is not allowed') - raise InvalidOperationException - documents = [d.base for d in buffer] - - for document in documents: - collection.replace_one({id_tag: document[id_tag]}, document, upsert=True) - - self._update_atom_indexes(documents) - - buffer.clear() + if kwargs.get('buffer'): + try: + for document in kwargs['buffer']: + self.mongo_atoms_collection.replace_one( + {id_tag: document[id_tag]}, document, upsert=True + ) + self._update_atom_indexes([document]) + except Exception as e: + logger().error(f'Failed to commit buffer - Details: {str(e)}') + raise e + else: + for key, (collection, buffer) in self.mongo_bulk_insertion_buffer.items(): + if buffer: + if key == MongoCollectionNames.ATOM_TYPES: + logger().error('Failed to commit Atom Types. This operation is not allowed') + raise InvalidOperationException + + for hashtable in buffer: + document = hashtable.base + collection.replace_one({id_tag: document[id_tag]}, document, upsert=True) + self._update_atom_indexes([document]) + + buffer.clear() def add_node(self, node_params: Dict[str, Any]) -> Dict[str, Any]: handle, node = self._add_node(node_params) diff --git a/tests/integration/adapters/test_redis_mongo.py b/tests/integration/adapters/test_redis_mongo.py index 8b5ba1b4..af489665 100644 --- a/tests/integration/adapters/test_redis_mongo.py +++ b/tests/integration/adapters/test_redis_mongo.py @@ -994,3 +994,45 @@ def test_add_fields_to_atoms(self, _cleanup): assert db.get_atom(link_handle)['score'] == 0.5 _db_down() + + def test_commit_with_buffer(self, _cleanup): + _db_up(Database.REDIS, Database.MONGO) + db = self._connect_db() + assert db.count_atoms() == (0, 0) + buffer = [ + { + '_id': '26d35e45817f4270f2b7cff971b04138', + 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', + 'name': 'dog', + 'named_type': 'Concept', + }, + { + '_id': 'b7db6a9ed2191eb77ee54479570db9a4', + 'composite_type_hash': 'd99a604c79ce3c2e76a2f43488d5d4c3', + 'name': 'cat', + 'named_type': 'Concept', + }, + { + '_id': '3dab102938606f4549d68405ec9f4f61', + 'composite_type_hash': 'ed73ea081d170e1d89fc950820ce1cee', + 'is_toplevel': True, + 'composite_type': [ + 'a9dea78180588431ec64d6bc4872fdbc', + 'd99a604c79ce3c2e76a2f43488d5d4c3', + 'd99a604c79ce3c2e76a2f43488d5d4c3', + ], + 'named_type': 'Similarity', + 'named_type_hash': 'a9dea78180588431ec64d6bc4872fdbc', + 'key_0': '26d35e45817f4270f2b7cff971b04138', + 'key_1': 'b7db6a9ed2191eb77ee54479570db9a4', + }, + ] + db.commit(buffer=buffer) + assert db.count_atoms() == (2, 1) + assert db.get_atom('26d35e45817f4270f2b7cff971b04138')['name'] == 'dog' + assert db.get_atom('b7db6a9ed2191eb77ee54479570db9a4')['name'] == 'cat' + assert db.get_atom('3dab102938606f4549d68405ec9f4f61')['targets'] == [ + '26d35e45817f4270f2b7cff971b04138', + 'b7db6a9ed2191eb77ee54479570db9a4', + ] + _db_down()