diff --git a/hyperon_das_atomdb/adapters/__init__.py b/hyperon_das_atomdb/adapters/__init__.py
index 33b09468..499f2db3 100644
--- a/hyperon_das_atomdb/adapters/__init__.py
+++ b/hyperon_das_atomdb/adapters/__init__.py
@@ -1,5 +1,4 @@
 from .ram_only import InMemoryDB
 from .redis_mongo_db import RedisMongoDB
-from .redis_postgreslobe_db import RedisPostgresLobeDB
 
-__all__ = ['RedisMongoDB', 'InMemoryDB', 'RedisPostgresLobeDB']
+__all__ = ['RedisMongoDB', 'InMemoryDB']
diff --git a/hyperon_das_atomdb/adapters/redis_postgreslobe_db.py b/hyperon_das_atomdb/adapters/redis_postgreslobe_db.py
deleted file mode 100644
index e2df61eb..00000000
--- a/hyperon_das_atomdb/adapters/redis_postgreslobe_db.py
+++ /dev/null
@@ -1,328 +0,0 @@
-import json
-from enum import Enum
-from typing import Any, Dict, List, Optional, Tuple, Union
-
-import psycopg2
-from psycopg2.extensions import cursor as PostgresCursor
-
-from hyperon_das_atomdb.adapters.redis_mongo_db import RedisMongoDB
-from hyperon_das_atomdb.exceptions import InvalidSQL
-from hyperon_das_atomdb.logger import logger
-from hyperon_das_atomdb.utils.expression_hasher import ExpressionHasher
-from hyperon_das_atomdb.utils.mapper import Table, create_mapper
-
-
-class FieldNames(str, Enum):
-    NODE_NAME = 'name'
-    TYPE_NAME = 'named_type'
-    TYPE_NAME_HASH = 'named_type_hash'
-    ID_HASH = '_id'
-    TYPE = 'composite_type_hash'
-    COMPOSITE_TYPE = 'composite_type'
-    KEY_PREFIX = 'key'
-    KEYS = 'keys'
-
-
-class RedisPostgresLobeDB(RedisMongoDB):
-    """A concrete implementation using Redis and a PostgresLobe"""
-
-    def __repr__(self) -> str:
-        return ""  # pragma no cover
-
-    def __init__(self, **kwargs) -> None:
-        self.database_name = 'das'
-        self.pattern_index_templates = None
-        self.table_names = []
-        self.named_type_hash = {}
-        self.typedef_base_type_hash = ExpressionHasher._compute_hash("Type")
-        self.hash_length = len(self.typedef_base_type_hash)
-        self.mapper = create_mapper(kwargs.get('mapper', 'sql2metta'))
-        self._setup_databases(**kwargs)
-        self._setup_indexes()
-        self._fetch()
-        logger().info("Database setup finished")
-
-    def _setup_databases(
-        self,
-        postgres_database_name='postgres',
-        postgres_hostname='localhost',
-        postgres_port=27017,
-        postgres_username='postgres',
-        postgres_password='postgres',
-        redis_hostname='localhost',
-        redis_port=6379,
-        redis_username=None,
-        redis_password=None,
-        redis_cluster=True,
-        redis_ssl=True,
-        **kwargs,
-    ) -> None:
-        self.postgres_db = self._connection_postgres_db(
-            postgres_database_name,
-            postgres_hostname,
-            postgres_port,
-            postgres_username,
-            postgres_password,
-        )
-        self.redis = self._connection_redis(
-            redis_hostname,
-            redis_port,
-            redis_username,
-            redis_password,
-            redis_cluster,
-            redis_ssl,
-        )
-
-    def _setup_indexes(self):
-        self.default_pattern_index_templates = []
-        for named_type in [True, False]:
-            for pos0 in [True, False]:
-                for pos1 in [True, False]:
-                    for pos2 in [True, False]:
-                        if named_type and pos0 and pos1 and pos2:
-                            continue
-                        template = {}
-                        template[FieldNames.TYPE_NAME] = named_type
-                        template["selected_positions"] = [
-                            i for i, pos in enumerate([pos0, pos1, pos2]) if pos
-                        ]
-                        self.default_pattern_index_templates.append(template)
-
-    def _connection_postgres_db(
-        self,
-        postgres_database_name='postgres',
-        postgres_hostname='localhost',
-        postgres_port=5432,
-        postgres_username='postgres',
-        postgres_password='postgres',
-    ) -> None:
-        logger().info(
-            f"Connecting to Postgres at {postgres_username}:{postgres_password}://{postgres_hostname}:{postgres_port}/{postgres_database_name}"
-        )
-        try:
-            return psycopg2.connect(
-                database=postgres_database_name,
-                host=postgres_hostname,
-                port=postgres_port,
-                user=postgres_username,
-                password=postgres_password,
-            )
-        except psycopg2.Error as e:
-            logger().error(f'An error occourred when connection to Postgres - Details: {str(e)}')
-            raise e
-
-    def _fetch(self) -> None:
-        try:
-            with self.postgres_db.cursor() as cursor:
-                cursor.execute(
-                    "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public';"
-                )
-                self.table_names = [table[0] for table in cursor.fetchall()]
-                for table_name in self.table_names:
-                    table = self._parser(cursor, table_name)
-                    atoms = self.mapper.map_table(table)
-                    self._update_atom_indexes(atoms)
-                    self._insert_atoms(atoms)
-        except (psycopg2.Error, Exception) as e:
-            logger().error(f"Error during fetching data from Postgres Lobe - Details: {str(e)}")
-            raise e
-
-    def _parser(self, cursor: PostgresCursor, table_name: str) -> Table:
-        table = Table(table_name)
-
-        try:
-            # Get information about the constrainst and data type
-            cursor.execute(
-                f"""
-                SELECT
-                    cols.column_name,
-                    cols.data_type,
-                    CASE
-                        WHEN cons.constraint_type = 'PRIMARY KEY' THEN 'PK'
-                        WHEN cons.constraint_type = 'FOREIGN KEY' THEN 'FK'
-                        ELSE ''
-                    END AS type
-                FROM
-                    information_schema.columns cols
-                LEFT JOIN
-                    (
-                        SELECT
-                            kcu.column_name,
-                            tc.constraint_type
-                        FROM
-                            information_schema.key_column_usage kcu
-                        JOIN
-                            information_schema.table_constraints tc
-                            ON kcu.constraint_name = tc.constraint_name
-                            AND kcu.constraint_schema = tc.constraint_schema
-                        WHERE
-                            tc.table_name = '{table_name}'
-                    ) cons
-                    ON cols.column_name = cons.column_name
-                WHERE
-                    cols.table_name = '{table_name}'
-                ORDER BY
-                    CASE
-                        WHEN cons.constraint_type = 'PRIMARY KEY' THEN 0
-                        ELSE 1
-                    END;
-                """
-            )
-            columns = cursor.fetchall()
-            for column in columns:
-                table.add_column(*column)
-
-            # Pk column
-            cursor.execute(
-                f"""
-                SELECT column_name
-                FROM information_schema.key_column_usage
-                WHERE constraint_name = (
-                    SELECT constraint_name
-                    FROM information_schema.table_constraints
-                    WHERE table_name = '{table_name}' AND constraint_type = 'PRIMARY KEY'
-                ) AND table_name = '{table_name}';
-                """
-            )
-            pk_column = cursor.fetchone()[0]
-
-            # Pk Data
-            cursor.execute(
-                f"""
-                SELECT {pk_column} FROM {table_name};
-                """
-            )
-            pk_table = cursor.fetchall()
-
-            # Non PK columns
-            cursor.execute(
-                f"""
-                SELECT
-                    string_agg(column_name, ',' ORDER BY ordinal_position)
-                FROM (
-                    SELECT column_name, ordinal_position
-                    FROM information_schema.columns
-                    WHERE table_name = '{table_name}' AND column_name != '{pk_column}'
-                ) AS subquery;
-                """
-            )
-            non_pk_column = cursor.fetchone()[0]
-
-            # Non PK Data
-            cursor.execute(
-                f"""
-                SELECT {non_pk_column} FROM {table_name}
-                """
-            )
-            non_pk_table = cursor.fetchall()
-
-            # Sorted data where PK column is first
-            rows = [(*k, *v) for k, v in zip(pk_table, non_pk_table)]
-
-            for row in rows:
-                table.add_row({key: value for key, value in zip(table.get_column_names(), row)})
-
-            return table
-        except (psycopg2.Error, TypeError, Exception) as e:
-            logger().error(f"Error: {e}")
-            raise InvalidSQL(message=f"Error during parsing table '{table_name}'", details=str(e))
-
-    def _insert_atoms(self, atoms: Dict[str, Any]) -> None:
-        for atom in atoms:
-            key = f'atoms:{atom["_id"]}'
-            self.redis.set(key, json.dumps(atom))
-
-    def _retrieve_document(self, handle: str) -> dict:
-        key = f'atoms:{handle}'
-        answer = self.redis.get(key)
-        if answer is not None:
-            document = json.loads(answer)
-            if self._is_document_link(document):
-                document["targets"] = self._get_document_keys(document)
-            return document
-        return None
-
-    def _retrieve_all_documents(self, key: str = None, value: Any = None):
-        answers = self.redis.mget(self.redis.keys('atoms:*'))
-        all_documents = [json.loads(answer) for answer in answers]
-        if key and value is not None:
-            if value is True:
-                return [document for document in all_documents if key in document]
-            elif value is False:
-                return [document for document in all_documents if key not in document]
-            else:
-                return [document for document in all_documents if document[key] == value]
-        else:
-            return all_documents
-
-    def _get_and_delete_links_by_handles(self, handles: List[str]) -> Dict[str, Any]:
-        pass
-
-    def _retrieve_documents_by_index(
-        self, collection: Any, index_id: str, **kwargs
-    ) -> Tuple[int, List[Dict[str, Any]]]:
-        pass
-
-    def get_all_nodes(self, node_type: str, names: bool = False) -> List[str]:
-        if names:
-            return [
-                document[FieldNames.NODE_NAME]
-                for document in self._retrieve_all_documents(
-                    key=FieldNames.TYPE_NAME, value=node_type
-                )
-            ]
-        else:
-            return [
-                document[FieldNames.ID_HASH]
-                for document in self._retrieve_all_documents(
-                    key=FieldNames.TYPE_NAME, value=node_type
-                )
-            ]
-
-    def get_all_links(self, link_type: str) -> List[str]:
-        links_handle = []
-        documents = self._retrieve_all_documents(key=FieldNames.TYPE_NAME, value=link_type)
-        for document in documents:
-            links_handle.append(document[FieldNames.ID_HASH])
-        return links_handle
-
-    def count_atoms(self) -> Tuple[int, int]:
-        atoms = len(self._retrieve_all_documents())
-        nodes = len(self._retrieve_all_documents(FieldNames.COMPOSITE_TYPE, False))
-        return nodes, atoms - nodes
-
-    def get_matched_node_name(self, node_type: str, substring: str) -> str:
-        raise NotImplementedError("The method 'get_matched_node_name' is not implemented yet")
-
-    def commit(self) -> None:
-        raise NotImplementedError("The method 'commit' is not implemented yet")
-
-    def add_node(self, node_params: Dict[str, Any]) -> Dict[str, Any]:
-        raise NotImplementedError("The method 'add_node' is not implemented yet")
-
-    def add_link(self, link_params: Dict[str, Any], toplevel: bool = True) -> Dict[str, Any]:
-        raise NotImplementedError("The method 'add_link' is not implemented yet")
-
-    def reindex(self, pattern_index_templates: Optional[Dict[str, Dict[str, Any]]] = None):
-        raise NotImplementedError("The method 'reindex' is not implemented yet")
-
-    def delete_atom(self, handle: str, **kwargs) -> None:
-        raise NotImplementedError("The method 'delete_atom' is not implemented yet")
-
-    def create_field_index(
-        self,
-        atom_type: str,
-        field: str,
-        type: Optional[str] = None,
-        composite_type: Optional[List[Any]] = None,
-    ) -> str:
-        raise NotImplementedError("The method 'create_field_index' is not implemented yet")
-
-    def get_atoms_by_index(self, index_id: str, **kwargs) -> Union[Tuple[int, list], list]:
-        raise NotImplementedError("The method 'get_atoms_by_index' is not implemented yet")
-
-    def bulk_insert(self, documents: List[Dict[str, Any]]) -> None:
-        raise NotImplementedError("The method 'bulk_insert' is not implemented yet")
-
-    def clear_database(self) -> None:
-        raise NotImplementedError("The method 'clear_database' is not implemented yet")
diff --git a/hyperon_das_atomdb/utils/mapper.py b/hyperon_das_atomdb/utils/mapper.py
deleted file mode 100644
index 0d97dc10..00000000
--- a/hyperon_das_atomdb/utils/mapper.py
+++ /dev/null
@@ -1,175 +0,0 @@
-from abc import ABC, abstractmethod
-from dataclasses import dataclass, field
-from typing import Any, Dict, List
-
-from hyperon_das_atomdb.utils.expression_hasher import ExpressionHasher
-
-
-@dataclass
-class Table:
-    table_name: str = ""
-    columns: List[Any] = field(default_factory=list)
-    rows: List[Any] = field(default_factory=list)
-
-    def add_column(self, name: str, data_type: str, constraint_type: str = '') -> None:
-        self.columns.append({"name": name, "type": data_type, "constraint_type": constraint_type})
-
-    def add_row(self, row_data: Dict[str, Any]) -> None:
-        if len(row_data) != len(self.columns):
-            raise ValueError("error")  # Improve the message
-        self.rows.append(row_data)
-
-    def get_column_names(self) -> list:
-        return [column['name'] for column in self.columns]
-
-    def to_dict(self) -> dict:
-        return {self.table_name: self.rows}
-
-
-class SQLMapper(ABC):
-    @abstractmethod
-    def map_table(self, table: Table) -> List[Dict[str, Any]]:
-        ...  # pragma: no cover
-
-
-class SQL2AtomeseMapper(SQLMapper):
-    def map_table(self, table: Table) -> List[Dict[str, Any]]:
-        return self._to_atoms_type_atomese(table)
-
-    def _to_atoms_type_atomese(self, table: Table) -> List[Dict[str, Any]]:
-        """WIP"""
-        ...  # pragma no cover
-
-
-class SQL2MettaMapper(SQLMapper):
-    def map_table(self, table: Table) -> List[Dict[str, Any]]:
-        return self._to_atoms_type_metta(table)
-
-    def _to_atoms_type_metta(self, table: Table) -> List[Dict[str, Any]]:
-        atoms = [self._create_node(name=table.table_name, is_literal=False)]
-
-        for row in table.rows:
-            _, pk_value = list(row.items())[0]
-            key_0 = {'type': 'Symbol', 'name': table.table_name, 'is_literal': False}
-            key_1 = {'type': 'Symbol', 'name': pk_value, 'is_literal': True}
-            pk_link = {'type': 'Expression', 'targets': [key_0, key_1]}
-
-            atoms.append(self._create_node(**key_1))
-
-            for key, value in list(row.items())[1:]:
-                key_0 = {'type': 'Symbol', 'name': f'{table.table_name}.{key}', 'is_literal': False}
-                key_1 = pk_link
-                key_2 = {'type': 'Symbol', 'name': value, 'is_literal': True}
-                answers = self._create_link(targets=[key_0, key_1, key_2])
-                for answer in answers:
-                    if answer not in atoms:
-                        atoms.append(answer)
-
-        return atoms
-
-    def _is_literal(self, name: str) -> bool:
-        if (
-            name
-            in [
-                'Type',
-                'MettaType',
-                'Concept',
-                'Symbol',
-                'Expression',
-                'Similarity',
-                'Inheritance',
-                ':',
-            ]
-        ) or ('.' in name):
-            return False
-        return True
-
-    def _check_numerical_type(self, name) -> str:
-        try:
-            int(name)
-            return 'int'
-        except ValueError:
-            try:
-                float(name)
-                return 'float'
-            except ValueError:
-                return None
-
-    def _create_node(self, name: str, type: str = "Symbol", **kwargs) -> Dict[str, Any]:
-        if (literal := kwargs.get('is_literal')) is None:
-            literal = True if self._is_literal(name) else False
-
-        node = {'is_literal': literal}
-
-        if self._check_numerical_type(name) == 'int':
-            node['value_as_int'] = name
-        elif self._check_numerical_type(name) == 'float':
-            node['value_as_float'] = name
-
-        if literal:
-            name = f'"{name}"'
-
-        node.update(
-            {
-                '_id': ExpressionHasher.terminal_hash(type, name),
-                'composite_type_hash': ExpressionHasher.named_type_hash(type),
-                'named_type': type,
-                'name': name,
-            }
-        )
-
-        return node
-
-    def _create_link(
-        self,
-        targets: List[Dict[str, Any]],
-        type: str = "Expression",
-        toplevel: bool = True,
-        **kwargs,
-    ) -> Dict[str, Any]:
-        link_type_hash = ExpressionHasher.named_type_hash(type)
-
-        targets_hash = []
-        composite_type = [link_type_hash]
-        composite_type_hash = [link_type_hash]
-
-        targets_atom = []
-
-        for target in targets:
-            if 'targets' not in target.keys():
-                atom = self._create_node(**target)
-                atom_hash = ExpressionHasher.named_type_hash(atom['named_type'])
-                composite_type.append(atom_hash)
-            else:
-                atom = self._create_link(**target, toplevel=False)[0]
-                composite_type.append(atom['composite_type'])
-                atom_hash = atom['composite_type_hash']
-            composite_type_hash.append(atom_hash)
-            targets_hash.append(atom['_id'])
-            targets_atom.append(atom)
-
-        link = {
-            '_id': ExpressionHasher.expression_hash(link_type_hash, targets_hash),
-            'composite_type_hash': ExpressionHasher.composite_hash(composite_type_hash),
-            'is_toplevel': toplevel,
-            'composite_type': composite_type,
-            'named_type': type,
-            'named_type_hash': link_type_hash,
-        }
-
-        for item in range(len(targets)):
-            link[f'key_{item}'] = targets_hash[item]
-
-        ret = [link]
-        ret.extend(targets_atom)
-
-        return ret
-
-
-def create_mapper(mapper: str) -> SQLMapper:
-    if mapper == "sql2atomese":
-        return SQL2AtomeseMapper()
-    elif mapper == "sql2metta":
-        return SQL2MettaMapper()
-    else:
-        raise ValueError("Unknown mapper")
diff --git a/pyproject.toml b/pyproject.toml
index 1c4317c5..99863b7c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -16,7 +16,6 @@ python = "^3.8.5"
 redis = "^5.0.0"
 pymongo = "^4.5.0"
 python-dotenv = "^1.0.0"
