Skip to content

Commit

Permalink
Merge branch 'master' into mai/db-async-engine
Browse files Browse the repository at this point in the history
  • Loading branch information
pcrespov authored Sep 30, 2024
2 parents 0a8de81 + 52771a9 commit da1b807
Show file tree
Hide file tree
Showing 31 changed files with 799 additions and 477 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ class Vendor(TypedDict, total=False):
invitation_url: str # How to request a trial invitation? (if applies)
invitation_form: bool # If True, it takes precedence over invitation_url and asks the FE to show the form (if defined)

has_landing_page: bool # Is Landing page enabled

release_notes_url_template: str # a template url where `{vtag}` will be replaced, eg: "http://example.com/{vtag}.md"


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine

_logger = logging.getLogger(__name__)


@asynccontextmanager
async def pass_or_acquire_connection(
    engine: AsyncEngine, connection: AsyncConnection | None = None
) -> AsyncIterator[AsyncConnection]:
    """Yield `connection` if one is passed, otherwise acquire a fresh one from `engine`.

    - A connection acquired here is closed on exit.
    - A caller-provided connection is left open: its creator remains
      responsible for closing it.
    - NOTE: when `connection` is passed, `engine` is not used at all.
    """
    is_connection_created = connection is None
    if is_connection_created:
        connection = await engine.connect()
    try:
        assert connection is not None  # nosec
        yield connection
    finally:
        assert connection is not None  # nosec
        # the connection must still be usable when leaving the context
        assert not connection.closed  # nosec
        # `connection is not None` is guaranteed above, so no extra truthiness
        # check is needed here (the original `and connection` was redundant)
        if is_connection_created:
            await connection.close()


@asynccontextmanager
async def transaction_context(
    engine: AsyncEngine, connection: AsyncConnection | None = None
) -> AsyncIterator[AsyncConnection]:
    """Yield a connection with an active transaction scoped to this context.

    - If the (passed) connection is already inside a transaction, a *nested*
      transaction (SAVEPOINT) is begun, so this context commits/rolls back
      independently of the enclosing transaction.
    - Otherwise an outer (root) transaction is begun on the connection,
      which is acquired from `engine` when no `connection` is passed.
    """
    async with pass_or_acquire_connection(engine, connection) as conn:
        if conn.in_transaction():
            async with conn.begin_nested():  # inner transaction (SAVEPOINT)
                yield conn
        else:
            try:
                async with conn.begin():  # outer/root transaction (NOT a savepoint)
                    yield conn
            finally:
                # on exit the transaction must be committed or rolled back
                # and the connection must still be usable
                assert not conn.closed  # nosec
                assert not conn.in_transaction()  # nosec
84 changes: 71 additions & 13 deletions packages/postgres-database/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# pylint: disable=unused-variable

import uuid
import warnings
from collections.abc import AsyncIterator, Awaitable, Callable, Iterator
from pathlib import Path

Expand Down Expand Up @@ -37,6 +38,7 @@
user_to_groups,
users,
)
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

