Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎨Computational backend: DV-2 computational scheduler becomes replicable (🗃️🚨) #6736

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
127 commits
Select commit Hold shift + click to select a range
59c72e9
added last_scheduled datetime
sanderegg Nov 15, 2024
1828bf5
ongoing new scheduler
sanderegg Nov 15, 2024
2b12dcd
ongoing new scheduler
sanderegg Nov 15, 2024
70c6573
ongoing new scheduler
sanderegg Nov 15, 2024
b1fb6c0
initial test
sanderegg Nov 15, 2024
9250d06
initial test
sanderegg Nov 15, 2024
0810978
initial test
sanderegg Nov 15, 2024
94caa2a
added rabbit message exchange
sanderegg Nov 15, 2024
655d514
skeleton for distributed scheduler
sanderegg Nov 15, 2024
3654444
skeleton for distributed scheduler
sanderegg Nov 15, 2024
4771071
checking rabbit mq
sanderegg Nov 17, 2024
adb6689
checking comp_runs table
sanderegg Nov 17, 2024
081d7e1
checking comp_runs table
sanderegg Nov 17, 2024
dd5cce7
ongoing new scheduler tests
sanderegg Nov 18, 2024
39a4633
ongoing new scheduler tests
sanderegg Nov 18, 2024
01cf898
fixed tests
sanderegg Nov 18, 2024
dad78fb
100% tested
sanderegg Nov 18, 2024
acceed4
setup distributed scheduler
sanderegg Nov 18, 2024
a6ddc56
first connection manager/worker
sanderegg Nov 18, 2024
353e180
updated syntax in tests
sanderegg Nov 18, 2024
dfbf38f
splitted code
sanderegg Nov 18, 2024
8dc49d6
refactored
sanderegg Nov 18, 2024
fd50743
refactored
sanderegg Nov 18, 2024
96a2424
cleanup
sanderegg Nov 18, 2024
4dbfc09
almost there
sanderegg Nov 18, 2024
1add067
ensure naming
sanderegg Nov 18, 2024
083ca86
renaming
sanderegg Nov 18, 2024
68cb686
rename method
sanderegg Nov 18, 2024
864591e
moved method
sanderegg Nov 18, 2024
5aac1ea
cleanup
sanderegg Nov 18, 2024
10bbcb4
renaming
sanderegg Nov 18, 2024
8d8cb66
renaming
sanderegg Nov 18, 2024
1381706
ongoing
sanderegg Nov 18, 2024
c4b13ce
missing import
sanderegg Nov 19, 2024
22c27b6
moved utils
sanderegg Nov 19, 2024
8843b2e
moved iteartion to models
sanderegg Nov 19, 2024
d037813
moved iteartion to models
sanderegg Nov 19, 2024
fe64bf8
move test to context
sanderegg Nov 19, 2024
63612b6
checking tests
sanderegg Nov 19, 2024
01406fb
base test for worker init/shutdown
sanderegg Nov 19, 2024
86df8bf
re-order
sanderegg Nov 19, 2024
f736f54
rename
sanderegg Nov 19, 2024
c37f1b5
put it back in working state
sanderegg Nov 19, 2024
96402ce
cleaning
sanderegg Nov 19, 2024
748cbb9
rename
sanderegg Nov 19, 2024
1a5157e
refactor and fixes
sanderegg Nov 19, 2024
2eacd17
refactoring before testing
sanderegg Nov 19, 2024
b3e7ac2
test if fixed
sanderegg Nov 20, 2024
68f9f56
missing service dependencies
sanderegg Nov 20, 2024
7e418b3
added basic test
sanderegg Nov 20, 2024
f9ff4e5
ensure we call shutdown on the worker as well
sanderegg Nov 20, 2024
962b5da
the callback is run in a separate thread
sanderegg Nov 20, 2024
6aba458
add documentation
sanderegg Nov 20, 2024
dac33f3
removed wake_up callback from api interface
sanderegg Nov 20, 2024
a6ee7c7
add no cover for abstract methods
sanderegg Nov 20, 2024
7ab4dc0
revert
sanderegg Nov 20, 2024
71e0079
use new style
sanderegg Nov 21, 2024
d7c49a0
use docstrings so that coverage is correctly computed
sanderegg Nov 21, 2024
62f34c9
pyv2
sanderegg Nov 22, 2024
7701430
unskip test
sanderegg Nov 22, 2024
2fa420a
improve name
sanderegg Nov 22, 2024
a79df13
add docs
sanderegg Nov 22, 2024
a758376
merge
sanderegg Nov 22, 2024
75b7983
add doc for next PR
sanderegg Nov 24, 2024
328e518
add doc for next PR
sanderegg Nov 24, 2024
dec4b4b
ruff
sanderegg Nov 24, 2024
34def37
fix after merge
sanderegg Nov 24, 2024
819b71f
cleanup
sanderegg Nov 24, 2024
a4bca60
cleanup
sanderegg Nov 24, 2024
c0d749f
change signature
sanderegg Nov 24, 2024
dc2e7ce
changed syntax
sanderegg Nov 24, 2024
f36adc4
initial implementation
sanderegg Nov 24, 2024
d5dcf7f
maybe
sanderegg Nov 24, 2024
3c964ad
maybe
sanderegg Nov 24, 2024
cb65ccb
refactor
sanderegg Nov 25, 2024
cc2a002
refactor
sanderegg Nov 25, 2024
5ddab65
docs
sanderegg Nov 25, 2024
57b770e
ruff
sanderegg Nov 25, 2024
5a5b2db
some fine tuning for tests
sanderegg Nov 25, 2024
b45f33f
use the correct v2 method
sanderegg Nov 25, 2024
1fcbf12
changed syntax
sanderegg Nov 25, 2024
3281ffe
test the callback mechanism
sanderegg Nov 25, 2024
db50afb
test callback mechanism
sanderegg Nov 25, 2024
4c80279
renaming
sanderegg Nov 25, 2024
c54337f
creation of parallel test
sanderegg Nov 25, 2024
8df0116
renaming
sanderegg Nov 26, 2024
1e78145
added rabbitmq queue purger
sanderegg Nov 26, 2024
b2596bb
cleanup
sanderegg Nov 26, 2024
efc46f1
test parallelism
sanderegg Nov 26, 2024
d90312e
added setting to control scheduling concurrency
sanderegg Nov 26, 2024
a45cfbe
ensure unsubscribe consumer is only unsubscribing the right consumer …
sanderegg Nov 26, 2024
1c4ae48
fix test after renaming
sanderegg Nov 26, 2024
986cd42
fixed after new syntax
sanderegg Nov 26, 2024
7dc96af
ensure worker marks the scheduling as done
sanderegg Nov 26, 2024
34fa851
manager checks for properly scheduled tasks and lost ones
sanderegg Nov 26, 2024
7ed3b92
changed column names and add processed column
sanderegg Nov 26, 2024
94c1ed2
createing tests for repository
sanderegg Nov 26, 2024
47c4aff
moving to asyncengine
sanderegg Nov 26, 2024
c9b85af
moving to asyncengine
sanderegg Nov 26, 2024
28cb4b9
use begin
sanderegg Nov 26, 2024
2357932
repository almost tested
sanderegg Nov 26, 2024
0963624
repository almost tested
sanderegg Nov 26, 2024
743f82b
repository almost tested
sanderegg Nov 26, 2024
11eede8
testing listing
sanderegg Nov 27, 2024
4028fbb
testing listing with filter
sanderegg Nov 27, 2024
a9c7cf8
listing with processed since works
sanderegg Nov 27, 2024
76b494a
handling of processed and scheduled
sanderegg Nov 27, 2024
f37a3b5
doc
sanderegg Nov 27, 2024
1580044
convert comp_tasks timestamps to tz aware
sanderegg Nov 27, 2024
7a586f4
convert timestamps to utc aware
sanderegg Nov 27, 2024
f4eecf9
ensure metadata are jsonable encoded
sanderegg Nov 27, 2024
f7898c2
refactor
sanderegg Nov 28, 2024
adb865b
linter
sanderegg Nov 28, 2024
5c54a68
mypy
sanderegg Nov 28, 2024
28d5b53
revert
sanderegg Nov 28, 2024
a642af1
mypy
sanderegg Nov 29, 2024
3832cca
the lifespan manager cannot be used with the TestClient
sanderegg Nov 29, 2024
9f1b748
added a silence_exceptions decorator
sanderegg Nov 29, 2024
570bfd0
added test for silencing exceptions
sanderegg Nov 29, 2024
33e7259
typo
sanderegg Nov 29, 2024
11cb145
fix serialization using context
sanderegg Nov 29, 2024
dede26a
revert and fix
sanderegg Nov 29, 2024
7f279c5
fix tests
sanderegg Nov 29, 2024
3c93db2
@pcrespov review: remove match and use a mapping
sanderegg Dec 2, 2024
b7d1d2c
fixed typo
sanderegg Dec 2, 2024
5604886
@pcrespov review: use more robust checks
sanderegg Dec 2, 2024
a0e3990
fixed syntax
sanderegg Dec 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,20 @@ parallel = True