-psycopg2-binary = "^2.9.9"
 
 
 [tool.poetry.group.dev.dependencies]
diff --git a/tests/integration/adapters/helpers.py b/tests/integration/adapters/helpers.py
index e4a74886..33475e44 100644
--- a/tests/integration/adapters/helpers.py
+++ b/tests/integration/adapters/helpers.py
@@ -5,7 +5,6 @@
 
 redis_port = "15926"
 mongo_port = "15927"
-postgres_port = "15928"
 scripts_path = "./tests/integration/scripts"
 devnull = open(os.devnull, 'w')
 
@@ -19,11 +18,6 @@
 DAS_REDIS_PASSWORD = os.environ.get("DAS_REDIS_PASSWORD")
 DAS_USE_REDIS_CLUSTER = os.environ.get("DAS_USE_REDIS_CLUSTER")
 DAS_USE_REDIS_SSL = os.environ.get("DAS_USE_REDIS_SSL")
-DAS_POSTGRES_HOSTNAME = os.environ.get("DAS_POSTGRES_HOSTNAME")
-DAS_POSTGRES_PORT = os.environ.get("DAS_POSTGRES_PORT")
-DAS_POSTGRES_USERNAME = os.environ.get("DAS_POSTGRES_USERNAME")
-DAS_POSTGRES_PASSWORD = os.environ.get("DAS_POSTGRES_PASSWORD")
-DAS_POSTGRES_DATABASE = os.environ.get("DAS_POSTGRES_DATABASE")
 
 os.environ["DAS_MONGODB_HOSTNAME"] = "localhost"
 os.environ["DAS_MONGODB_PORT"] = mongo_port