pytest_plugins = [
"pytest_simcore.pytest_global_environs",
Expand Down Expand Up @@ -81,6 +83,30 @@ def _make(is_async=True) -> Awaitable[Engine] | sa.engine.base.Engine:
return _make


@pytest.fixture
def make_asyncpg_engine(postgres_service: str) -> Callable[[bool], AsyncEngine]:
    """Factory fixture producing sqlalchemy async (asyncpg) engines.

    NOTE: the caller of the factory is responsible for `await engine.dispose()`
    """
    asyncpg_dsn = postgres_service.replace("postgresql://", "postgresql+asyncpg://")
    pool_min_size = 1
    pool_max_size = 50

    def _create(echo: bool) -> AsyncEngine:
        return create_async_engine(
            asyncpg_dsn,
            pool_size=pool_min_size,
            max_overflow=pool_max_size - pool_min_size,
            connect_args={
                "server_settings": {"application_name": "postgres_database_tests"}
            },
            # https://docs.sqlalchemy.org/en/14/core/pooling.html#dealing-with-disconnects
            pool_pre_ping=True,
            # uses sqlalchemy 2.0 API; drop once sqlalchemy 2.0 is the baseline
            future=True,
            echo=echo,
        )

    return _create


def is_postgres_responsive(dsn) -> bool:
"""Check if something responds to ``url``"""
try:
Expand All @@ -107,6 +133,11 @@ def pg_sa_engine(
) -> Iterator[sa.engine.Engine]:
"""
Runs migration to create tables and return a sqlalchemy engine
NOTE: use this fixture to ensure pg db:
- up,
- responsive,
- init (w/ tables) and/or migrated
"""
# NOTE: Using migration to upgrade/downgrade is not
# such a great idea since these tests are used while developing
Expand Down Expand Up @@ -142,29 +173,56 @@ def pg_sa_engine(


@pytest.fixture
async def pg_engine(
async def aiopg_engine(
pg_sa_engine: sa.engine.Engine, make_engine: Callable
) -> AsyncIterator[Engine]:
"""
Return an aiopg.sa engine connected to a responsive and migrated pg database
"""
async_engine = await make_engine(is_async=True)

yield async_engine
aiopg_sa_engine = await make_engine(is_async=True)

warnings.warn(
"The 'aiopg_engine' is deprecated since we are replacing `aiopg` library by `sqlalchemy.ext.asyncio`."
"SEE https://github.com/ITISFoundation/osparc-simcore/issues/4529. "
"Please use 'asyncpg_engine' instead.",
DeprecationWarning,
stacklevel=2,
)

yield aiopg_sa_engine

# closes async-engine connections and terminates
async_engine.close()
await async_engine.wait_closed()
async_engine.terminate()
aiopg_sa_engine.close()
await aiopg_sa_engine.wait_closed()
aiopg_sa_engine.terminate()


@pytest.fixture
async def connection(pg_engine: Engine) -> AsyncIterator[SAConnection]:
async def connection(aiopg_engine: Engine) -> AsyncIterator[SAConnection]:
"""Returns an aiopg.sa connection from an engine to a fully furnished and ready pg database"""
async with pg_engine.acquire() as _conn:
async with aiopg_engine.acquire() as _conn:
yield _conn


@pytest.fixture
async def asyncpg_engine(
    is_pdb_enabled: bool,
    pg_sa_engine: sa.engine.Engine,
    make_asyncpg_engine: Callable[[bool], AsyncEngine],
) -> AsyncIterator[AsyncEngine]:
    """Async (asyncpg) engine against an up, responsive and migrated pg database."""
    # requesting pg_sa_engine guarantees the database is ready before
    # the async engine is handed out
    assert pg_sa_engine, "Ensures pg db up, responsive, init (w/ tables) and/or migrated"

    engine = make_asyncpg_engine(is_pdb_enabled)
    yield engine
    await engine.dispose()


#
# FACTORY FIXTURES
#
Expand Down Expand Up @@ -240,7 +298,7 @@ async def _creator(conn, group: RowProxy | None = None, **overrides) -> RowProxy

@pytest.fixture
async def create_fake_cluster(
pg_engine: Engine, faker: Faker
aiopg_engine: Engine, faker: Faker
) -> AsyncIterator[Callable[..., Awaitable[int]]]:
cluster_ids = []
assert cluster_to_groups is not None
Expand All @@ -254,7 +312,7 @@ async def _creator(**overrides) -> int:
"authentication": faker.pydict(value_types=[str]),
}
insert_values.update(overrides)
async with pg_engine.acquire() as conn:
async with aiopg_engine.acquire() as conn:
cluster_id = await conn.scalar(
clusters.insert().values(**insert_values).returning(clusters.c.id)
)
Expand All @@ -265,13 +323,13 @@ async def _creator(**overrides) -> int:
yield _creator

# cleanup
async with pg_engine.acquire() as conn:
async with aiopg_engine.acquire() as conn:
await conn.execute(clusters.delete().where(clusters.c.id.in_(cluster_ids)))


@pytest.fixture
async def create_fake_project(
pg_engine: Engine,
aiopg_engine: Engine,
) -> AsyncIterator[Callable[..., Awaitable[RowProxy]]]:
created_project_uuids = []

Expand All @@ -288,7 +346,7 @@ async def _creator(conn, user: RowProxy, **overrides) -> RowProxy:

yield _creator

async with pg_engine.acquire() as conn:
async with aiopg_engine.acquire() as conn:
await conn.execute(
projects.delete().where(projects.c.uuid.in_(created_project_uuids))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@


async def test_load_products(
pg_engine: Engine, make_products_table: Callable, products_regex: dict
aiopg_engine: Engine, make_products_table: Callable, products_regex: dict
):
exclude = {
products.c.created,
products.c.modified,
}

async with pg_engine.acquire() as conn:
async with aiopg_engine.acquire() as conn:
await make_products_table(conn)

stmt = sa.select(*[c for c in products.columns if c not in exclude])
Expand All @@ -49,14 +49,14 @@ async def test_load_products(


async def test_jinja2_templates_table(
pg_engine: Engine, osparc_simcore_services_dir: Path
aiopg_engine: Engine, osparc_simcore_services_dir: Path
):
templates_common_dir = (
osparc_simcore_services_dir
/ "web/server/src/simcore_service_webserver/templates/common"
)

async with pg_engine.acquire() as conn:
async with aiopg_engine.acquire() as conn:
templates = []
# templates table
for p in templates_common_dir.glob("*.jinja2"):
Expand Down Expand Up @@ -135,7 +135,7 @@ async def test_jinja2_templates_table(


async def test_insert_select_product(
pg_engine: Engine,
aiopg_engine: Engine,
):
osparc_product = {
"name": "osparc",
Expand Down Expand Up @@ -174,7 +174,7 @@ async def test_insert_select_product(

print(json.dumps(osparc_product))

async with pg_engine.acquire() as conn:
async with aiopg_engine.acquire() as conn:
# writes
stmt = (
pg_insert(products)
Expand Down
18 changes: 9 additions & 9 deletions packages/postgres-database/tests/products/test_utils_products.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,24 @@
)


async def test_default_product(pg_engine: Engine, make_products_table: Callable):
async with pg_engine.acquire() as conn:
async def test_default_product(aiopg_engine: Engine, make_products_table: Callable):
async with aiopg_engine.acquire() as conn:
await make_products_table(conn)
default_product = await get_default_product_name(conn)
assert default_product == "s4l"


@pytest.mark.parametrize("pg_sa_engine", ["sqlModels"], indirect=True)
async def test_default_product_undefined(pg_engine: Engine):
async with pg_engine.acquire() as conn:
async def test_default_product_undefined(aiopg_engine: Engine):
async with aiopg_engine.acquire() as conn:
with pytest.raises(ValueError):
await get_default_product_name(conn)


async def test_get_or_create_group_product(
pg_engine: Engine, make_products_table: Callable
aiopg_engine: Engine, make_products_table: Callable
):
async with pg_engine.acquire() as conn:
async with aiopg_engine.acquire() as conn:
await make_products_table(conn)

async for product_row in await conn.execute(
Expand Down Expand Up @@ -105,13 +105,13 @@ async def test_get_or_create_group_product(
reason="Not relevant. Will review in https://github.com/ITISFoundation/osparc-simcore/issues/3754"
)
async def test_get_or_create_group_product_concurrent(
pg_engine: Engine, make_products_table: Callable
aiopg_engine: Engine, make_products_table: Callable
):
async with pg_engine.acquire() as conn:
async with aiopg_engine.acquire() as conn:
await make_products_table(conn)

async def _auto_create_products_groups():
async with pg_engine.acquire() as conn:
async with aiopg_engine.acquire() as conn:
async for product_row in await conn.execute(
sa.select(products.c.name, products.c.group_id).order_by(
products.c.priority
Expand Down
12 changes: 6 additions & 6 deletions packages/postgres-database/tests/projects/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@


@pytest.fixture
async def user(pg_engine: Engine) -> RowProxy:
async def user(aiopg_engine: Engine) -> RowProxy:
_USERNAME = f"{__name__}.me"
# some user
async with pg_engine.acquire() as conn:
async with aiopg_engine.acquire() as conn:
result: ResultProxy | None = await conn.execute(
users.insert().values(**random_user(name=_USERNAME)).returning(users)
)
Expand All @@ -32,10 +32,10 @@ async def user(pg_engine: Engine) -> RowProxy:


@pytest.fixture
async def project(pg_engine: Engine, user: RowProxy) -> RowProxy:
async def project(aiopg_engine: Engine, user: RowProxy) -> RowProxy:
_PARENT_PROJECT_NAME = f"{__name__}.parent"
# a user's project
async with pg_engine.acquire() as conn:
async with aiopg_engine.acquire() as conn:
result: ResultProxy | None = await conn.execute(
projects.insert()
.values(**random_project(prj_owner=user.id, name=_PARENT_PROJECT_NAME))
Expand All @@ -50,6 +50,6 @@ async def project(pg_engine: Engine, user: RowProxy) -> RowProxy:


@pytest.fixture
async def conn(pg_engine: Engine) -> AsyncIterable[SAConnection]:
async with pg_engine.acquire() as conn:
async def conn(aiopg_engine: Engine) -> AsyncIterable[SAConnection]:
async with aiopg_engine.acquire() as conn:
yield conn
4 changes: 2 additions & 2 deletions packages/postgres-database/tests/test_classifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ def classifiers_bundle(web_client_resource_folder: Path) -> dict:


async def test_operations_on_group_classifiers(
pg_engine: Engine, classifiers_bundle: dict
aiopg_engine: Engine, classifiers_bundle: dict
):
# NOTE: mostly for TDD
async with pg_engine.acquire() as conn:
async with aiopg_engine.acquire() as conn:
# creates a group
stmt = (
groups.insert()
Expand Down
Loading

0 comments on commit da1b807

Please sign in to comment.