[report]
# Regexes for lines to exclude from consideration
exclude_lines =
# Have to re-enable the standard pragma
pragma: no cover

exclude_also =
# Don't complain about missing debug-only code:
def __repr__
if self\.debug

# Don't complain if tests don't hit defensive assertion code:
raise AssertionError
raise NotImplementedError

# Don't complain if non-runnable code isn't run:
if 0:
if __name__ == .__main__.:
if __name__ == __main__.:
class .*\bProtocol\):
# Don't complain about abstract methods, they aren't run:
@(abc\.)?abstract(((class|static)?method)|property)

# Don't complain about type checking
if TYPE_CHECKING:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

UUIDStr: TypeAlias = Annotated[str, StringConstraints(pattern=UUID_RE)]

NodeIDStr = UUIDStr
NodeIDStr: TypeAlias = UUIDStr

LocationID = int
LocationName = str
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""add_timezone_comp_tasks

Revision ID: 7ad64e963e0f
Revises: b7f23f6d8aa2
Create Date: 2024-11-27 22:28:51.898433+00:00

"""
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "7ad64e963e0f"
down_revision = "b7f23f6d8aa2"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"comp_tasks",
"submit",
existing_type=postgresql.TIMESTAMP(),
type_=sa.DateTime(timezone=True),
existing_nullable=True,
)
op.alter_column(
"comp_tasks",
"start",
existing_type=postgresql.TIMESTAMP(),
type_=sa.DateTime(timezone=True),
existing_nullable=True,
)
op.alter_column(
"comp_tasks",
"end",
existing_type=postgresql.TIMESTAMP(),
type_=sa.DateTime(timezone=True),
existing_nullable=True,
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"comp_tasks",
"end",
existing_type=sa.DateTime(timezone=True),
type_=postgresql.TIMESTAMP(),
existing_nullable=True,
)
op.alter_column(
"comp_tasks",
"start",
existing_type=sa.DateTime(timezone=True),
type_=postgresql.TIMESTAMP(),
existing_nullable=True,
)
op.alter_column(
"comp_tasks",
"submit",
existing_type=sa.DateTime(timezone=True),
type_=postgresql.TIMESTAMP(),
existing_nullable=True,
)
# ### end Alembic commands ###
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""added_distributed_comp_scheduler

Revision ID: b7f23f6d8aa2
Revises: c9db8bf5091e
Create Date: 2024-11-26 17:06:27.053774+00:00

"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "b7f23f6d8aa2"
down_revision = "c9db8bf5091e"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"comp_runs", sa.Column("scheduled", sa.DateTime(timezone=True), nullable=True)
)
op.add_column(
"comp_runs", sa.Column("processed", sa.DateTime(timezone=True), nullable=True)
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("comp_runs", "processed")
op.drop_column("comp_runs", "scheduled")
# ### end Alembic commands ###
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""add_timezone_comp_runs

Revision ID: e05bdc5b3c7b
Revises: 7ad64e963e0f
Create Date: 2024-11-27 22:51:21.112336+00:00

"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "e05bdc5b3c7b"
down_revision = "7ad64e963e0f"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"comp_runs",
"created",
existing_type=postgresql.TIMESTAMP(),
type_=sa.DateTime(timezone=True),
existing_nullable=False,
existing_server_default="now()",
)
op.alter_column(
"comp_runs",
"modified",
existing_type=postgresql.TIMESTAMP(),
type_=sa.DateTime(timezone=True),
existing_nullable=False,
existing_server_default="now()",
)
op.alter_column(
"comp_runs",
"started",
existing_type=postgresql.TIMESTAMP(),
type_=sa.DateTime(timezone=True),
existing_nullable=True,
)
op.alter_column(
"comp_runs",
"ended",
existing_type=postgresql.TIMESTAMP(),
type_=sa.DateTime(timezone=True),
existing_nullable=True,
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"comp_runs",
"ended",
existing_type=sa.DateTime(timezone=True),
type_=postgresql.TIMESTAMP(),
existing_nullable=True,
)
op.alter_column(
"comp_runs",
"started",
existing_type=sa.DateTime(timezone=True),
type_=postgresql.TIMESTAMP(),
existing_nullable=True,
)
op.alter_column(
"comp_runs",
"modified",
existing_type=sa.DateTime(timezone=True),
type_=postgresql.TIMESTAMP(),
existing_nullable=False,
existing_server_default="now()",
)
op.alter_column(
"comp_runs",
"created",
existing_type=sa.DateTime(timezone=True),
type_=postgresql.TIMESTAMP(),
existing_nullable=False,
existing_server_default="now()",
)
# ### end Alembic commands ###
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
""" Computational Runs Table