@@ -35,17 +29,11 @@
 os.environ["DAS_REDIS_PASSWORD"] = ""
 os.environ["DAS_USE_REDIS_CLUSTER"] = "false"
 os.environ["DAS_USE_REDIS_SSL"] = "false"
-os.environ["DAS_POSTGRES_HOSTNAME"] = "localhost"
-os.environ["DAS_POSTGRES_PORT"] = postgres_port
-os.environ["DAS_POSTGRES_USERNAME"] = "dbadmin"
-os.environ["DAS_POSTGRES_PASSWORD"] = "dassecret"
-os.environ["DAS_POSTGRES_DATABASE"] = "das"
 
 
 class Database(Enum):
     REDIS = "redis"
     MONGO = "mongo"
-    POSTGRES = "postgres"
 
 
 def _db_up(*database_names: List[Database]):
@@ -67,11 +55,6 @@
     subprocess.call(
         ["bash", f"{scripts_path}/mongo-up.sh", mongo_port], stdout=devnull, stderr=devnull
     )
-    subprocess.call(
-        ["bash", f"{scripts_path}/postgres-up.sh", postgres_port],
-        stdout=devnull,
-        stderr=devnull,
-    )
 
 
 def _db_down():
@@ -81,11 +64,6 @@
     subprocess.call(
         ["bash", f"{scripts_path}/mongo-down.sh", mongo_port], stdout=devnull, stderr=devnull
     )
-    subprocess.call(
-        ["bash", f"{scripts_path}/postgres-down.sh", postgres_port],
-        stdout=devnull,
-        stderr=devnull,
-    )
 
 
 def cleanup(request):
@@ -110,16 +88,6 @@ def restore_environment():
         os.environ["DAS_USE_REDIS_CLUSTER"] = DAS_USE_REDIS_CLUSTER
     if DAS_USE_REDIS_SSL:
         os.environ["DAS_USE_REDIS_SSL"] = DAS_USE_REDIS_SSL
-    if DAS_POSTGRES_HOSTNAME:
-        os.environ["DAS_POSTGRES_HOSTNAME"] = DAS_POSTGRES_HOSTNAME
-    if DAS_POSTGRES_PORT:
-        os.environ["DAS_POSTGRES_PORT"] = DAS_POSTGRES_PORT
-    if DAS_POSTGRES_USERNAME:
-        os.environ["DAS_POSTGRES_USERNAME"] = DAS_POSTGRES_USERNAME
-    if DAS_POSTGRES_PASSWORD:
-        os.environ["DAS_POSTGRES_PASSWORD"] = DAS_POSTGRES_PASSWORD
-    if DAS_POSTGRES_DATABASE:
-        os.environ["DAS_POSTGRES_DATABASE"] = DAS_POSTGRES_DATABASE
 
 def enforce_containers_removal():
     _db_down()
diff --git a/tests/integration/adapters/test_redis_postgreslobe_db.py b/tests/integration/adapters/test_redis_postgreslobe_db.py
deleted file mode 100644
index 8d78546c..00000000
--- a/tests/integration/adapters/test_redis_postgreslobe_db.py
+++ /dev/null
@@ -1,93 +0,0 @@
-import psycopg2
-import pytest
-
-from hyperon_das_atomdb.adapters import RedisPostgresLobeDB
-from hyperon_das_atomdb.database import WILDCARD
-
-from .helpers import Database, _db_down, _db_up, cleanup, postgres_port, redis_port
-
-
-class TestRedisPostgresLobeDB:
-    @pytest.fixture(scope="session", autouse=True)
-    def _cleanup(self, request):
-        return cleanup(request)
-
-    def _populate_db(self):
-        conn = psycopg2.connect(
-            host='localhost',
-            database='postgres',
-            port=postgres_port,
-            user='dbadmin',
-            password='dassecret',
-        )
-        with conn.cursor() as cursor:
-            cursor.execute(
-                """
-                CREATE TABLE IF NOT EXISTS users (
-                    ID SERIAL PRIMARY KEY,
-                    name VARCHAR(100),
-                    age INTEGER
-                );
-                """
-            )
-            cursor.execute("INSERT INTO users (name, age) VALUES ('Adam', 30);")
-            cursor.execute("INSERT INTO users (name, age) VALUES ('Eve', 31);")
-        conn.commit()
-        conn.close()
-
-    def test_initialization(self, _cleanup):
-        _db_up(Database.REDIS, Database.POSTGRES)
-
-        self._populate_db()
-
-        db = RedisPostgresLobeDB(
-            postgres_port=postgres_port,
-            postgres_username='dbadmin',
-            postgres_password='dassecret',
-            redis_port=redis_port,
-            redis_cluster=False,
-            redis_ssl=False,
-        )
-
-        assert db.count_atoms() == (9, 6)
-
-        node_users = db.node_handle('Symbol', 'users')
-        node_users_name = db.node_handle('Symbol', 'users.name')
-        node_users_age = db.node_handle('Symbol', 'users.age')
-        node_1 = db.node_handle('Symbol', '"1"')
-        node_2 = db.node_handle('Symbol', '"2"')
-        node_Adam = db.node_handle('Symbol', '"Adam"')
-        node_30 = db.node_handle('Symbol', '"30"')
-        node_Eve = db.node_handle('Symbol', '"Eve"')
-        node_31 = db.node_handle('Symbol', '"31"')
-
-        link1 = db.link_handle('Expression', [node_users, node_1])
-        link2 = db.link_handle('Expression', [node_users, node_2])
-
-        matched_links = db.get_matched_links('Expression', [node_users_name, WILDCARD, WILDCARD])
-        assert sorted([matched_link[3] for matched_link in matched_links]) == sorted(
-            [node_Adam, node_Eve]
-        )
-
-        matched_links = db.get_matched_links('Expression', [WILDCARD, link1, WILDCARD])
-        assert sorted([matched_link[3] for matched_link in matched_links]) == sorted(
-            [node_Adam, node_30]
-        )
-
-        matched_links = db.get_matched_links('Expression', [WILDCARD, WILDCARD, node_30])
-        assert matched_links[0][1:] == [node_users_age, link1, node_30]
-
-        matched_links = db.get_matched_links('Expression', [node_users_age, WILDCARD, WILDCARD])
-        assert sorted([matched_link[3] for matched_link in matched_links]) == sorted(
-            [node_30, node_31]
-        )
-
-        matched_links = db.get_matched_links('Expression', [WILDCARD, link2, WILDCARD])
-        assert sorted([matched_link[3] for matched_link in matched_links]) == sorted(
-            [node_Eve, node_31]
-        )
-
-        matched_links = db.get_matched_links('Expression', [WILDCARD, WILDCARD, node_31])
-        assert matched_links[0][1:] == [node_users_age, link2, node_31]
-
-        _db_down()
diff --git a/tests/integration/scripts/postgres-down.sh b/tests/integration/scripts/postgres-down.sh
deleted file mode 100644
index 5d407dc9..00000000
--- a/tests/integration/scripts/postgres-down.sh
+++ /dev/null
@@ -1,13 +0,0 @@
-#!/bin/bash
-
-if [ -z "$1" ]
-then
-    echo "Usage: postgres-down.sh PORT"
-    exit 1
-else
-    PORT=$1
-fi
-
-echo "Destroying Postgres container on port $PORT"
-
-docker stop postgres_$PORT && docker rm postgres_$PORT && docker volume rm postgresdbdata_$PORT >& /dev/null
diff --git a/tests/integration/scripts/postgres-up.sh b/tests/integration/scripts/postgres-up.sh
deleted file mode 100644
index 514146e3..00000000
--- a/tests/integration/scripts/postgres-up.sh
+++ /dev/null
@@ -1,32 +0,0 @@
-#!/bin/bash
-
-if [ -z "$1" ]
-then
-    echo "Usage: postgres-up.sh PORT"
-    exit 1
-else
-    PORT=$1
-fi
-
-echo "Starting Postgres on port $PORT"
-
-docker stop postgres_$PORT >& /dev/null
-docker rm postgres_$PORT >& /dev/null
-docker volume rm postgresdbdata_$PORT >& /dev/null
-
-sleep 1
-docker run \
-    --detach \
-    --name postgres_$PORT \
-    --env POSTGRES_USER="dbadmin" \
-    --env POSTGRES_PASSWORD="dassecret" \
-    --env POSTGRES_DB="postgres" \
-    --env TZ=${TZ} \
-    --network="host" \
-    --volume /tmp:/tmp \
-    --volume /mnt:/mnt \
-    --volume postgresdbdata_$PORT:/data/db \
-    -p 5432:$PORT \
-    postgres:latest \
-    postgres -p $PORT
-sleep 5
\ No newline at end of file
diff --git a/tests/unit/adapters/test_redis_postgreslobe_db.py b/tests/unit/adapters/test_redis_postgreslobe_db.py
deleted file mode 100644
index ad769a10..00000000
--- a/tests/unit/adapters/test_redis_postgreslobe_db.py
+++ /dev/null
@@ -1,194 +0,0 @@
-import json
-from unittest import mock
-
-import pytest
-
-from hyperon_das_atomdb.adapters.redis_postgreslobe_db import RedisPostgresLobeDB
-
-
-class TestRedisPostgresLobeDB:
-    @pytest.fixture()
-    def sql_lobe(self):
-        redis = mock.MagicMock()
-        postgres = mock.MagicMock()
-        with mock.patch(
-            'hyperon_das_atomdb.adapters.redis_postgreslobe_db.RedisPostgresLobeDB._connection_postgres_db',
-            return_value=postgres,
-        ), mock.patch(
-            'hyperon_das_atomdb.adapters.redis_postgreslobe_db.RedisPostgresLobeDB._connection_redis',
-            return_value=redis,
-        ):
-            yield RedisPostgresLobeDB()
-
-    def test_repr(self, sql_lobe):
-        assert repr(sql_lobe) == ""
-
-    def test_setup_databases(self, sql_lobe):
-        sql_lobe._setup_databases(
-            postgres_database_name='sql_lobe',
-            postgres_hostname='test',
-            postgres_port=5432,
-            postgres_username='test',
-            postgres_password='test',
-            redis_hostname='test',
-            redis_port=6379,
-            redis_username='test',
-            redis_password='test',
-            redis_cluster=False,
-            redis_ssl=False,
-        )
-        sql_lobe._connection_postgres_db.assert_called_with(
-            'sql_lobe', 'test', 5432, 'test', 'test'
-        )
-        sql_lobe._connection_redis.assert_called_with('test', 6379, 'test', 'test', False, False)
-
-    def test_fetch(self):
-        with mock.patch(
-            'hyperon_das_atomdb.adapters.redis_postgreslobe_db.RedisPostgresLobeDB._setup_databases',
-            return_value=mock.MagicMock(),
-        ), mock.patch(
-            'hyperon_das_atomdb.adapters.redis_postgreslobe_db.RedisPostgresLobeDB._setup_indexes',
-            return_value=mock.MagicMock(),
-        ), mock.patch(
-            'hyperon_das_atomdb.adapters.redis_postgreslobe_db.RedisPostgresLobeDB._fetch',
-            return_value=mock.MagicMock(),
-        ):
-            sql_lobe = RedisPostgresLobeDB()
-
-        sql_lobe._parser = mock.MagicMock()
-        sql_lobe._update_atom_indexes = mock.MagicMock()
-        sql_lobe._insert_atoms = mock.MagicMock()
-        sql_lobe.mapper.map_table = mock.MagicMock()
-        sql_lobe.postgres_db = mock.MagicMock()
-
-        cursor_mock = mock.MagicMock()
-        cursor_mock.fetchall.return_value = [('table1',), ('table2',)]
-        sql_lobe.postgres_db.cursor.return_value.__enter__.return_value = cursor_mock
-
-        sql_lobe._fetch()
-
-        cursor_mock.execute.assert_called_once_with(
-            "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public';"
-        )
-
-        assert sql_lobe._parser.call_count == 2
-        assert sql_lobe._update_atom_indexes.call_count == 2
-        assert sql_lobe._insert_atoms.call_count == 2
-        assert sql_lobe.mapper.map_table.call_count == 2
-
-    def test_parser(self, sql_lobe):
-        cursor_mock = mock.MagicMock()
-        cursor_mock.fetchall.side_effect = [
-            [('id', 'integer', 'PK'), ('name', 'varchar', '')],
-            [('1',), ('2',)],
-            [('John',), ('Jane',)],
-        ]
-        cursor_mock.fetchone.side_effect = [('id',), ('name',)]
-        table = sql_lobe._parser(cursor_mock, 'users')
-
-        assert table.table_name == 'users'
-        assert len(table.columns) == 2
-        assert table.columns[0]['name'] == 'id'
-        assert table.columns[0]['type'] == 'integer'
-        assert table.columns[0]['constraint_type'] == 'PK'
-        assert table.columns[1]['name'] == 'name'
-        assert table.columns[1]['type'] == 'varchar'
-        assert table.columns[1]['constraint_type'] == ''
-        assert len(table.rows) == 2
-        assert table.rows[0]['id'] == '1'
-        assert table.rows[0]['name'] == 'John'
-        assert table.rows[1]['id'] == '2'
-        assert table.rows[1]['name'] == 'Jane'
-
-    def test_insert_atoms(self, sql_lobe):
-        atoms = [
-            {"_id": "1", "name": "Atom 1"},
-            {"_id": "2", "name": "Atom 2"},
-            {"_id": "3", "name": "Atom 3"},
-        ]
-        expected_keys = ['atoms:1', 'atoms:2', 'atoms:3']
-        expected_values = [json.dumps(atom) for atom in atoms]
-
-        sql_lobe.redis.set = mock.MagicMock()
-
-        sql_lobe._insert_atoms(atoms)
-
-        assert sql_lobe.redis.set.call_count == len(atoms)
-        for key, value in zip(expected_keys, expected_values):
-            sql_lobe.redis.set.assert_any_call(key, value)
-
-    def test_retrieve_document(self, sql_lobe):
-        handle = "1"
-        key = f'atoms:{handle}'
-        document = {"_id": "1", "name": "Atom 1"}
-        json_document = json.dumps(document)
-
-        sql_lobe.redis.get = mock.MagicMock(return_value=json_document)
-
-        result = sql_lobe._retrieve_document(handle)
-
-        sql_lobe.redis.get.assert_called_once_with(key)
-        assert result == document
-
-    def test_retrieve_document_not_found(self, sql_lobe):
-        handle = "1"
-        key = f'atoms:{handle}'
-
-        sql_lobe.redis.get = mock.MagicMock(return_value=None)
-
-        result = sql_lobe._retrieve_document(handle)
-
-        sql_lobe.redis.get.assert_called_once_with(key)
-        assert result is None
-
-    def test_retrieve_document_with_document_link(self, sql_lobe):
-        handle = "1"
-        document = {'_id': '1', 'name': 'Atom 1', 'link': 'atoms:2', 'targets': ['atoms:2']}
-        json_document = json.dumps(document)
-        linked_document = {"_id": "2", "name": "Atom 2"}
-
-        sql_lobe.redis.get = mock.MagicMock(
-            side_effect=[json_document, json.dumps(linked_document)]
-        )
-        sql_lobe._is_document_link = mock.MagicMock(return_value=True)
-        sql_lobe._get_document_keys = mock.MagicMock(return_value=["atoms:2"])
-
-        result = sql_lobe._retrieve_document(handle)
-
-        sql_lobe._is_document_link.assert_called_once_with(document)
-        sql_lobe._get_document_keys.assert_called_once_with(document)
-        document["targets"] = ["atoms:2"]
-        assert result == document
-
-    def test_retrieve_all_documents(self, sql_lobe):
-        sql_lobe.redis.mget = mock.MagicMock(
-            return_value=[
-                json.dumps({"_id": "1", "name": "Atom 1"}),
-                json.dumps({"_id": "2", "name": "Atom 2"}),
-                json.dumps({"_id": "3", "name": "Atom 3"}),
-            ]
-        )
-
-        result = sql_lobe._retrieve_all_documents()
-        assert len(result) == 3
-        assert result == [
-            {"_id": "1", "name": "Atom 1"},
-            {"_id": "2", "name": "Atom 2"},
-            {"_id": "3", "name": "Atom 3"},
-        ]
-
-        result = sql_lobe._retrieve_all_documents(key="_id", value="2")
-        assert len(result) == 1
-        assert result == [{"_id": "2", "name": "Atom 2"}]
-
-        result = sql_lobe._retrieve_all_documents(key="_id", value=True)
-        assert len(result) == 3
-        assert result == [
-            {"_id": "1", "name": "Atom 1"},
-            {"_id": "2", "name": "Atom 2"},
-            {"_id": "3", "name": "Atom 3"},
-        ]
-
-        result = sql_lobe._retrieve_all_documents(key="_id", value=False)
-        assert len(result) == 0
-        assert result == []