Skip to content

Commit

Permalink
🎨Computational backend: DV-2 computational scheduler becomes replicab…
Browse files Browse the repository at this point in the history
…le (🗃️🚨) (ITISFoundation#6736)
  • Loading branch information
sanderegg authored Dec 2, 2024
1 parent 994c575 commit a2f9058
Show file tree
Hide file tree
Showing 45 changed files with 3,224 additions and 1,125 deletions.
9 changes: 2 additions & 7 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,20 @@ parallel = True

[report]
# Regexes for lines to exclude from consideration
exclude_lines =
# Have to re-enable the standard pragma
pragma: no cover

exclude_also =
# Don't complain about missing debug-only code:
def __repr__
if self\.debug

# Don't complain if tests don't hit defensive assertion code:
raise AssertionError
raise NotImplementedError

# Don't complain if non-runnable code isn't run:
if 0:
if __name__ == .__main__.:
if __name__ == __main__.:
class .*\bProtocol\):
# Don't complain about abstract methods, they aren't run:
@(abc\.)?abstract(((class|static)?method)|property)

# Don't complain about type checking
if TYPE_CHECKING:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

UUIDStr: TypeAlias = Annotated[str, StringConstraints(pattern=UUID_RE)]

NodeIDStr = UUIDStr
NodeIDStr: TypeAlias = UUIDStr