"""

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.sql import func

from ._common import RefActions
from ._common import RefActions, column_created_datetime, column_modified_datetime
from .base import metadata
from .comp_pipeline import StateType

Expand Down Expand Up @@ -72,31 +72,18 @@
doc="The result of the run entry",
),
# dag node id and class
sa.Column(
"created",
sa.DateTime(),
nullable=False,
server_default=func.now(),
doc="When the run entry was created",
),
sa.Column(
"modified",
sa.DateTime(),
nullable=False,
server_default=func.now(),
onupdate=func.now(), # this will auto-update on modification
doc="When the run entry was last modified",
),
column_created_datetime(timezone=True),
column_modified_datetime(timezone=True),
# utc timestamps for submission/start/end
sa.Column(
"started",
sa.DateTime,
sa.DateTime(timezone=True),
nullable=True,
doc="When the run was started",
),
sa.Column(
"ended",
sa.DateTime,
sa.DateTime(timezone=True),
nullable=True,
doc="When the run was finished",
),
Expand All @@ -106,6 +93,18 @@
nullable=True,
doc="If filled, when cancellation was requested",
),
sa.Column(
"scheduled",
sa.DateTime(timezone=True),
nullable=True,
doc="last time the pipeline was scheduled to be processed",
),
sa.Column(
"processed",
sa.DateTime(timezone=True),
nullable=True,
doc="last time the pipeline was actually processed",
),
sa.Column("metadata", JSONB, nullable=True, doc="the run optional metadata"),
sa.Column(
"use_on_demand_clusters",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
""" Computational Tasks Table

