From e63862746132c4b68d04d10cb45461cf2e6bbb09 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Mon, 7 Oct 2024 08:36:05 +0200 Subject: [PATCH 01/13] efs improvements --- ...dd_enable_efs_to_group_extra_properties.py | 32 +++++++++++++++++++ .../models/groups_extra_properties.py | 7 ++++ .../utils_groups_extra_properties.py | 1 + .../repositories/groups_extra_properties.py | 19 +++++++++++ .../docker_service_specs/sidecar.py | 18 ++++------- .../scheduler/_core/_event_create_sidecars.py | 4 +-- ...es_dynamic_sidecar_docker_service_specs.py | 15 +++++++-- 7 files changed, 80 insertions(+), 16 deletions(-) create mode 100644 packages/postgres-database/src/simcore_postgres_database/migration/versions/ea3952fe5a0e_add_enable_efs_to_group_extra_properties.py diff --git a/packages/postgres-database/src/simcore_postgres_database/migration/versions/ea3952fe5a0e_add_enable_efs_to_group_extra_properties.py b/packages/postgres-database/src/simcore_postgres_database/migration/versions/ea3952fe5a0e_add_enable_efs_to_group_extra_properties.py new file mode 100644 index 00000000000..7f66f3b3830 --- /dev/null +++ b/packages/postgres-database/src/simcore_postgres_database/migration/versions/ea3952fe5a0e_add_enable_efs_to_group_extra_properties.py @@ -0,0 +1,32 @@ +"""add `enable_efs` to group extra properties + +Revision ID: ea3952fe5a0e +Revises: 8a742f3efdd9 +Create Date: 2024-10-07 06:24:42.464942+00:00 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "ea3952fe5a0e" +down_revision = "8a742f3efdd9" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column( + "groups_extra_properties", + sa.Column( + "enable_efs", sa.Boolean(), server_default=sa.text("false"), nullable=False + ), + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("groups_extra_properties", "enable_efs") + # ### end Alembic commands ### diff --git a/packages/postgres-database/src/simcore_postgres_database/models/groups_extra_properties.py b/packages/postgres-database/src/simcore_postgres_database/models/groups_extra_properties.py index e0d438d76c9..93ffe8cd7f7 100644 --- a/packages/postgres-database/src/simcore_postgres_database/models/groups_extra_properties.py +++ b/packages/postgres-database/src/simcore_postgres_database/models/groups_extra_properties.py @@ -63,6 +63,13 @@ server_default=sa.sql.expression.false(), doc="If true, will send telemetry for new style dynamic services to frontend", ), + sa.Column( + "enable_efs", + sa.Boolean(), + nullable=False, + server_default=sa.sql.expression.false(), + doc="If true, will mount efs distributed file system when dynamic services starts", + ), sa.UniqueConstraint( "group_id", "product_name", name="group_id_product_name_uniqueness" ), diff --git a/packages/postgres-database/src/simcore_postgres_database/utils_groups_extra_properties.py b/packages/postgres-database/src/simcore_postgres_database/utils_groups_extra_properties.py index e52fbb4791a..b6c25183a21 100644 --- a/packages/postgres-database/src/simcore_postgres_database/utils_groups_extra_properties.py +++ b/packages/postgres-database/src/simcore_postgres_database/utils_groups_extra_properties.py @@ -32,6 +32,7 @@ class GroupExtraProperties(FromRowMixin): enable_telemetry: bool created: datetime.datetime modified: datetime.datetime + enable_efs: bool async def _list_table_entries_ordered_by_group_type( diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/groups_extra_properties.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/groups_extra_properties.py index 8706a899d12..c0847701634 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/groups_extra_properties.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/groups_extra_properties.py @@ -1,3 +1,4 @@ +from pydantic import BaseModel from simcore_postgres_database.utils_groups_extra_properties import ( GroupExtraProperties, GroupExtraPropertiesRepo, @@ -6,6 +7,12 @@ from ._base import BaseRepository +class UserExtraProperties(BaseModel): + is_internet_enabled: bool + is_telemetry_enabled: bool + is_efs_enabled: bool + + class GroupsExtraPropertiesRepository(BaseRepository): async def _get_aggregated_properties_for_user( self, @@ -31,3 +38,15 @@ async def is_telemetry_enabled(self, *, user_id: int, product_name: str) -> bool ) telemetry_enabled: bool = group_extra_properties.enable_telemetry return telemetry_enabled + + async def get_user_extra_properties( + self, *, user_id: int, product_name: str + ) -> UserExtraProperties: + group_extra_properties = await self._get_aggregated_properties_for_user( + user_id=user_id, product_name=product_name + ) + return UserExtraProperties( + is_internet_enabled=group_extra_properties.internet_access, + is_telemetry_enabled=group_extra_properties.enable_telemetry, + is_efs_enabled=group_extra_properties.enable_efs, + ) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py index 78b66242355..a0be65a579f 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py @@ -30,6 +30,7 @@ from ....core.dynamic_services_settings.sidecar import DynamicSidecarSettings from ....core.settings import AppSettings from ....models.dynamic_services_scheduler import SchedulerData +from ....modules.db.repositories.groups_extra_properties import UserExtraProperties from .._namespace import get_compose_namespace from ..volumes import DynamicSidecarVolumesPathsResolver from ._constants import DOCKER_CONTAINER_SPEC_RESTART_POLICY_DEFAULTS @@ -220,6 +221,7 @@ async def _get_mounts( app_settings: AppSettings, has_quota_support: bool, rpc_client: RabbitMQRPCClient, + is_efs_enabled: bool, ) -> list[dict[str, Any]]: mounts: list[dict[str, Any]] = [ # docker socket needed to use the docker api @@ -270,18 +272,9 @@ async def _get_mounts( ) ) - # We check whether user has access to EFS feature - use_efs = False - efs_settings = dynamic_sidecar_settings.DYNAMIC_SIDECAR_EFS_SETTINGS - if ( - efs_settings - and scheduler_data.user_id in efs_settings.EFS_ONLY_ENABLED_FOR_USERIDS - ): - use_efs = True - # state paths now get mounted via different driver and are synced to s3 automatically for path_to_mount in scheduler_data.paths_mapping.state_paths: - if use_efs: + if is_efs_enabled: assert dynamic_sidecar_settings.DYNAMIC_SIDECAR_EFS_SETTINGS # nosec _storage_directory_name = DynamicSidecarVolumesPathsResolver.volume_name( @@ -414,7 +407,7 @@ async def get_dynamic_sidecar_spec( # pylint:disable=too-many-arguments# noqa: allow_internet_access: bool, hardware_info: HardwareInfo | None, metrics_collection_allowed: bool, - telemetry_enabled: bool, + user_extra_properties: UserExtraProperties, rpc_client: RabbitMQRPCClient, ) -> AioDockerServiceSpec: """ @@ -434,6 +427,7 @@ async def get_dynamic_sidecar_spec( # pylint:disable=too-many-arguments# noqa: app_settings=app_settings, has_quota_support=has_quota_support, rpc_client=rpc_client, + is_efs_enabled=user_extra_properties.is_efs_enabled, ) ports = _get_ports( @@ -514,7 +508,7 @@ async def get_dynamic_sidecar_spec( # pylint:disable=too-many-arguments# noqa: app_settings, allow_internet_access=allow_internet_access, metrics_collection_allowed=metrics_collection_allowed, - telemetry_enabled=telemetry_enabled, + telemetry_enabled=user_extra_properties.is_telemetry_enabled, ), "Hosts": [], "Image": dynamic_sidecar_settings.DYNAMIC_SIDECAR_IMAGE, diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py index 6943f7a0852..1b021b055d8 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py @@ -218,7 +218,7 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: scheduler_data.run_id = RunID.create() # telemetry configuration - is_telemetry_enabled = await groups_extra_properties.is_telemetry_enabled( + user_extra_properties = await groups_extra_properties.get_user_extra_properties( user_id=scheduler_data.user_id, product_name=scheduler_data.product_name ) @@ -237,7 +237,7 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: has_quota_support=dynamic_services_scheduler_settings.DYNAMIC_SIDECAR_ENABLE_VOLUME_LIMITS, allow_internet_access=allow_internet_access, metrics_collection_allowed=metrics_collection_allowed, - telemetry_enabled=is_telemetry_enabled, + user_extra_properties=user_extra_properties, rpc_client=rpc_client, ) diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py index 4ebacc3424e..21527f92d14 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py @@ -38,6 +38,9 @@ ) from simcore_service_director_v2.models.dynamic_services_scheduler import SchedulerData from simcore_service_director_v2.modules.catalog import CatalogClient +from simcore_service_director_v2.modules.db.repositories.groups_extra_properties import ( + UserExtraProperties, +) from simcore_service_director_v2.modules.dynamic_sidecar.docker_service_specs import ( get_dynamic_sidecar_spec, ) @@ -453,7 +456,11 @@ async def test_get_dynamic_proxy_spec( has_quota_support=False, allow_internet_access=False, metrics_collection_allowed=True, - telemetry_enabled=True, + user_extra_properties=UserExtraProperties( + is_internet_enabled=True, + is_telemetry_enabled=False, + is_efs_enabled=False, + ), rpc_client=Mock(), ) @@ -548,7 +555,11 @@ async def test_merge_dynamic_sidecar_specs_with_user_specific_specs( has_quota_support=False, allow_internet_access=False, metrics_collection_allowed=True, - telemetry_enabled=True, + user_extra_properties=UserExtraProperties( + is_internet_enabled=True, + is_telemetry_enabled=False, + is_efs_enabled=False, + ), rpc_client=Mock(), ) assert dynamic_sidecar_spec From 1ee989510e6f5b06b10b4fbd3ce59cb81a9ba254 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Mon, 7 Oct 2024 09:44:24 +0200 Subject: [PATCH 02/13] efs improvements --- .../dynamic_sidecar/docker_service_specs/sidecar.py | 3 +-- .../scheduler/_core/_event_create_sidecars.py | 1 - ...est_modules_dynamic_sidecar_docker_service_specs.py | 10 ++++------ 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py index a0be65a579f..b788e455cf3 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py @@ -404,7 +404,6 @@ async def get_dynamic_sidecar_spec( # pylint:disable=too-many-arguments# noqa: app_settings: AppSettings, *, has_quota_support: bool, - allow_internet_access: bool, hardware_info: HardwareInfo | None, metrics_collection_allowed: bool, user_extra_properties: UserExtraProperties, @@ -506,7 +505,7 @@ async def get_dynamic_sidecar_spec( # pylint:disable=too-many-arguments# noqa: compose_namespace, scheduler_data, app_settings, - allow_internet_access=allow_internet_access, + allow_internet_access=user_extra_properties.is_internet_enabled, metrics_collection_allowed=metrics_collection_allowed, telemetry_enabled=user_extra_properties.is_telemetry_enabled, ), diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py index 1b021b055d8..4d9b552c3a6 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py @@ -235,7 +235,6 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: app_settings=app.state.settings, hardware_info=scheduler_data.hardware_info, has_quota_support=dynamic_services_scheduler_settings.DYNAMIC_SIDECAR_ENABLE_VOLUME_LIMITS, - allow_internet_access=allow_internet_access, metrics_collection_allowed=metrics_collection_allowed, user_extra_properties=user_extra_properties, rpc_client=rpc_client, diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py index 21527f92d14..a05e4cd84da 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py @@ -454,11 +454,10 @@ async def test_get_dynamic_proxy_spec( app_settings=minimal_app.state.settings, hardware_info=hardware_info, has_quota_support=False, - allow_internet_access=False, metrics_collection_allowed=True, user_extra_properties=UserExtraProperties( - is_internet_enabled=True, - is_telemetry_enabled=False, + is_internet_enabled=False, + is_telemetry_enabled=True, is_efs_enabled=False, ), rpc_client=Mock(), @@ -553,11 +552,10 @@ async def test_merge_dynamic_sidecar_specs_with_user_specific_specs( app_settings=minimal_app.state.settings, hardware_info=hardware_info, has_quota_support=False, - allow_internet_access=False, metrics_collection_allowed=True, user_extra_properties=UserExtraProperties( - is_internet_enabled=True, - is_telemetry_enabled=False, + is_internet_enabled=False, + is_telemetry_enabled=True, is_efs_enabled=False, ), rpc_client=Mock(), From 5d745059ca3c0fdaa8c5dca2dc5069b556ca60ec Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Mon, 7 Oct 2024 09:46:46 +0200 Subject: [PATCH 03/13] efs improvements --- .../scheduler/_core/_event_create_sidecars.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py index 4d9b552c3a6..0d7f2f8288f 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py @@ -181,7 +181,8 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: groups_extra_properties = get_repository(app, GroupsExtraPropertiesRepository) assert scheduler_data.product_name is not None # nosec - allow_internet_access: bool = await groups_extra_properties.has_internet_access( + + user_extra_properties = await groups_extra_properties.get_user_extra_properties( user_id=scheduler_data.user_id, product_name=scheduler_data.product_name ) @@ -194,7 +195,7 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: "uuid": f"{scheduler_data.node_uuid}", # needed for removal when project is closed }, "Attachable": True, - "Internal": not allow_internet_access, + "Internal": not user_extra_properties.is_internet_enabled, } dynamic_sidecar_network_id = await create_network(network_config) @@ -217,11 +218,6 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: # generate a new `run_id` to avoid resource collisions scheduler_data.run_id = RunID.create() - # telemetry configuration - user_extra_properties = await groups_extra_properties.get_user_extra_properties( - user_id=scheduler_data.user_id, product_name=scheduler_data.product_name - ) - rpc_client: RabbitMQRPCClient = app.state.rabbitmq_rpc_client # WARNING: do NOT log, this structure has secrets in the open From b3d43ae7d5c59e8697ca4c46f4c38447aa9914d7 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Mon, 7 Oct 2024 11:32:18 +0200 Subject: [PATCH 04/13] modify Dockerfile --- services/efs-guardian/Dockerfile | 8 +++----- services/efs-guardian/docker/entrypoint.sh | 5 +++-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/services/efs-guardian/Dockerfile b/services/efs-guardian/Dockerfile index 8d75f1ac056..5d470ee4af4 100644 --- a/services/efs-guardian/Dockerfile +++ b/services/efs-guardian/Dockerfile @@ -168,14 +168,12 @@ ENV SC_BUILD_TARGET=production \ ENV PYTHONOPTIMIZE=TRUE WORKDIR /home/efs -# ensure home folder is read/writable for user efs -RUN chown -R efs /home/efs # Starting from clean base image, copies pre-installed virtualenv from prod-only-deps -COPY --chown=efs:efs --from=prod-only-deps ${VIRTUAL_ENV} ${VIRTUAL_ENV} +COPY --from=prod-only-deps ${VIRTUAL_ENV} ${VIRTUAL_ENV} # Copies booting scripts -COPY --chown=efs:efs services/efs-guardian/docker services/efs-guardian/docker +COPY services/efs-guardian/docker services/efs-guardian/docker RUN chmod +x services/efs-guardian/docker/*.sh @@ -205,7 +203,7 @@ ENV SC_BUILD_TARGET=development \ WORKDIR /devel -RUN chown -R efs:efs "${VIRTUAL_ENV}" +RUN chown -R root:root "${VIRTUAL_ENV}" ENTRYPOINT ["/bin/sh", "services/efs-guardian/docker/entrypoint.sh"] CMD ["/bin/sh", "services/efs-guardian/docker/boot.sh"] diff --git a/services/efs-guardian/docker/entrypoint.sh b/services/efs-guardian/docker/entrypoint.sh index 5e58e8e87c8..136ed41e99b 100755 --- a/services/efs-guardian/docker/entrypoint.sh +++ b/services/efs-guardian/docker/entrypoint.sh @@ -88,7 +88,8 @@ if stat $DOCKER_MOUNT >/dev/null 2>&1; then fi echo "$INFO Starting $* ..." -echo " $EFS_USER_NAME rights : $(id "$EFS_USER_NAME")" +echo " $(whoami) rights : $(id $whoami))" echo " local dir : $(ls -al)" -exec gosu "$EFS_USER_NAME" "$@" +# exec gosu "$EFS_USER_NAME" "$@" +exec "$@" From ee25042314931808b42da6117a8ddadeafe94bf3 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Mon, 7 Oct 2024 11:49:48 +0200 Subject: [PATCH 05/13] modify Dockerfile --- services/efs-guardian/docker/entrypoint.sh | 1 - 1 file changed, 1 deletion(-) diff --git a/services/efs-guardian/docker/entrypoint.sh b/services/efs-guardian/docker/entrypoint.sh index 136ed41e99b..d8ddf1c826a 100755 --- a/services/efs-guardian/docker/entrypoint.sh +++ b/services/efs-guardian/docker/entrypoint.sh @@ -91,5 +91,4 @@ echo "$INFO Starting $* ..." echo " $(whoami) rights : $(id $whoami))" echo " local dir : $(ls -al)" -# exec gosu "$EFS_USER_NAME" "$@" exec "$@" From f301fbb105b29e164fd0fe7201f0c1f60b287633 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Tue, 8 Oct 2024 18:11:43 +0200 Subject: [PATCH 06/13] daily work --- .../src/models_library/rabbitmq_messages.py | 17 +++++ .../src/servicelib/rabbitmq/_client.py | 3 + .../src/servicelib/rabbitmq/_utils.py | 3 +- services/docker-compose.yml | 5 ++ .../core/rabbitmq.py | 7 ++ .../modules/resource_tracking/_core.py | 21 +++++- .../core/application.py | 6 ++ .../core/settings.py | 2 + .../services/background_tasks.py | 18 +++++ .../services/background_tasks_setup.py | 73 +++++++++++++++++++ .../services/efs_manager.py | 27 +++++++ .../services/efs_manager_utils.py | 54 ++++++++++++++ .../services/modules/redis.py | 29 ++++++++ .../services/process_messages.py | 20 +++++ .../services/process_messages_setup.py | 63 ++++++++++++++++ 15 files changed, 344 insertions(+), 4 deletions(-) create mode 100644 services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks.py create mode 100644 services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py create mode 100644 services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py create mode 100644 services/efs-guardian/src/simcore_service_efs_guardian/services/modules/redis.py create mode 100644 services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py create mode 100644 services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages_setup.py diff --git a/packages/models-library/src/models_library/rabbitmq_messages.py b/packages/models-library/src/models_library/rabbitmq_messages.py index 355389584d3..b5c18008dc8 100644 --- a/packages/models-library/src/models_library/rabbitmq_messages.py +++ b/packages/models-library/src/models_library/rabbitmq_messages.py @@ -194,6 +194,23 @@ def routing_key(self) -> str | None: return None +class DynamicServiceRunningMessage(RabbitMessageBase): + channel_name: Literal["io.simcore.service.dynamic-service-running"] = Field( + default="io.simcore.service.dynamic-service-running", const=True + ) + + project_id: ProjectID + node_id: NodeID + product_name: ProductName | None + created_at: datetime.datetime = Field( + default_factory=lambda: arrow.utcnow().datetime, + description="message creation datetime", + ) + + def routing_key(self) -> str | None: + return None + + class RabbitResourceTrackingStartedMessage(RabbitResourceTrackingBaseMessage): message_type: RabbitResourceTrackingMessageType = Field( default=RabbitResourceTrackingMessageType.TRACKING_STARTED, const=True diff --git a/packages/service-library/src/servicelib/rabbitmq/_client.py b/packages/service-library/src/servicelib/rabbitmq/_client.py index dc70aa03ffa..0882ad189dd 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_client.py +++ b/packages/service-library/src/servicelib/rabbitmq/_client.py @@ -151,6 +151,7 @@ async def subscribe( message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS, unexpected_error_retry_delay_s: float = _DEFAULT_UNEXPECTED_ERROR_RETRY_DELAY_S, unexpected_error_max_attempts: int = _DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS, + queue_name: str | None = None, ) -> str: """subscribe to exchange_name calling ``message_handler`` for every incoming message - exclusive_queue: True means that every instance of this application will @@ -216,6 +217,7 @@ async def subscribe( exclusive_queue=exclusive_queue, message_ttl=message_ttl, arguments={"x-dead-letter-exchange": delayed_exchange_name}, + queue_name=queue_name, ) if topics is None: await queue.bind(exchange, routing_key="") @@ -235,6 +237,7 @@ async def subscribe( exclusive_queue=exclusive_queue, message_ttl=int(unexpected_error_retry_delay_s * 1000), arguments={"x-dead-letter-exchange": exchange.name}, + queue_name=queue_name, ) await delayed_queue.bind(delayed_exchange) diff --git a/packages/service-library/src/servicelib/rabbitmq/_utils.py b/packages/service-library/src/servicelib/rabbitmq/_utils.py index 176635e1e88..6e2ece046f7 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_utils.py +++ b/packages/service-library/src/servicelib/rabbitmq/_utils.py @@ -70,6 +70,7 @@ async def declare_queue( exclusive_queue: bool, arguments: dict[str, Any] | None = None, message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS, + queue_name: str | None = None, ) -> aio_pika.abc.AbstractRobustQueue: default_arguments = {"x-message-ttl": message_ttl} if arguments is not None: @@ -82,7 +83,7 @@ async def declare_queue( } if not exclusive_queue: # NOTE: setting a name will ensure multiple instance will take their data here - queue_parameters |= {"name": exchange_name} + queue_parameters |= {"name": queue_name if queue_name else exchange_name} # NOTE: if below line raises something similar to ``ChannelPreconditionFailed: PRECONDITION_FAILED`` # most likely someone changed the signature of the queues (parameters etc...) diff --git a/services/docker-compose.yml b/services/docker-compose.yml index 6ef30b59a39..5de8e0451c9 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -407,6 +407,11 @@ services: RABBIT_PORT: ${RABBIT_PORT} RABBIT_SECURE: ${RABBIT_SECURE} RABBIT_USER: ${RABBIT_USER} + REDIS_HOST: ${REDIS_HOST} + REDIS_PASSWORD: ${REDIS_PASSWORD} + REDIS_PORT: ${REDIS_PORT} + REDIS_SECURE: ${REDIS_SECURE} + REDIS_USER: ${REDIS_USER} SC_USER_ID: ${SC_USER_ID} SC_USER_NAME: ${SC_USER_NAME} EFS_USER_ID: ${EFS_USER_ID} diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py index b0daf7f881c..0ce567648d9 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py @@ -5,6 +5,7 @@ from fastapi import FastAPI from models_library.progress_bar import ProgressReport from models_library.rabbitmq_messages import ( + DynamicServiceRunningMessage, EventRabbitMessage, LoggerRabbitMessage, ProgressRabbitMessageNode, @@ -34,6 +35,12 @@ async def post_resource_tracking_message( await _post_rabbit_message(app, message) +async def post_dynamic_service_running_message( + app: FastAPI, message: DynamicServiceRunningMessage +): + await _post_rabbit_message(app, message) + + async def post_log_message( app: FastAPI, log: LogMessageStr, *, log_level: LogLevelInt ) -> None: diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py index 031b42ff324..085a24e6544 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py @@ -1,9 +1,11 @@ +import asyncio import logging from typing import Final from fastapi import FastAPI from models_library.generated_models.docker_rest_api import ContainerState from models_library.rabbitmq_messages import ( + DynamicServiceRunningMessage, RabbitResourceTrackingHeartbeatMessage, RabbitResourceTrackingStartedMessage, RabbitResourceTrackingStoppedMessage, @@ -19,7 +21,10 @@ are_all_containers_in_expected_states, get_container_states, ) -from ...core.rabbitmq import post_resource_tracking_message +from ...core.rabbitmq import ( + post_dynamic_service_running_message, + post_resource_tracking_message, +) from ...core.settings import ApplicationSettings, ResourceTrackingSettings from ...models.shared_store import SharedStore from ._models import ResourceTrackingState @@ -70,10 +75,20 @@ async def _heart_beat_task(app: FastAPI): ) if are_all_containers_in_expected_states(container_states.values()): - message = RabbitResourceTrackingHeartbeatMessage( + rut_message = RabbitResourceTrackingHeartbeatMessage( service_run_id=settings.DY_SIDECAR_RUN_ID ) - await post_resource_tracking_message(app, message) + dyn_message = DynamicServiceRunningMessage( + project_id=settings.DY_SIDECAR_PROJECT_ID, + node_id=settings.DY_SIDECAR_NODE_ID, + product_name=settings.DY_SIDECAR_PRODUCT_NAME, + ) + await asyncio.gather( + *[ + post_resource_tracking_message(app, rut_message), + post_dynamic_service_running_message(app, dyn_message), + ] + ) else: _logger.info( "heart beat message skipped: container_states=%s", container_states diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/core/application.py b/services/efs-guardian/src/simcore_service_efs_guardian/core/application.py index 4e3527f7fc3..6bf2833ed02 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/core/application.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/core/application.py @@ -13,8 +13,11 @@ ) from ..api.rest.routes import setup_api_routes from ..api.rpc.routes import setup_rpc_routes +from ..services.background_tasks_setup import setup as setup_background_tasks from ..services.efs_manager_setup import setup as setup_efs_manager from ..services.modules.rabbitmq import setup as setup_rabbitmq +from ..services.modules.redis import setup as setup_redis +from ..services.process_messages_setup import setup as setup_process_messages from .settings import ApplicationSettings logger = logging.getLogger(__name__) @@ -40,11 +43,14 @@ def create_app(settings: ApplicationSettings) -> FastAPI: # PLUGINS SETUP setup_rabbitmq(app) + setup_redis(app) setup_api_routes(app) setup_rpc_routes(app) setup_efs_manager(app) + setup_background_tasks(app) + setup_process_messages(app) # EVENTS async def _on_startup() -> None: diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/core/settings.py b/services/efs-guardian/src/simcore_service_efs_guardian/core/settings.py index 8c71f90aefc..ae6ff01c532 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/core/settings.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/core/settings.py @@ -12,6 +12,7 @@ from settings_library.base import BaseCustomSettings from settings_library.efs import AwsEfsSettings from settings_library.rabbit import RabbitSettings +from settings_library.redis import RedisSettings from settings_library.tracing import TracingSettings from settings_library.utils_logging import MixinLoggingSettings @@ -76,6 +77,7 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): EFS_GUARDIAN_AWS_EFS_SETTINGS: AwsEfsSettings = Field(auto_default_from_env=True) EFS_GUARDIAN_RABBITMQ: RabbitSettings = Field(auto_default_from_env=True) + EFS_GUARDIAN_REDIS: RedisSettings = Field(auto_default_from_env=True) EFS_GUARDIAN_TRACING: TracingSettings | None = Field( auto_default_from_env=True, description="settings for opentelemetry tracing" ) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks.py new file mode 100644 index 00000000000..4d74e60e7c3 --- /dev/null +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks.py @@ -0,0 +1,18 @@ +import logging + +from fastapi import FastAPI + +from ..core.settings import ApplicationSettings + +_logger = logging.getLogger(__name__) + + +async def removal_policy_task(app: FastAPI) -> None: + _logger.info("Removal policy task started") + + # After X days of inactivity remove data from EFS + # Probably use `last_modified_data` in the project DB table + # Maybe lock project during this time lock_project() + + app_settings: ApplicationSettings = app.state.settings + assert app_settings # nosec diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py new file mode 100644 index 00000000000..e390d1b1cc4 --- /dev/null +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py @@ -0,0 +1,73 @@ +import asyncio +import logging +from collections.abc import Awaitable, Callable +from datetime import timedelta +from typing import TypedDict + +from fastapi import FastAPI +from servicelib.background_task import stop_periodic_task +from servicelib.logging_utils import log_catch, log_context +from servicelib.redis_utils import start_exclusive_periodic_task + +from .background_tasks import removal_policy_task +from .modules.redis import get_redis_client + +_logger = logging.getLogger(__name__) + + +class EfsGuardianBackgroundTask(TypedDict): + name: str + task_func: Callable + + +_EFS_GUARDIAN_BACKGROUND_TASKS = [ + EfsGuardianBackgroundTask( + name="efs_removal_policy_task", task_func=removal_policy_task + ) +] + + +def on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]: + async def _startup() -> None: + with log_context( + _logger, logging.INFO, msg="Efs Guardian startup.." + ), log_catch(_logger, reraise=False): + app.state.efs_guardian_background_tasks = [] + + # Setup periodic tasks + for task in _EFS_GUARDIAN_BACKGROUND_TASKS: + exclusive_task = start_exclusive_periodic_task( + get_redis_client(app), + task["task_func"], + task_period=timedelta(seconds=60), # 1 minute + retry_after=timedelta(seconds=60), # 5 minutes + task_name=task["name"], + app=app, + ) + app.state.efs_guardian_background_tasks.append(exclusive_task) + + return _startup + + +def on_app_shutdown( + _app: FastAPI, +) -> Callable[[], Awaitable[None]]: + async def _stop() -> None: + with log_context( + _logger, logging.INFO, msg="Efs Guardian shutdown.." + ), log_catch(_logger, reraise=False): + assert _app # nosec + if _app.state.efs_guardian_background_tasks: + await asyncio.gather( + *[ + stop_periodic_task(task) + for task in _app.state.efs_guardian_background_tasks + ] + ) + + return _stop + + +def setup(app: FastAPI) -> None: + app.add_event_handler("startup", on_app_startup(app)) + app.add_event_handler("shutdown", on_app_shutdown(app)) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager.py index 401f38ed1b5..0661cadd3d8 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager.py @@ -5,8 +5,10 @@ from fastapi import FastAPI from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID +from pydantic import ByteSize from ..core.settings import ApplicationSettings, get_application_settings +from . import efs_manager_utils @dataclass(frozen=True) @@ -52,3 +54,28 @@ async def create_project_specific_data_dir( _dir_path, 0o770 ) # This gives rwx permissions to user and group, and nothing to others return _dir_path + + async def get_project_node_data_size( + self, project_id: ProjectID, node_id: NodeID + ) -> ByteSize: + _dir_path = ( + self._efs_mounted_path + / self._project_specific_data_base_directory + / f"{project_id}" + / f"{node_id}" + ) + + service_size = await efs_manager_utils.get_size_bash_async(_dir_path) + return service_size + + async def remove_project_node_data_write_permissions( + self, project_id: ProjectID, node_id: NodeID + ) -> None: + _dir_path = ( + self._efs_mounted_path + / self._project_specific_data_base_directory + / f"{project_id}" + / f"{node_id}" + ) + + await efs_manager_utils.remove_write_permissions_bash_async(_dir_path) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py new file mode 100644 index 00000000000..948ac2a0b21 --- /dev/null +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py @@ -0,0 +1,54 @@ +import asyncio + +from pydantic import ByteSize + + +async def get_size_bash_async(path) -> ByteSize: + try: + # Create the subprocess + process = await asyncio.create_subprocess_exec( + "du", + "-sb", + path, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + # Wait for the subprocess to complete + stdout, stderr = await process.communicate() + + if process.returncode == 0: + # Parse the output + size = ByteSize(stdout.decode().split()[0]) + return size + else: + print(f"Error: {stderr.decode()}") + raise ValueError + except Exception as e: + raise e + + +async def remove_write_permissions_bash_async(path) -> ByteSize: + try: + # Create the subprocess + process = await asyncio.create_subprocess_exec( + "chmod", + "-R", + "a-w", + path, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + # Wait for the subprocess to complete + stdout, stderr = await process.communicate() + + if process.returncode == 0: + # Parse the output + size = ByteSize(stdout.decode().split()[0]) + return size + else: + print(f"Error: {stderr.decode()}") + raise ValueError + except Exception as e: + raise e diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/modules/redis.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/modules/redis.py new file mode 100644 index 00000000000..89aa9484de7 --- /dev/null +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/modules/redis.py @@ -0,0 +1,29 @@ +import logging +from typing import cast + +from fastapi import FastAPI +from servicelib.redis import RedisClientSDK +from settings_library.redis import RedisDatabase, RedisSettings + +logger = logging.getLogger(__name__) + + +def setup(app: FastAPI) -> None: + async def on_startup() -> None: + app.state.redis_client_sdk = None + settings: RedisSettings = app.state.settings.EFS_GUARDIAN_REDIS + redis_locks_dsn = settings.build_redis_dsn(RedisDatabase.LOCKS) + app.state.redis_client_sdk = client = RedisClientSDK(redis_locks_dsn) + await client.setup() + + async def on_shutdown() -> None: + redis_client_sdk: None | RedisClientSDK = app.state.redis_client_sdk + if redis_client_sdk: + await redis_client_sdk.shutdown() + + app.add_event_handler("startup", on_startup) + app.add_event_handler("shutdown", on_shutdown) + + +def get_redis_client(app: FastAPI) -> RedisClientSDK: + return cast(RedisClientSDK, app.state.redis_client_sdk) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py new file mode 100644 index 00000000000..258ef0e5c5b --- /dev/null +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py @@ -0,0 +1,20 @@ +import logging + +from fastapi import FastAPI +from models_library.rabbitmq_messages import DynamicServiceRunningMessage +from pydantic import parse_raw_as + +_logger = logging.getLogger(__name__) + + +async def process_dynamic_service_running_message(app: FastAPI, data: bytes) -> bool: + assert app # nosec + rabbit_message: DynamicServiceRunningMessage = parse_raw_as( + DynamicServiceRunningMessage, data # type: ignore[arg-type] + ) + _logger.info( + "Process %s msg service_run_id: %s", + rabbit_message.project_id, + rabbit_message.node_id, + ) + return True diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages_setup.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages_setup.py new file mode 100644 index 00000000000..ae12c2a7f01 --- /dev/null +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages_setup.py @@ -0,0 +1,63 @@ +import functools +import logging +from collections.abc import Awaitable, Callable + +from fastapi import FastAPI +from models_library.rabbitmq_messages import DynamicServiceRunningMessage +from servicelib.logging_utils import log_catch, log_context +from servicelib.rabbitmq import RabbitMQClient +from settings_library.rabbit import RabbitSettings + +from ..core.settings import ApplicationSettings +from .modules.rabbitmq import get_rabbitmq_client +from .process_messages import process_dynamic_service_running_message + +_logger = logging.getLogger(__name__) + + +_RUT_MESSAGE_TTL_IN_MS = 2 * 60 * 60 * 1000 # 2 hours + + +async def _subscribe_to_rabbitmq(app) -> str: + with log_context(_logger, logging.INFO, msg="Subscribing to rabbitmq channel"): + rabbit_client: RabbitMQClient = get_rabbitmq_client(app) + subscribed_queue: str = await rabbit_client.subscribe( + DynamicServiceRunningMessage.get_channel_name(), + message_handler=functools.partial( + process_dynamic_service_running_message, app + ), + exclusive_queue=False, + message_ttl=_RUT_MESSAGE_TTL_IN_MS, + queue_name="efs-dynamic-service-running-queue", + ) + return subscribed_queue + + +def on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]: + async def _startup() -> None: + with log_context( + _logger, logging.INFO, msg="setup resource tracker" + ), log_catch(_logger, reraise=False): + app_settings: ApplicationSettings = app.state.settings + app.state.efs_guardian_rabbitmq_consumer = None + settings: RabbitSettings | None = app_settings.EFS_GUARDIAN_RABBITMQ + if not settings: + _logger.warning("RabbitMQ client is de-activated in the settings") + return + app.state.efs_guardian_rabbitmq_consumer = await _subscribe_to_rabbitmq(app) + + return _startup + + +def on_app_shutdown( + _app: FastAPI, +) -> Callable[[], Awaitable[None]]: + async def _stop() -> None: + assert _app # nosec + + return _stop + + +def setup(app: FastAPI) -> None: + app.add_event_handler("startup", on_app_startup(app)) + app.add_event_handler("shutdown", on_app_shutdown(app)) From d386338e13c48baa01b84a0cfeb7bbf1a4269e68 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Wed, 9 Oct 2024 15:06:20 +0200 Subject: [PATCH 07/13] adding tests --- .../src/models_library/rabbitmq_messages.py | 1 + .../modules/resource_tracking/_core.py | 1 + .../core/settings.py | 5 +- .../services/background_tasks_setup.py | 4 +- .../services/efs_manager_utils.py | 6 +- .../services/modules/redis.py | 16 +-- .../services/process_messages.py | 32 ++++- services/efs-guardian/tests/unit/conftest.py | 9 ++ .../tests/unit/test_api_health.py | 6 +- .../tests/unit/test_efs_guardian_rpc.py | 70 ++++++++++ .../tests/unit/test_efs_manager.py | 122 +++++++++++++++--- 11 files changed, 234 insertions(+), 38 deletions(-) create mode 100644 services/efs-guardian/tests/unit/test_efs_guardian_rpc.py diff --git a/packages/models-library/src/models_library/rabbitmq_messages.py b/packages/models-library/src/models_library/rabbitmq_messages.py index b5c18008dc8..69812689baa 100644 --- a/packages/models-library/src/models_library/rabbitmq_messages.py +++ b/packages/models-library/src/models_library/rabbitmq_messages.py @@ -201,6 +201,7 @@ class DynamicServiceRunningMessage(RabbitMessageBase): project_id: ProjectID node_id: NodeID + user_id: UserID product_name: ProductName | None created_at: datetime.datetime = Field( default_factory=lambda: arrow.utcnow().datetime, diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py index 085a24e6544..6da3aa3f00c 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py @@ -81,6 +81,7 @@ async def _heart_beat_task(app: FastAPI): dyn_message = DynamicServiceRunningMessage( project_id=settings.DY_SIDECAR_PROJECT_ID, node_id=settings.DY_SIDECAR_NODE_ID, + user_id=settings.DY_SIDECAR_USER_ID, product_name=settings.DY_SIDECAR_PRODUCT_NAME, ) await asyncio.gather( diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/core/settings.py b/services/efs-guardian/src/simcore_service_efs_guardian/core/settings.py index ae6ff01c532..35897a71eb3 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/core/settings.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/core/settings.py @@ -8,7 +8,7 @@ LogLevel, VersionTag, ) -from pydantic import Field, PositiveInt, validator +from pydantic import ByteSize, Field, PositiveInt, validator from settings_library.base import BaseCustomSettings from settings_library.efs import AwsEfsSettings from settings_library.rabbit import RabbitSettings @@ -58,6 +58,9 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): EFS_GROUP_NAME: str = Field( description="Linux group name that the EFS and Simcore linux users are part of" ) + EFS_DEFAULT_USER_SERVICE_SIZE_BYTES: ByteSize = Field( + default=536870912000 # 500GiB = 534GB + ) # RUNTIME ----------------------------------------------------------- EFS_GUARDIAN_DEBUG: bool = Field( diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py index e390d1b1cc4..458b76615ac 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py @@ -10,7 +10,7 @@ from servicelib.redis_utils import start_exclusive_periodic_task from .background_tasks import removal_policy_task -from .modules.redis import get_redis_client +from .modules.redis import get_redis_lock_client _logger = logging.getLogger(__name__) @@ -37,7 +37,7 @@ async def _startup() -> None: # Setup periodic tasks for task in _EFS_GUARDIAN_BACKGROUND_TASKS: exclusive_task = start_exclusive_periodic_task( - get_redis_client(app), + get_redis_lock_client(app), task["task_func"], task_period=timedelta(seconds=60), # 1 minute retry_after=timedelta(seconds=60), # 5 minutes diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py index 948ac2a0b21..2affeabacc5 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py @@ -28,7 +28,7 @@ async def get_size_bash_async(path) -> ByteSize: raise e -async def remove_write_permissions_bash_async(path) -> ByteSize: +async def remove_write_permissions_bash_async(path) -> None: try: # Create the subprocess process = await asyncio.create_subprocess_exec( @@ -44,9 +44,7 @@ async def remove_write_permissions_bash_async(path) -> ByteSize: stdout, stderr = await process.communicate() if process.returncode == 0: - # Parse the output - size = ByteSize(stdout.decode().split()[0]) - return size + return else: print(f"Error: {stderr.decode()}") raise ValueError diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/modules/redis.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/modules/redis.py index 89aa9484de7..20cbcc0a4db 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/modules/redis.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/modules/redis.py @@ -10,20 +10,20 @@ def setup(app: FastAPI) -> None: async def on_startup() -> None: - app.state.redis_client_sdk = None + app.state.redis_lock_client_sdk = None settings: RedisSettings = app.state.settings.EFS_GUARDIAN_REDIS redis_locks_dsn = settings.build_redis_dsn(RedisDatabase.LOCKS) - app.state.redis_client_sdk = client = RedisClientSDK(redis_locks_dsn) - await client.setup() + app.state.redis_lock_client_sdk = lock_client = RedisClientSDK(redis_locks_dsn) + await lock_client.setup() async def on_shutdown() -> None: - redis_client_sdk: None | RedisClientSDK = app.state.redis_client_sdk - if redis_client_sdk: - await redis_client_sdk.shutdown() + redis_lock_client_sdk: None | RedisClientSDK = app.state.redis_lock_client_sdk + if redis_lock_client_sdk: + await redis_lock_client_sdk.shutdown() app.add_event_handler("startup", on_startup) app.add_event_handler("shutdown", on_shutdown) -def get_redis_client(app: FastAPI) -> RedisClientSDK: - return cast(RedisClientSDK, app.state.redis_client_sdk) +def get_redis_lock_client(app: FastAPI) -> RedisClientSDK: + return cast(RedisClientSDK, app.state.redis_lock_client_sdk) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py index 258ef0e5c5b..ab6aa1a2e1e 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py @@ -3,6 +3,10 @@ from fastapi import FastAPI from models_library.rabbitmq_messages import DynamicServiceRunningMessage from pydantic import parse_raw_as +from simcore_service_efs_guardian.services.modules.redis import get_redis_lock_client + +from ..core.settings import get_application_settings +from ..services.efs_manager import EfsManager _logger = logging.getLogger(__name__) @@ -12,9 +16,33 @@ async def process_dynamic_service_running_message(app: FastAPI, data: bytes) -> rabbit_message: DynamicServiceRunningMessage = parse_raw_as( DynamicServiceRunningMessage, data # type: ignore[arg-type] ) - _logger.info( - "Process %s msg service_run_id: %s", + _logger.debug( + "Process dynamic service running msg, project ID: %s node ID: %s", rabbit_message.project_id, rabbit_message.node_id, ) + + settings = get_application_settings(app) + efs_manager: EfsManager = app.state.efs_manager + size = await efs_manager.get_project_node_data_size( + rabbit_message.project_id, node_id=rabbit_message.node_id + ) + + if size > settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES: + _logger.warning( + "Removing write permissions inside of EFS starts for project ID: %s, node ID: %s, current user: %s", + rabbit_message.project_id, + rabbit_message.node_id, + rabbit_message.user_id, + ) + redis = get_redis_lock_client(app) + async with redis.lock_context( + f"efs_remove_write_permissions-{rabbit_message.project_id=}-{rabbit_message.node_id=}", + blocking=True, + blocking_timeout_s=10, + ): + await efs_manager.remove_project_node_data_write_permissions( + project_id=rabbit_message.project_id, node_id=rabbit_message.node_id + ) + return True diff --git a/services/efs-guardian/tests/unit/conftest.py b/services/efs-guardian/tests/unit/conftest.py index 62e4352e1cc..bb1bc1473f2 100644 --- a/services/efs-guardian/tests/unit/conftest.py +++ b/services/efs-guardian/tests/unit/conftest.py @@ -12,8 +12,10 @@ import simcore_service_efs_guardian import yaml from asgi_lifespan import LifespanManager +from fakeredis.aioredis import FakeRedis from fastapi import FastAPI from httpx import ASGITransport +from pytest_mock import MockerFixture from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict from servicelib.rabbitmq import RabbitMQRPCClient from settings_library.rabbit import RabbitSettings @@ -29,6 +31,7 @@ "pytest_simcore.pydantic_models", "pytest_simcore.pytest_global_environs", "pytest_simcore.rabbit_service", + "pytest_simcore.redis_service", "pytest_simcore.repository_paths", "pytest_simcore.aws_s3_service", "pytest_simcore.aws_server", @@ -139,3 +142,9 @@ async def rpc_client( rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]], ) -> RabbitMQRPCClient: return await rabbitmq_rpc_client("client") + + +@pytest.fixture +async def mocked_redis_server(mocker: MockerFixture) -> None: + mock_redis = FakeRedis() + mocker.patch("redis.asyncio.from_url", return_value=mock_redis) diff --git a/services/efs-guardian/tests/unit/test_api_health.py b/services/efs-guardian/tests/unit/test_api_health.py index 621543e2d80..5b801de21bb 100644 --- a/services/efs-guardian/tests/unit/test_api_health.py +++ b/services/efs-guardian/tests/unit/test_api_health.py @@ -28,7 +28,11 @@ def app_environment( ) -async def test_healthcheck(rabbit_service: RabbitSettings, client: httpx.AsyncClient): +async def test_healthcheck( + rabbit_service: RabbitSettings, + mocked_redis_server, + client: httpx.AsyncClient, +): response = await client.get("/") response.raise_for_status() assert response.status_code == status.HTTP_200_OK diff --git a/services/efs-guardian/tests/unit/test_efs_guardian_rpc.py b/services/efs-guardian/tests/unit/test_efs_guardian_rpc.py new file mode 100644 index 00000000000..9ebb2b3489d --- /dev/null +++ b/services/efs-guardian/tests/unit/test_efs_guardian_rpc.py @@ -0,0 +1,70 @@ +# pylint: disable=protected-access +# pylint: disable=redefined-outer-name +# pylint: disable=too-many-arguments +# pylint: disable=unused-argument +# pylint: disable=unused-variable + +from pathlib import Path +from unittest.mock import patch + +import pytest +from faker import Faker +from fastapi import FastAPI +from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict +from pytest_simcore.helpers.typing_env import EnvVarsDict +from servicelib.rabbitmq import RabbitMQRPCClient +from servicelib.rabbitmq.rpc_interfaces.efs_guardian import efs_manager +from simcore_service_efs_guardian.core.settings import AwsEfsSettings + +pytest_simcore_core_services_selection = ["rabbit"] +pytest_simcore_ops_services_selection = [] + + +@pytest.fixture +def app_environment( + monkeypatch: pytest.MonkeyPatch, + app_environment: EnvVarsDict, + rabbit_env_vars_dict: EnvVarsDict, # rabbitMQ settings from 'rabbit' service +) -> EnvVarsDict: + return setenvs_from_dict( + monkeypatch, + { + **app_environment, + **rabbit_env_vars_dict, + }, + ) + + +async def test_rpc_create_project_specific_data_dir( + rpc_client: RabbitMQRPCClient, + faker: Faker, + mocked_redis_server: None, + app: FastAPI, +): + aws_efs_settings: AwsEfsSettings = app.state.settings.EFS_GUARDIAN_AWS_EFS_SETTINGS + + _project_id = faker.uuid4() + _node_id = faker.uuid4() + _storage_directory_name = faker.name() + + with patch( + "simcore_service_efs_guardian.services.efs_manager.os.chown" + ) as mocked_chown: + result = await efs_manager.create_project_specific_data_dir( + rpc_client, + project_id=_project_id, + node_id=_node_id, + storage_directory_name=_storage_directory_name, + ) + mocked_chown.assert_called_once() + + assert isinstance(result, Path) + _expected_path = ( + aws_efs_settings.EFS_MOUNTED_PATH + / aws_efs_settings.EFS_PROJECT_SPECIFIC_DATA_DIRECTORY + / _project_id + / _node_id + / _storage_directory_name + ) + assert _expected_path == result + assert _expected_path.exists diff --git a/services/efs-guardian/tests/unit/test_efs_manager.py b/services/efs-guardian/tests/unit/test_efs_manager.py index 42e22c9386d..32692469c4c 100644 --- a/services/efs-guardian/tests/unit/test_efs_manager.py +++ b/services/efs-guardian/tests/unit/test_efs_manager.py @@ -4,6 +4,9 @@ # pylint: disable=unused-argument # pylint: disable=unused-variable +import os +import shutil +import stat from pathlib import Path from unittest.mock import patch @@ -12,9 +15,12 @@ from fastapi import FastAPI from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict -from servicelib.rabbitmq import RabbitMQRPCClient -from servicelib.rabbitmq.rpc_interfaces.efs_guardian import efs_manager from simcore_service_efs_guardian.core.settings import AwsEfsSettings +from simcore_service_efs_guardian.services.efs_manager import ( + EfsManager, + NodeID, + ProjectID, +) pytest_simcore_core_services_selection = ["rabbit"] pytest_simcore_ops_services_selection = [] @@ -24,7 +30,7 @@ def app_environment( monkeypatch: pytest.MonkeyPatch, app_environment: EnvVarsDict, - rabbit_env_vars_dict: EnvVarsDict, # rabbitMQ settings from 'rabbit' service + rabbit_env_vars_dict: EnvVarsDict, ) -> EnvVarsDict: return setenvs_from_dict( monkeypatch, @@ -35,35 +41,111 @@ def app_environment( ) -async def test_rpc_create_project_specific_data_dir( - rpc_client: RabbitMQRPCClient, +@pytest.fixture +async def cleanup(app: FastAPI): + + yield + + aws_efs_settings: AwsEfsSettings = app.state.settings.EFS_GUARDIAN_AWS_EFS_SETTINGS + _dir_path = Path(aws_efs_settings.EFS_MOUNTED_PATH) + if Path.exists(_dir_path): + for root, dirs, files in os.walk(_dir_path): + for name in dirs + files: + file_path = os.path.join(root, name) + # Get the current permissions of the file or directory + current_permissions = os.stat(file_path).st_mode + # Add write permission for the owner (user) + os.chmod(file_path, current_permissions | stat.S_IWUSR) + + shutil.rmtree(_dir_path) + + +def assert_permissions( + file_path: Path, + expected_readable: bool, + expected_writable: bool, + expected_executable: bool, +): + file_stat = os.stat(file_path) + file_permissions = file_stat.st_mode + is_readable = bool(file_permissions & stat.S_IRUSR) + is_writable = bool(file_permissions & stat.S_IWUSR) + is_executable = bool(file_permissions & stat.S_IXUSR) + + assert ( + is_readable == expected_readable + ), f"Expected readable={expected_readable}, but got readable={is_readable} for {file_path}" + assert ( + is_writable == expected_writable + ), f"Expected writable={expected_writable}, but got writable={is_writable} for {file_path}" + assert ( + is_executable == expected_executable + ), f"Expected executable={expected_executable}, but got executable={is_executable} for {file_path}" + + +async def test_remove_write_access_rights( faker: Faker, + mocked_redis_server: None, app: FastAPI, + cleanup: None, ): aws_efs_settings: AwsEfsSettings = app.state.settings.EFS_GUARDIAN_AWS_EFS_SETTINGS - _project_id = faker.uuid4() - _node_id = faker.uuid4() - _storage_directory_name = faker.name() + _project_id = ProjectID(faker.uuid4()) + _node_id = NodeID(faker.uuid4()) + _storage_directory_name = faker.word() + _dir_path = ( + aws_efs_settings.EFS_MOUNTED_PATH + / aws_efs_settings.EFS_PROJECT_SPECIFIC_DATA_DIRECTORY + / f"{_project_id}" + / f"{_node_id}" + / f"{_storage_directory_name}" + ) + + efs_manager: EfsManager = app.state.efs_manager with patch( "simcore_service_efs_guardian.services.efs_manager.os.chown" ) as mocked_chown: - result = await efs_manager.create_project_specific_data_dir( - rpc_client, + await efs_manager.create_project_specific_data_dir( project_id=_project_id, node_id=_node_id, storage_directory_name=_storage_directory_name, ) - mocked_chown.assert_called_once() - assert isinstance(result, Path) - _expected_path = ( - aws_efs_settings.EFS_MOUNTED_PATH - / aws_efs_settings.EFS_PROJECT_SPECIFIC_DATA_DIRECTORY - / _project_id - / _node_id - / _storage_directory_name + size_before = await efs_manager.get_project_node_data_size( + project_id=_project_id, node_id=_node_id + ) + + file_paths = [] + for i in range(3): # Let's create 3 small files for testing + file_path = Path(_dir_path, f"test_file_{i}.txt") + with open(file_path, "w") as f: + f.write(f"This is file {i}") + file_paths.append(file_path) + + size_after = await efs_manager.get_project_node_data_size( + project_id=_project_id, node_id=_node_id ) - assert _expected_path == result - assert _expected_path.exists + assert size_after > size_before + + # Now we will check removal of write permissions + for file_path in file_paths: + assert_permissions( + file_path, + expected_readable=True, + expected_writable=True, + expected_executable=False, + ) + + await efs_manager.remove_project_node_data_write_permissions( + project_id=_project_id, node_id=_node_id + ) + + for file_path in file_paths: + assert_permissions( + file_path, + expected_readable=True, + expected_writable=False, + expected_executable=False, + ) From ca3035cd7bf255e0533d880be6c60c6cd765e255 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Wed, 9 Oct 2024 15:20:49 +0200 Subject: [PATCH 08/13] remove queue name --- packages/service-library/src/servicelib/rabbitmq/_client.py | 3 --- packages/service-library/src/servicelib/rabbitmq/_utils.py | 3 +-- .../services/process_messages_setup.py | 1 - 3 files changed, 1 insertion(+), 6 deletions(-) diff --git a/packages/service-library/src/servicelib/rabbitmq/_client.py b/packages/service-library/src/servicelib/rabbitmq/_client.py index 0882ad189dd..dc70aa03ffa 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_client.py +++ b/packages/service-library/src/servicelib/rabbitmq/_client.py @@ -151,7 +151,6 @@ async def subscribe( message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS, unexpected_error_retry_delay_s: float = _DEFAULT_UNEXPECTED_ERROR_RETRY_DELAY_S, unexpected_error_max_attempts: int = _DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS, - queue_name: str | None = None, ) -> str: """subscribe to exchange_name calling ``message_handler`` for every incoming message - exclusive_queue: True means that every instance of this application will @@ -217,7 +216,6 @@ async def subscribe( exclusive_queue=exclusive_queue, message_ttl=message_ttl, arguments={"x-dead-letter-exchange": delayed_exchange_name}, - queue_name=queue_name, ) if topics is None: await queue.bind(exchange, routing_key="") @@ -237,7 +235,6 @@ async def subscribe( exclusive_queue=exclusive_queue, message_ttl=int(unexpected_error_retry_delay_s * 1000), arguments={"x-dead-letter-exchange": exchange.name}, - queue_name=queue_name, ) await delayed_queue.bind(delayed_exchange) diff --git a/packages/service-library/src/servicelib/rabbitmq/_utils.py b/packages/service-library/src/servicelib/rabbitmq/_utils.py index 6e2ece046f7..176635e1e88 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_utils.py +++ b/packages/service-library/src/servicelib/rabbitmq/_utils.py @@ -70,7 +70,6 @@ async def declare_queue( exclusive_queue: bool, arguments: dict[str, Any] | None = None, message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS, - queue_name: str | None = None, ) -> aio_pika.abc.AbstractRobustQueue: default_arguments = {"x-message-ttl": message_ttl} if arguments is not None: @@ -83,7 +82,7 @@ async def declare_queue( } if not exclusive_queue: # NOTE: setting a name will ensure multiple instance will take their data here - queue_parameters |= {"name": queue_name if queue_name else exchange_name} + queue_parameters |= {"name": exchange_name} # NOTE: if below line raises something similar to ``ChannelPreconditionFailed: PRECONDITION_FAILED`` # most likely someone changed the signature of the queues (parameters etc...) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages_setup.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages_setup.py index ae12c2a7f01..3c174586fd3 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages_setup.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages_setup.py @@ -28,7 +28,6 @@ async def _subscribe_to_rabbitmq(app) -> str: ), exclusive_queue=False, message_ttl=_RUT_MESSAGE_TTL_IN_MS, - queue_name="efs-dynamic-service-running-queue", ) return subscribed_queue From 6642e80c0e127cfcde4bf467cf29406b9722c8ad Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Wed, 9 Oct 2024 16:09:58 +0200 Subject: [PATCH 09/13] improvements --- .../services/background_tasks_setup.py | 2 +- .../services/efs_manager.py | 12 +++ .../services/efs_manager_utils.py | 82 +++++++++---------- .../services/process_messages.py | 27 +++++- .../tests/unit/test_efs_manager.py | 14 ++++ 5 files changed, 91 insertions(+), 46 deletions(-) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py index 458b76615ac..58e019ef7f9 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py @@ -40,7 +40,7 @@ async def _startup() -> None: get_redis_lock_client(app), task["task_func"], task_period=timedelta(seconds=60), # 1 minute - retry_after=timedelta(seconds=60), # 5 minutes + retry_after=timedelta(seconds=300), # 5 minutes task_name=task["name"], app=app, ) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager.py index 0661cadd3d8..42cb0839564 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager.py @@ -55,6 +55,18 @@ async def create_project_specific_data_dir( ) # This gives rwx permissions to user and group, and nothing to others return _dir_path + async def check_project_node_data_directory_exits( + self, project_id: ProjectID, node_id: NodeID + ) -> bool: + _dir_path = ( + self._efs_mounted_path + / self._project_specific_data_base_directory + / f"{project_id}" + / f"{node_id}" + ) + + return _dir_path.exists() + async def get_project_node_data_size( self, project_id: ProjectID, node_id: NodeID ) -> ByteSize: diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py index 2affeabacc5..a58c146879f 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py @@ -1,52 +1,48 @@ import asyncio +import logging from pydantic import ByteSize +_logger = logging.getLogger(__name__) + async def get_size_bash_async(path) -> ByteSize: - try: - # Create the subprocess - process = await asyncio.create_subprocess_exec( - "du", - "-sb", - path, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - - # Wait for the subprocess to complete - stdout, stderr = await process.communicate() - - if process.returncode == 0: - # Parse the output - size = ByteSize(stdout.decode().split()[0]) - return size - else: - print(f"Error: {stderr.decode()}") - raise ValueError - except Exception as e: - raise e + # Create the subprocess + command = ["du", "-sb", path] + process = await asyncio.create_subprocess_exec( + *command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + # Wait for the subprocess to complete + stdout, stderr = await process.communicate() + + if process.returncode == 0: + # Parse the output + size = ByteSize(stdout.decode().split()[0]) + return size + else: + msg = f"Command {' '.join(command)} failed with error code {process.returncode}: {stderr.decode()}" + _logger.error(msg) + raise RuntimeError(msg) async def remove_write_permissions_bash_async(path) -> None: - try: - # Create the subprocess - process = await asyncio.create_subprocess_exec( - "chmod", - "-R", - "a-w", - path, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - - # Wait for the subprocess to complete - stdout, stderr = await process.communicate() - - if process.returncode == 0: - return - else: - print(f"Error: {stderr.decode()}") - raise ValueError - except Exception as e: - raise e + # Create the subprocess + command = ["chmod", "-R", "a-w", path] + process = await asyncio.create_subprocess_exec( + *command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + # Wait for the subprocess to complete + _, stderr = await process.communicate() + + if process.returncode == 0: + return + else: + msg = f"Command {' '.join(command)} failed with error code {process.returncode}: {stderr.decode()}" + _logger.error(msg) + raise RuntimeError(msg) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py index ab6aa1a2e1e..7629d36c220 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py @@ -17,23 +17,46 @@ async def process_dynamic_service_running_message(app: FastAPI, data: bytes) -> DynamicServiceRunningMessage, data # type: ignore[arg-type] ) _logger.debug( - "Process dynamic service running msg, project ID: %s node ID: %s", + "Process dynamic service running msg, project ID: %s node ID: %s, current user: %s", rabbit_message.project_id, rabbit_message.node_id, + rabbit_message.user_id, ) settings = get_application_settings(app) efs_manager: EfsManager = app.state.efs_manager + + dir_exists = await efs_manager.check_project_node_data_directory_exits( + rabbit_message.project_id, node_id=rabbit_message.node_id + ) + if dir_exists is False: + _logger.debug( + "Directory doesn't exists in EFS, project ID: %s node ID: %s, current user: %s", + rabbit_message.project_id, + rabbit_message.node_id, + rabbit_message.user_id, + ) + return True + size = await efs_manager.get_project_node_data_size( rabbit_message.project_id, node_id=rabbit_message.node_id ) + _logger.debug( + "Current directory size: %s, project ID: %s node ID: %s, current user: %s", + size, + rabbit_message.project_id, + rabbit_message.node_id, + rabbit_message.user_id, + ) if size > settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES: _logger.warning( - "Removing write permissions inside of EFS starts for project ID: %s, node ID: %s, current user: %s", + "Removing write permissions inside of EFS starts for project ID: %s, node ID: %s, current user: %s, size: %s, upper limit: %s", rabbit_message.project_id, rabbit_message.node_id, rabbit_message.user_id, + size, + settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES, ) redis = get_redis_lock_client(app) async with redis.lock_context( diff --git a/services/efs-guardian/tests/unit/test_efs_manager.py b/services/efs-guardian/tests/unit/test_efs_manager.py index 32692469c4c..e041cb79b44 100644 --- a/services/efs-guardian/tests/unit/test_efs_manager.py +++ b/services/efs-guardian/tests/unit/test_efs_manager.py @@ -104,6 +104,13 @@ async def test_remove_write_access_rights( efs_manager: EfsManager = app.state.efs_manager + assert ( + await efs_manager.check_project_node_data_directory_exits( + project_id=_project_id, node_id=_node_id + ) + is False + ) + with patch( "simcore_service_efs_guardian.services.efs_manager.os.chown" ) as mocked_chown: @@ -113,6 +120,13 @@ async def test_remove_write_access_rights( storage_directory_name=_storage_directory_name, ) + assert ( + await efs_manager.check_project_node_data_directory_exits( + project_id=_project_id, node_id=_node_id + ) + is True + ) + size_before = await efs_manager.get_project_node_data_size( project_id=_project_id, node_id=_node_id ) From 6980cb232611b9d9d707118aa32d952de317fc08 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Wed, 9 Oct 2024 17:43:49 +0200 Subject: [PATCH 10/13] improvements --- .../services/efs_manager_utils.py | 7 +++---- services/efs-guardian/tests/unit/test_efs_guardian_rpc.py | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py index a58c146879f..5e787b031c8 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py @@ -42,7 +42,6 @@ async def remove_write_permissions_bash_async(path) -> None: if process.returncode == 0: return - else: - msg = f"Command {' '.join(command)} failed with error code {process.returncode}: {stderr.decode()}" - _logger.error(msg) - raise RuntimeError(msg) + msg = f"Command {' '.join(command)} failed with error code {process.returncode}: {stderr.decode()}" + _logger.error(msg) + raise RuntimeError(msg) diff --git a/services/efs-guardian/tests/unit/test_efs_guardian_rpc.py b/services/efs-guardian/tests/unit/test_efs_guardian_rpc.py index 9ebb2b3489d..9e2d6711c2a 100644 --- a/services/efs-guardian/tests/unit/test_efs_guardian_rpc.py +++ b/services/efs-guardian/tests/unit/test_efs_guardian_rpc.py @@ -36,9 +36,9 @@ def app_environment( async def test_rpc_create_project_specific_data_dir( + mocked_redis_server: None, rpc_client: RabbitMQRPCClient, faker: Faker, - mocked_redis_server: None, app: FastAPI, ): aws_efs_settings: AwsEfsSettings = app.state.settings.EFS_GUARDIAN_AWS_EFS_SETTINGS From a6c55de0ae28bebccd01a6115cdd84fceaea2dbf Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Thu, 10 Oct 2024 09:48:04 +0200 Subject: [PATCH 11/13] improvements --- .../services/efs_manager_utils.py | 7 +++---- .../services/process_messages.py | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py index 5e787b031c8..9418fa733db 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py @@ -22,10 +22,9 @@ async def get_size_bash_async(path) -> ByteSize: # Parse the output size = ByteSize(stdout.decode().split()[0]) return size - else: - msg = f"Command {' '.join(command)} failed with error code {process.returncode}: {stderr.decode()}" - _logger.error(msg) - raise RuntimeError(msg) + msg = f"Command {' '.join(command)} failed with error code {process.returncode}: {stderr.decode()}" + _logger.error(msg) + raise RuntimeError(msg) async def remove_write_permissions_bash_async(path) -> None: diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py index 7629d36c220..0f1e87fc13c 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py @@ -14,7 +14,7 @@ async def process_dynamic_service_running_message(app: FastAPI, data: bytes) -> bool: assert app # nosec rabbit_message: DynamicServiceRunningMessage = parse_raw_as( - DynamicServiceRunningMessage, data # type: ignore[arg-type] + DynamicServiceRunningMessage, data ) _logger.debug( "Process dynamic service running msg, project ID: %s node ID: %s, current user: %s", From 94952b925439c8af794c31a4d9ea8d8d8f319e45 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Thu, 10 Oct 2024 09:49:47 +0200 Subject: [PATCH 12/13] review @pcrespov --- .../src/simcore_service_efs_guardian/core/settings.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/core/settings.py b/services/efs-guardian/src/simcore_service_efs_guardian/core/settings.py index 35897a71eb3..7b630993830 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/core/settings.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/core/settings.py @@ -8,7 +8,7 @@ LogLevel, VersionTag, ) -from pydantic import ByteSize, Field, PositiveInt, validator +from pydantic import ByteSize, Field, PositiveInt, parse_obj_as, validator from settings_library.base import BaseCustomSettings from settings_library.efs import AwsEfsSettings from settings_library.rabbit import RabbitSettings @@ -59,7 +59,7 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): description="Linux group name that the EFS and Simcore linux users are part of" ) EFS_DEFAULT_USER_SERVICE_SIZE_BYTES: ByteSize = Field( - default=536870912000 # 500GiB = 534GB + default=parse_obj_as(ByteSize, "500GiB") ) # RUNTIME ----------------------------------------------------------- From bd5eb5ed945a30feb7bbe54bf4aa49d31e01a3b7 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Thu, 10 Oct 2024 11:12:01 +0200 Subject: [PATCH 13/13] review @pcrespov --- .../services/background_tasks.py | 2 +- .../services/background_tasks_setup.py | 8 +-- .../services/efs_manager.py | 3 +- .../services/process_messages.py | 29 +++++------ .../services/process_messages_setup.py | 16 +++--- services/efs-guardian/tests/unit/conftest.py | 24 +++++++++ .../tests/unit/test_api_health.py | 2 +- .../tests/unit/test_efs_guardian_rpc.py | 17 ++++--- .../tests/unit/test_efs_manager.py | 49 ++++++------------- 9 files changed, 77 insertions(+), 73 deletions(-) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks.py index 4d74e60e7c3..8edce477cc7 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks.py @@ -8,7 +8,7 @@ async def removal_policy_task(app: FastAPI) -> None: - _logger.info("Removal policy task started") + _logger.info("FAKE Removal policy task started (not yet implemented)") # After X days of inactivity remove data from EFS # Probably use `last_modified_data` in the project DB table diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py index 58e019ef7f9..0946b82177b 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py @@ -27,7 +27,7 @@ class EfsGuardianBackgroundTask(TypedDict): ] -def on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]: +def _on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]: async def _startup() -> None: with log_context( _logger, logging.INFO, msg="Efs Guardian startup.." @@ -49,7 +49,7 @@ async def _startup() -> None: return _startup -def on_app_shutdown( +def _on_app_shutdown( _app: FastAPI, ) -> Callable[[], Awaitable[None]]: async def _stop() -> None: @@ -69,5 +69,5 @@ async def _stop() -> None: def setup(app: FastAPI) -> None: - app.add_event_handler("startup", on_app_startup(app)) - app.add_event_handler("shutdown", on_app_shutdown(app)) + app.add_event_handler("startup", _on_app_startup(app)) + app.add_event_handler("shutdown", _on_app_shutdown(app)) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager.py index 42cb0839564..be0460b7e64 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager.py @@ -77,8 +77,7 @@ async def get_project_node_data_size( / f"{node_id}" ) - service_size = await efs_manager_utils.get_size_bash_async(_dir_path) - return service_size + return await efs_manager_utils.get_size_bash_async(_dir_path) async def remove_project_node_data_write_permissions( self, project_id: ProjectID, node_id: NodeID diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py index 0f1e87fc13c..11c7781bbae 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py @@ -3,6 +3,7 @@ from fastapi import FastAPI from models_library.rabbitmq_messages import DynamicServiceRunningMessage from pydantic import parse_raw_as +from servicelib.logging_utils import log_context from simcore_service_efs_guardian.services.modules.redis import get_redis_lock_client from ..core.settings import get_application_settings @@ -50,22 +51,16 @@ async def process_dynamic_service_running_message(app: FastAPI, data: bytes) -> ) if size > settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES: - _logger.warning( - "Removing write permissions inside of EFS starts for project ID: %s, node ID: %s, current user: %s, size: %s, upper limit: %s", - rabbit_message.project_id, - rabbit_message.node_id, - rabbit_message.user_id, - size, - settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES, - ) - redis = get_redis_lock_client(app) - async with redis.lock_context( - f"efs_remove_write_permissions-{rabbit_message.project_id=}-{rabbit_message.node_id=}", - blocking=True, - blocking_timeout_s=10, - ): - await efs_manager.remove_project_node_data_write_permissions( - project_id=rabbit_message.project_id, node_id=rabbit_message.node_id - ) + msg = f"Removing write permissions inside of EFS starts for project ID: {rabbit_message.project_id}, node ID: {rabbit_message.node_id}, current user: {rabbit_message.user_id}, size: {size}, upper limit: {settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES}" + with log_context(_logger, logging.WARNING, msg=msg): + redis = get_redis_lock_client(app) + async with redis.lock_context( + f"efs_remove_write_permissions-{rabbit_message.project_id=}-{rabbit_message.node_id=}", + blocking=True, + blocking_timeout_s=10, + ): + await efs_manager.remove_project_node_data_write_permissions( + project_id=rabbit_message.project_id, node_id=rabbit_message.node_id + ) return True diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages_setup.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages_setup.py index 3c174586fd3..d879d189157 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages_setup.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages_setup.py @@ -15,7 +15,11 @@ _logger = logging.getLogger(__name__) -_RUT_MESSAGE_TTL_IN_MS = 2 * 60 * 60 * 1000 # 2 hours +_SEC = 1000 # in ms +_MIN = 60 * _SEC # in ms +_HOUR = 60 * _MIN # in ms + +_EFS_MESSAGE_TTL_IN_MS = 2 * _HOUR async def _subscribe_to_rabbitmq(app) -> str: @@ -27,12 +31,12 @@ async def _subscribe_to_rabbitmq(app) -> str: process_dynamic_service_running_message, app ), exclusive_queue=False, - message_ttl=_RUT_MESSAGE_TTL_IN_MS, + message_ttl=_EFS_MESSAGE_TTL_IN_MS, ) return subscribed_queue -def on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]: +def _on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]: async def _startup() -> None: with log_context( _logger, logging.INFO, msg="setup resource tracker" @@ -48,7 +52,7 @@ async def _startup() -> None: return _startup -def on_app_shutdown( +def _on_app_shutdown( _app: FastAPI, ) -> Callable[[], Awaitable[None]]: async def _stop() -> None: @@ -58,5 +62,5 @@ async def _stop() -> None: def setup(app: FastAPI) -> None: - app.add_event_handler("startup", on_app_startup(app)) - app.add_event_handler("shutdown", on_app_shutdown(app)) + app.add_event_handler("startup", _on_app_startup(app)) + app.add_event_handler("shutdown", _on_app_shutdown(app)) diff --git a/services/efs-guardian/tests/unit/conftest.py b/services/efs-guardian/tests/unit/conftest.py index bb1bc1473f2..da4196ea859 100644 --- a/services/efs-guardian/tests/unit/conftest.py +++ b/services/efs-guardian/tests/unit/conftest.py @@ -2,7 +2,10 @@ # pylint:disable=unused-argument # pylint:disable=redefined-outer-name +import os import re +import shutil +import stat from collections.abc import AsyncIterator, Callable from pathlib import Path from typing import Awaitable @@ -18,6 +21,7 @@ from pytest_mock import MockerFixture from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict from servicelib.rabbitmq import RabbitMQRPCClient +from settings_library.efs import AwsEfsSettings from settings_library.rabbit import RabbitSettings from simcore_service_efs_guardian.core.application import create_app from simcore_service_efs_guardian.core.settings import ApplicationSettings @@ -28,6 +32,7 @@ "pytest_simcore.docker_registry", "pytest_simcore.docker_swarm", "pytest_simcore.environment_configs", + "pytest_simcore.faker_projects_data", "pytest_simcore.pydantic_models", "pytest_simcore.pytest_global_environs", "pytest_simcore.rabbit_service", @@ -148,3 +153,22 @@ async def rpc_client( async def mocked_redis_server(mocker: MockerFixture) -> None: mock_redis = FakeRedis() mocker.patch("redis.asyncio.from_url", return_value=mock_redis) + + +@pytest.fixture +async def cleanup(app: FastAPI): + + yield + + aws_efs_settings: AwsEfsSettings = app.state.settings.EFS_GUARDIAN_AWS_EFS_SETTINGS + _dir_path = Path(aws_efs_settings.EFS_MOUNTED_PATH) + if _dir_path.exists(): + for root, dirs, files in os.walk(_dir_path): + for name in dirs + files: + file_path = Path(root, name) + # Get the current permissions of the file or directory + current_permissions = Path.stat(file_path).st_mode + # Add write permission for the owner (user) + Path.chmod(file_path, current_permissions | stat.S_IWUSR) + + shutil.rmtree(_dir_path) diff --git a/services/efs-guardian/tests/unit/test_api_health.py b/services/efs-guardian/tests/unit/test_api_health.py index 5b801de21bb..8b42d559e7f 100644 --- a/services/efs-guardian/tests/unit/test_api_health.py +++ b/services/efs-guardian/tests/unit/test_api_health.py @@ -30,7 +30,7 @@ def app_environment( async def test_healthcheck( rabbit_service: RabbitSettings, - mocked_redis_server, + mocked_redis_server: None, client: httpx.AsyncClient, ): response = await client.get("/") diff --git a/services/efs-guardian/tests/unit/test_efs_guardian_rpc.py b/services/efs-guardian/tests/unit/test_efs_guardian_rpc.py index 9e2d6711c2a..48474a69d45 100644 --- a/services/efs-guardian/tests/unit/test_efs_guardian_rpc.py +++ b/services/efs-guardian/tests/unit/test_efs_guardian_rpc.py @@ -10,6 +10,8 @@ import pytest from faker import Faker from fastapi import FastAPI +from models_library.projects import ProjectID +from models_library.projects_nodes import NodeID from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict from servicelib.rabbitmq import RabbitMQRPCClient @@ -40,20 +42,21 @@ async def test_rpc_create_project_specific_data_dir( rpc_client: RabbitMQRPCClient, faker: Faker, app: FastAPI, + project_id: ProjectID, + node_id: NodeID, + cleanup: None, ): aws_efs_settings: AwsEfsSettings = app.state.settings.EFS_GUARDIAN_AWS_EFS_SETTINGS - _project_id = faker.uuid4() - _node_id = faker.uuid4() - _storage_directory_name = faker.name() + _storage_directory_name = faker.word() with patch( "simcore_service_efs_guardian.services.efs_manager.os.chown" ) as mocked_chown: result = await efs_manager.create_project_specific_data_dir( rpc_client, - project_id=_project_id, - node_id=_node_id, + project_id=project_id, + node_id=node_id, storage_directory_name=_storage_directory_name, ) mocked_chown.assert_called_once() @@ -62,8 +65,8 @@ async def test_rpc_create_project_specific_data_dir( _expected_path = ( aws_efs_settings.EFS_MOUNTED_PATH / aws_efs_settings.EFS_PROJECT_SPECIFIC_DATA_DIRECTORY - / _project_id - / _node_id + / f"{project_id}" + / f"{node_id}" / _storage_directory_name ) assert _expected_path == result diff --git a/services/efs-guardian/tests/unit/test_efs_manager.py b/services/efs-guardian/tests/unit/test_efs_manager.py index e041cb79b44..5c5c57cf3ab 100644 --- a/services/efs-guardian/tests/unit/test_efs_manager.py +++ b/services/efs-guardian/tests/unit/test_efs_manager.py @@ -4,8 +4,6 @@ # pylint: disable=unused-argument # pylint: disable=unused-variable -import os -import shutil import stat from pathlib import Path from unittest.mock import patch @@ -41,32 +39,13 @@ def app_environment( ) -@pytest.fixture -async def cleanup(app: FastAPI): - - yield - - aws_efs_settings: AwsEfsSettings = app.state.settings.EFS_GUARDIAN_AWS_EFS_SETTINGS - _dir_path = Path(aws_efs_settings.EFS_MOUNTED_PATH) - if Path.exists(_dir_path): - for root, dirs, files in os.walk(_dir_path): - for name in dirs + files: - file_path = os.path.join(root, name) - # Get the current permissions of the file or directory - current_permissions = os.stat(file_path).st_mode - # Add write permission for the owner (user) - os.chmod(file_path, current_permissions | stat.S_IWUSR) - - shutil.rmtree(_dir_path) - - def assert_permissions( file_path: Path, expected_readable: bool, expected_writable: bool, expected_executable: bool, ): - file_stat = os.stat(file_path) + file_stat = Path.stat(file_path) file_permissions = file_stat.st_mode is_readable = bool(file_permissions & stat.S_IRUSR) is_writable = bool(file_permissions & stat.S_IWUSR) @@ -88,17 +67,17 @@ async def test_remove_write_access_rights( mocked_redis_server: None, app: FastAPI, cleanup: None, + project_id: ProjectID, + node_id: NodeID, ): aws_efs_settings: AwsEfsSettings = app.state.settings.EFS_GUARDIAN_AWS_EFS_SETTINGS - _project_id = ProjectID(faker.uuid4()) - _node_id = NodeID(faker.uuid4()) _storage_directory_name = faker.word() _dir_path = ( aws_efs_settings.EFS_MOUNTED_PATH / aws_efs_settings.EFS_PROJECT_SPECIFIC_DATA_DIRECTORY - / f"{_project_id}" - / f"{_node_id}" + / f"{project_id}" + / f"{node_id}" / f"{_storage_directory_name}" ) @@ -106,7 +85,7 @@ async def test_remove_write_access_rights( assert ( await efs_manager.check_project_node_data_directory_exits( - project_id=_project_id, node_id=_node_id + project_id=project_id, node_id=node_id ) is False ) @@ -115,31 +94,31 @@ async def test_remove_write_access_rights( "simcore_service_efs_guardian.services.efs_manager.os.chown" ) as mocked_chown: await efs_manager.create_project_specific_data_dir( - project_id=_project_id, - node_id=_node_id, + project_id=project_id, + node_id=node_id, storage_directory_name=_storage_directory_name, ) + assert mocked_chown.called assert ( await efs_manager.check_project_node_data_directory_exits( - project_id=_project_id, node_id=_node_id + project_id=project_id, node_id=node_id ) is True ) size_before = await efs_manager.get_project_node_data_size( - project_id=_project_id, node_id=_node_id + project_id=project_id, node_id=node_id ) file_paths = [] for i in range(3): # Let's create 3 small files for testing file_path = Path(_dir_path, f"test_file_{i}.txt") - with open(file_path, "w") as f: - f.write(f"This is file {i}") + file_path.write_text(f"This is file {i}") file_paths.append(file_path) size_after = await efs_manager.get_project_node_data_size( - project_id=_project_id, node_id=_node_id + project_id=project_id, node_id=node_id ) assert size_after > size_before @@ -153,7 +132,7 @@ async def test_remove_write_access_rights( ) await efs_manager.remove_project_node_data_write_permissions( - project_id=_project_id, node_id=_node_id + project_id=project_id, node_id=node_id ) for file_path in file_paths: