Commit 57058f9

Merge remote-tracking branch 'upstream/master' into update-from-master-XXX

sanderegg committed Nov 18, 2024
2 parents 65665ec + c83d60c
Showing 28 changed files with 627 additions and 194 deletions.
@@ -20,6 +20,7 @@ class ServiceRunGet(BaseModel):
     user_email: str
     project_id: ProjectID
     project_name: str
+    project_tags: list[str]
     node_id: NodeID
     node_name: str
     root_parent_project_id: ProjectID
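With this addition, each service-run record also exposes the tags attached to its project. A minimal sketch of a payload excerpt, with illustrative values only:

# Sketch: excerpt of a service-run payload including the new field (values are made up)
payload_excerpt = {
    "user_email": "user@example.com",
    "project_id": "106f8b4b-ffb6-459a-a27b-981c779e6d3f",
    "project_name": "My Study",
    "project_tags": ["rut", "demo"],  # newly added: tag names attached to the project
}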
2 changes: 1 addition & 1 deletion packages/models-library/src/models_library/clusters.py
@@ -152,7 +152,7 @@ class BaseCluster(BaseModel):


 ClusterID: TypeAlias = NonNegativeInt
-DEFAULT_CLUSTER_ID: Final[NonNegativeInt] = 0
+DEFAULT_CLUSTER_ID: Final[ClusterID] = 0


 class Cluster(BaseCluster):
@@ -0,0 +1,90 @@
"""enhance projects_tags for RUT
Revision ID: 8e1f83486be7
Revises: 8bfe65a5e294
Create Date: 2024-11-15 09:12:57.789183+00:00
"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "8e1f83486be7"
down_revision = "8bfe65a5e294"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"projects_tags", sa.Column("project_uuid_for_rut", sa.String(), nullable=True)
)

# Migrate
op.execute(
sa.DDL(
"""
UPDATE projects_tags
SET project_uuid_for_rut = projects.uuid
FROM projects
WHERE projects_tags.project_id = projects.id;
"""
)
)

op.alter_column(
"projects_tags",
"project_uuid_for_rut",
existing_type=sa.String(),
nullable=False,
)
op.alter_column(
"projects_tags", "project_id", existing_type=sa.BIGINT(), nullable=True
)
op.drop_constraint(
"study_tags_study_id_tag_id_key", "projects_tags", type_="unique"
)
op.create_unique_constraint(
"project_tags_project_uuid_unique",
"projects_tags",
["project_uuid_for_rut", "tag_id"],
)
op.drop_constraint("study_tags_study_id_fkey", "projects_tags", type_="foreignkey")
op.create_foreign_key(
"project_tags_project_id_fkey",
"projects_tags",
"projects",
["project_id"],
["id"],
onupdate="CASCADE",
ondelete="SET NULL",
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
"project_tags_project_id_fkey", "projects_tags", type_="foreignkey"
)
op.create_foreign_key(
"study_tags_study_id_fkey",
"projects_tags",
"projects",
["project_id"],
["id"],
onupdate="CASCADE",
ondelete="CASCADE",
)
op.drop_constraint(
"project_tags_project_uuid_unique", "projects_tags", type_="unique"
)
op.create_unique_constraint(
"study_tags_study_id_tag_id_key", "projects_tags", ["project_id", "tag_id"]
)
op.alter_column(
"projects_tags", "project_id", existing_type=sa.BIGINT(), nullable=False
)
op.drop_column("projects_tags", "project_uuid_for_rut")
# ### end Alembic commands ###
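To exercise this revision locally, Alembic's Python API can drive the upgrade and downgrade; a minimal sketch, assuming an alembic.ini configured for a development database:

# Sketch: apply and revert this revision (assumes alembic.ini points at a dev database)
from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")
command.upgrade(cfg, "8e1f83486be7")    # adds project_uuid_for_rut, swaps constraints
command.downgrade(cfg, "8bfe65a5e294")  # restores the study_tags_* constraints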
@@ -13,15 +13,27 @@
     sa.Column(
         "project_id",
         sa.BigInteger,
-        sa.ForeignKey(projects.c.id, onupdate="CASCADE", ondelete="CASCADE"),
-        nullable=False,
-        doc="NOTE that project.c.id != project.c.uuid",
+        sa.ForeignKey(
+            projects.c.id,
+            onupdate="CASCADE",
+            ondelete="SET NULL",
+            name="project_tags_project_id_fkey",
+        ),
+        nullable=True,  # <-- NULL means that the project was deleted
+        doc="NOTE that project.c.id != project.c.uuid. When a project is deleted, this row is kept and this column is set to NULL, because `project_uuid_for_rut` is still used by the resource usage tracker",
     ),
     sa.Column(
         "tag_id",
         sa.BigInteger,
         sa.ForeignKey(tags.c.id, onupdate="CASCADE", ondelete="CASCADE"),
         nullable=False,
     ),
-    sa.UniqueConstraint("project_id", "tag_id"),
+    sa.Column(
+        "project_uuid_for_rut",
+        sa.String,
+        nullable=False,
+    ),
+    sa.UniqueConstraint(
+        "project_uuid_for_rut", "tag_id", name="project_tags_project_uuid_unique"
+    ),
 )
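The doc string above is the crux of the change: tag assignments must outlive the project row. A sketch of the kind of UUID-keyed lookup this enables for a consumer such as the resource usage tracker (the query is illustrative, not the tracker's actual code):

# Sketch: resolve tags for a possibly-deleted project by UUID (illustrative)
import sqlalchemy as sa

def get_tag_ids_for_project_uuid_stmt(project_uuid: str):
    # still works after project deletion, when project_id is NULL
    return sa.select(projects_tags.c.tag_id).where(
        projects_tags.c.project_uuid_for_rut == project_uuid
    )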
@@ -1,4 +1,5 @@
 import functools
+from uuid import UUID

 import sqlalchemy as sa
 from simcore_postgres_database.models.groups import user_to_groups
@@ -60,7 +61,7 @@ def get_tag_stmt(
         # aggregation ensures MOST PERMISSIVE policy of access-rights
         sa.func.bool_or(tags_access_rights.c.read).label("read"),
         sa.func.bool_or(tags_access_rights.c.write).label("write"),
-        sa.func.bool_or(tags_access_rights.c.delete).label("delete")
+        sa.func.bool_or(tags_access_rights.c.delete).label("delete"),
     )
     .select_from(
         _join_user_to_given_tag(
@@ -80,7 +81,7 @@ def list_tags_stmt(*, user_id: int):
         # aggregation ensures MOST PERMISSIVE policy of access-rights
         sa.func.bool_or(tags_access_rights.c.read).label("read"),
         sa.func.bool_or(tags_access_rights.c.write).label("write"),
-        sa.func.bool_or(tags_access_rights.c.delete).label("delete")
+        sa.func.bool_or(tags_access_rights.c.delete).label("delete"),
     )
     .select_from(
         _join_user_to_tags(
@@ -104,7 +105,7 @@ def count_groups_with_given_access_rights_stmt(
     tag_id: int,
     read: bool | None,
     write: bool | None,
-    delete: bool | None
+    delete: bool | None,
 ):
     """
     How many groups (from this user_id) are given EXACTLY these access permissions
@@ -192,12 +193,15 @@ def get_tags_for_project_stmt(*, project_index: int):
     )


-def add_tag_to_project_stmt(*, project_index: int, tag_id: int):
+def add_tag_to_project_stmt(
+    *, project_index: int, tag_id: int, project_uuid_for_rut: UUID
+):
     return (
         pg_insert(projects_tags)
         .values(
             project_id=project_index,
             tag_id=tag_id,
+            project_uuid_for_rut=f"{project_uuid_for_rut}",
         )
         .on_conflict_do_nothing()
     )
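A sketch of how the extended statement might be driven (the connection object and the concrete values are assumed for illustration):

# Sketch: executing the upsert; `connection` is an assumed SQLAlchemy AsyncConnection
from uuid import UUID

async def tag_project(connection) -> None:
    stmt = add_tag_to_project_stmt(
        project_index=1,
        tag_id=4,
        project_uuid_for_rut=UUID("106f8b4b-ffb6-459a-a27b-981c779e6d3f"),
    )
    await connection.execute(stmt)  # on_conflict_do_nothing makes repeats a no-op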
2 changes: 2 additions & 0 deletions packages/postgres-database/tests/test_utils_tags.py
@@ -668,6 +668,7 @@ def _check(func_smt, **kwargs):
     user_id = 425  # 4
     tag_id = 4
     project_index = 1
+    project_uuid = "106f8b4b-ffb6-459a-a27b-981c779e6d3f"
     service_key = "simcore/services/comp/isolve"
     service_version = "2.0.85"

@@ -726,6 +727,7 @@ def _check(func_smt, **kwargs):
         add_tag_to_project_stmt,
         project_index=project_index,
         tag_id=tag_id,
+        project_uuid_for_rut=project_uuid,
     )

     _check(
39 changes: 37 additions & 2 deletions packages/pytest-simcore/src/pytest_simcore/helpers/playwright.py
@@ -5,12 +5,15 @@
 from collections import defaultdict
 from collections.abc import Generator, Iterator
 from dataclasses import dataclass, field
+from datetime import UTC, datetime, timedelta
 from enum import Enum, unique
 from typing import Any, Final

+import httpx
 from playwright.sync_api import FrameLocator, Page, Request
 from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
 from playwright.sync_api import WebSocket
+from pydantic import AnyUrl
 from pytest_simcore.helpers.logging_tools import log_context

 SECOND: Final[int] = 1000
@@ -196,9 +199,11 @@ def __call__(self, message: str) -> None:
 class SocketIONodeProgressCompleteWaiter:
     node_id: str
     logger: logging.Logger
+    product_url: AnyUrl
     _current_progress: dict[NodeProgressType, float] = field(
         default_factory=defaultdict
     )
+    _last_poll_timestamp: datetime = field(default_factory=lambda: datetime.now(tz=UTC))

     def __call__(self, message: str) -> bool:
         # socket.io encodes messages like so
@@ -234,6 +239,27 @@ def __call__(self, message: str) -> bool:
                 round(progress, 1) == 1.0
                 for progress in self._current_progress.values()
             )
+
+        _current_timestamp = datetime.now(UTC)
+        if _current_timestamp - self._last_poll_timestamp > timedelta(seconds=5):
+            url = f"https://{self.node_id}.services.{self.get_partial_product_url()}"
+            response = httpx.get(url, timeout=10)
+            self.logger.info(
+                "Querying the service endpoint from the E2E test. Url: %s Response: %s",
+                url,
+                response,
+            )
+            if response.status_code <= 401:
+                # NOTE: a status below 400 means the backend is ready (some services respond with a 3XX)
+                # MD: 401 is included for now, as it also means the backend is up
+                if self.got_expected_node_progress_types():
+                    self.logger.warning(
+                        "⚠️ Progress bar didn't receive 100 percent but service is already running: %s ⚠️",  # https://github.com/ITISFoundation/osparc-simcore/issues/6449
+                        self.get_current_progress(),
+                    )
+                return True
+            self._last_poll_timestamp = datetime.now(UTC)
+
         return False

     def got_expected_node_progress_types(self):
@@ -245,6 +271,9 @@ def got_expected_node_progress_types(self):
     def get_current_progress(self):
         return self._current_progress.values()

+    def get_partial_product_url(self):
+        return f"{self.product_url}".split("//")[1]
+

 def wait_for_pipeline_state(
     current_state: RunningState,
@@ -332,9 +361,12 @@ def expected_service_running(
     websocket: WebSocket,
     timeout: int,
     press_start_button: bool,
+    product_url: AnyUrl,
 ) -> Generator[ServiceRunning, None, None]:
     with log_context(logging.INFO, msg="Waiting for node to run") as ctx:
-        waiter = SocketIONodeProgressCompleteWaiter(node_id=node_id, logger=ctx.logger)
+        waiter = SocketIONodeProgressCompleteWaiter(
+            node_id=node_id, logger=ctx.logger, product_url=product_url
+        )
         service_running = ServiceRunning(iframe_locator=None)

     try:
@@ -366,12 +398,15 @@ def wait_for_service_running(
     websocket: WebSocket,
     timeout: int,
     press_start_button: bool,
+    product_url: AnyUrl,
 ) -> FrameLocator:
     """NOTE: if the service was already started this will not work as some of the required websocket events will not be emitted again
     In which case this will need further adjustment"""

     with log_context(logging.INFO, msg="Waiting for node to run") as ctx:
-        waiter = SocketIONodeProgressCompleteWaiter(node_id=node_id, logger=ctx.logger)
+        waiter = SocketIONodeProgressCompleteWaiter(
+            node_id=node_id, logger=ctx.logger, product_url=product_url
+        )
         with websocket.expect_event("framereceived", waiter, timeout=timeout):
             if press_start_button:
                 _trigger_service_start(page, node_id)
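For illustration, the waiter derives its polling endpoint from the product URL by stripping the scheme; a sketch of the round trip with hypothetical values:

# Sketch: how SocketIONodeProgressCompleteWaiter builds the polled URL (values are hypothetical)
from pydantic import AnyUrl, TypeAdapter

product_url = TypeAdapter(AnyUrl).validate_python("https://osparc.io/")
partial = f"{product_url}".split("//")[1]  # -> "osparc.io/"
node_id = "a1b2c3d4-0000-4000-8000-000000000000"
url = f"https://{node_id}.services.{partial}"
print(url)  # -> https://a1b2c3d4-0000-4000-8000-000000000000.services.osparc.io/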
@@ -6,7 +6,7 @@

 import arrow
 from playwright.sync_api import FrameLocator, Page, WebSocket, expect
-from pydantic import ByteSize, TypeAdapter
+from pydantic import AnyUrl, ByteSize, TypeAdapter  # pylint: disable=no-name-in-module

 from .logging_tools import log_context
 from .playwright import (
@@ -104,6 +104,7 @@ def wait_for_launched_s4l(
     *,
     autoscaled: bool,
     copy_workspace: bool,
+    product_url: AnyUrl,
 ) -> WaitForS4LDict:
     with log_context(logging.INFO, "launch S4L") as ctx:
         predicate = S4LWaitForWebsocket(logger=ctx.logger)
@@ -129,6 +130,7 @@ def wait_for_launched_s4l(
             )
             + (_S4L_COPY_WORKSPACE_TIME if copy_workspace else 0),
             press_start_button=False,
+            product_url=product_url,
         )
         s4l_websocket = ws_info.value
         ctx.logger.info("acquired S4L websocket!")
@@ -69,7 +69,9 @@ async def teardown(self):

     @property
     def healthy(self) -> bool:
-        return self._health_check_failure_count <= self._allowed_health_check_failures
+        return self._rabbit_client.healthy and (
+            self._health_check_failure_count <= self._allowed_health_check_failures
+        )  # https://github.com/ITISFoundation/osparc-simcore/pull/6662

     @property
     def health_check_failure_count(self) -> NonNegativeInt:
@@ -82,9 +84,6 @@ async def _background_task_method(self):
         while self._dummy_queue.qsize() > 0:
             _ = self._dummy_queue.get_nowait()
         try:
-            if not self._rabbit_client.healthy:
-                self._increment_health_check_failure_count()
-                return
             await asyncio.wait_for(
                 self._rabbit_client.publish(
                     self._dummy_message.channel_name, self._dummy_message
@@ -16,10 +16,7 @@ async def start_scheduler() -> None:
     with log_context(
         _logger, level=logging.INFO, msg="starting computational scheduler"
     ):
-        app.state.scheduler = scheduler = await _scheduler_factory.create_from_db(
-            app
-        )
-        scheduler.recover_scheduling()
+        app.state.scheduler = await _scheduler_factory.create_from_db(app)

     return start_scheduler