LocationID = int
LocationName = str
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""add_timezone_comp_tasks
Revision ID: 7ad64e963e0f
Revises: b7f23f6d8aa2
Create Date: 2024-11-27 22:28:51.898433+00:00
"""
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "7ad64e963e0f"
down_revision = "b7f23f6d8aa2"
branch_labels = None
depends_on = None


def upgrade():
    """Make the comp_tasks submit/start/end timestamps timezone-aware."""
    # ### commands auto generated by Alembic - please adjust! ###
    # all three columns share the same nullable/type conversion
    for timestamp_column in ("submit", "start", "end"):
        op.alter_column(
            "comp_tasks",
            timestamp_column,
            existing_type=postgresql.TIMESTAMP(),
            type_=sa.DateTime(timezone=True),
            existing_nullable=True,
        )
    # ### end Alembic commands ###


def downgrade():
    """Revert the comp_tasks submit/start/end timestamps to naive timestamps."""
    # ### commands auto generated by Alembic - please adjust! ###
    # reverse order of upgrade(); all three columns share the same conversion
    for timestamp_column in ("end", "start", "submit"):
        op.alter_column(
            "comp_tasks",
            timestamp_column,
            existing_type=sa.DateTime(timezone=True),
            type_=postgresql.TIMESTAMP(),
            existing_nullable=True,
        )
    # ### end Alembic commands ###
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""added_distributed_comp_scheduler
Revision ID: b7f23f6d8aa2
Revises: c9db8bf5091e
Create Date: 2024-11-26 17:06:27.053774+00:00
"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "b7f23f6d8aa2"
down_revision = "c9db8bf5091e"
branch_labels = None
depends_on = None


def upgrade():
    """Add the scheduler bookkeeping timestamps to comp_runs."""
    # ### commands auto generated by Alembic - please adjust! ###
    for new_column in ("scheduled", "processed"):
        op.add_column(
            "comp_runs",
            sa.Column(new_column, sa.DateTime(timezone=True), nullable=True),
        )
    # ### end Alembic commands ###


def downgrade():
    """Drop the scheduler bookkeeping timestamps from comp_runs."""
    # ### commands auto generated by Alembic - please adjust! ###
    # reverse order of upgrade()
    for added_column in ("processed", "scheduled"):
        op.drop_column("comp_runs", added_column)
    # ### end Alembic commands ###
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""add_timezone_comp_runs
Revision ID: e05bdc5b3c7b
Revises: 7ad64e963e0f
Create Date: 2024-11-27 22:51:21.112336+00:00
"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "e05bdc5b3c7b"
down_revision = "7ad64e963e0f"
branch_labels = None
depends_on = None


def upgrade():
    """Make the comp_runs timestamp columns timezone-aware."""
    # ### commands auto generated by Alembic - please adjust! ###
    # (column, server default) pairs; created/modified are NOT NULL with a
    # now() default, started/ended are plain nullable timestamps
    for column_name, server_default in (
        ("created", "now()"),
        ("modified", "now()"),
        ("started", None),
        ("ended", None),
    ):
        extra_kwargs = (
            {"existing_nullable": False, "existing_server_default": server_default}
            if server_default is not None
            else {"existing_nullable": True}
        )
        op.alter_column(
            "comp_runs",
            column_name,
            existing_type=postgresql.TIMESTAMP(),
            type_=sa.DateTime(timezone=True),
            **extra_kwargs,
        )
    # ### end Alembic commands ###


def downgrade():
    """Revert the comp_runs timestamp columns to naive timestamps."""
    # ### commands auto generated by Alembic - please adjust! ###
    # reverse order of upgrade(); created/modified keep their NOT NULL +
    # now() default, started/ended stay nullable
    for column_name, server_default in (
        ("ended", None),
        ("started", None),
        ("modified", "now()"),
        ("created", "now()"),
    ):
        extra_kwargs = (
            {"existing_nullable": False, "existing_server_default": server_default}
            if server_default is not None
            else {"existing_nullable": True}
        )
        op.alter_column(
            "comp_runs",
            column_name,
            existing_type=sa.DateTime(timezone=True),
            type_=postgresql.TIMESTAMP(),
            **extra_kwargs,
        )
    # ### end Alembic commands ###
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
""" Computational Runs Table
"""

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.sql import func

from ._common import RefActions
from ._common import RefActions, column_created_datetime, column_modified_datetime
from .base import metadata
from .comp_pipeline import StateType

Expand Down Expand Up @@ -72,31 +72,18 @@
doc="The result of the run entry",
),
# dag node id and class
sa.Column(
"created",
sa.DateTime(),
nullable=False,
server_default=func.now(),
doc="When the run entry was created",
),
sa.Column(
"modified",
sa.DateTime(),
nullable=False,
server_default=func.now(),
onupdate=func.now(), # this will auto-update on modification
doc="When the run entry was last modified",
),
column_created_datetime(timezone=True),
column_modified_datetime(timezone=True),
# utc timestamps for submission/start/end
sa.Column(
"started",
sa.DateTime,
sa.DateTime(timezone=True),
nullable=True,
doc="When the run was started",
),
sa.Column(
"ended",
sa.DateTime,
sa.DateTime(timezone=True),
nullable=True,
doc="When the run was finished",
),
Expand All @@ -106,6 +93,18 @@
nullable=True,
doc="If filled, when cancellation was requested",
),
sa.Column(
"scheduled",
sa.DateTime(timezone=True),
nullable=True,
doc="last time the pipeline was scheduled to be processed",
),
sa.Column(
"processed",
sa.DateTime(timezone=True),
nullable=True,
doc="last time the pipeline was actually processed",
),
sa.Column("metadata", JSONB, nullable=True, doc="the run optional metadata"),
sa.Column(
"use_on_demand_clusters",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
""" Computational Tasks Table
"""

import enum

import sqlalchemy as sa
Expand Down Expand Up @@ -77,9 +78,15 @@ class NodeClass(enum.Enum):
doc="current progress of the task if available",
),
# utc timestamps for submission/start/end
sa.Column("submit", sa.DateTime, doc="UTC timestamp for task submission"),
sa.Column("start", sa.DateTime, doc="UTC timestamp when task started"),
sa.Column("end", sa.DateTime, doc="UTC timestamp for task completion"),
sa.Column(
"submit", sa.DateTime(timezone=True), doc="UTC timestamp for task submission"
),
sa.Column(
"start", sa.DateTime(timezone=True), doc="UTC timestamp when task started"
),
sa.Column(
"end", sa.DateTime(timezone=True), doc="UTC timestamp for task completion"
),
sa.Column(
"last_heartbeat",
sa.DateTime(timezone=True),
Expand Down
23 changes: 22 additions & 1 deletion packages/pytest-simcore/src/pytest_simcore/rabbit_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import asyncio
import logging
from collections.abc import AsyncIterator, Awaitable, Callable
from contextlib import suppress

import aio_pika
import pytest
import tenacity
from servicelib.rabbitmq import RabbitMQClient, RabbitMQRPCClient
from servicelib.rabbitmq import QueueName, RabbitMQClient, RabbitMQRPCClient
from settings_library.rabbit import RabbitSettings
from tenacity.before_sleep import before_sleep_log
from tenacity.stop import stop_after_attempt
Expand Down Expand Up @@ -131,3 +132,23 @@ async def _creator(client_name: str, *, heartbeat: int = 60) -> RabbitMQRPCClien
yield _creator
# cleanup, properly close the clients
await asyncio.gather(*(client.close() for client in created_clients))


@pytest.fixture
async def ensure_parametrized_queue_is_empty(
    create_rabbitmq_client: Callable[[str], RabbitMQClient], queue_name: QueueName
) -> AsyncIterator[None]:
    """Purges ``queue_name`` before and after the test so leftover messages
    from other tests (or from the test itself) do not leak across runs."""
    purging_client = create_rabbitmq_client("pytest-purger")

    async def _purge_queue() -> None:
        # a closed channel means there is nothing to purge anymore
        with suppress(aio_pika.exceptions.ChannelClosed):
            assert purging_client._channel_pool  # noqa: SLF001
            async with purging_client._channel_pool.acquire() as channel:  # noqa: SLF001
                assert isinstance(channel, aio_pika.RobustChannel)
                target_queue = await channel.get_queue(queue_name)
                await target_queue.purge()

    await _purge_queue()
    yield
    # cleanup
    await _purge_queue()
Loading

0 comments on commit a2f9058

Please sign in to comment.