"""

import enum

import sqlalchemy as sa
Expand Down Expand Up @@ -77,9 +78,15 @@ class NodeClass(enum.Enum):
doc="current progress of the task if available",
),
# utc timestamps for submission/start/end
sa.Column("submit", sa.DateTime, doc="UTC timestamp for task submission"),
sa.Column("start", sa.DateTime, doc="UTC timestamp when task started"),
sa.Column("end", sa.DateTime, doc="UTC timestamp for task completion"),
sa.Column(
"submit", sa.DateTime(timezone=True), doc="UTC timestamp for task submission"
Copy link
Member

@pcrespov pcrespov Nov 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Q1: I thought the convention to follow in datetime columns is to use a verb in the past, i.e. submitted, started etc? Actually, I am going to change trashed_at because in the API we use the _at suffix but not in the database so i did this mistake. And because i have trashed_by to track the user as well.
    Are we getting here in an unnecessary camel-vs-snake case type of discussion? should we just use one and only one reasonable convention in both the api and database instead of so much unnecessary conversion overhead? :-) Which one you like? :-)

  • Q2: these are not nullable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These entries are older than you 🤣
I prefer not to add even more noise in this PR.

),
sa.Column(
"start", sa.DateTime(timezone=True), doc="UTC timestamp when task started"
),
sa.Column(
"end", sa.DateTime(timezone=True), doc="UTC timestamp for task completion"
),
sa.Column(
"last_heartbeat",
sa.DateTime(timezone=True),
Expand Down
23 changes: 22 additions & 1 deletion packages/pytest-simcore/src/pytest_simcore/rabbit_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import asyncio
import logging
from collections.abc import AsyncIterator, Awaitable, Callable
from contextlib import suppress

import aio_pika
import pytest
import tenacity
from servicelib.rabbitmq import RabbitMQClient, RabbitMQRPCClient
from servicelib.rabbitmq import QueueName, RabbitMQClient, RabbitMQRPCClient
from settings_library.rabbit import RabbitSettings
from tenacity.before_sleep import before_sleep_log
from tenacity.stop import stop_after_attempt
Expand Down Expand Up @@ -131,3 +132,23 @@ async def _creator(client_name: str, *, heartbeat: int = 60) -> RabbitMQRPCClien
yield _creator
# cleanup, properly close the clients
await asyncio.gather(*(client.close() for client in created_clients))


@pytest.fixture
async def ensure_parametrized_queue_is_empty(
create_rabbitmq_client: Callable[[str], RabbitMQClient], queue_name: QueueName
) -> AsyncIterator[None]:
rabbitmq_client = create_rabbitmq_client("pytest-purger")

async def _queue_messages_purger() -> None:
with suppress(aio_pika.exceptions.ChannelClosed):
assert rabbitmq_client._channel_pool # noqa: SLF001
async with rabbitmq_client._channel_pool.acquire() as channel: # noqa: SLF001
assert isinstance(channel, aio_pika.RobustChannel)
queue = await channel.get_queue(queue_name)
await queue.purge()

await _queue_messages_purger()
yield
# cleanup
await _queue_messages_purger()
Loading
Loading