From 4d27c320fcd9712fadc501f47d0da41d58b77d9b Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 <60785969+matusdrobuliak66@users.noreply.github.com> Date: Thu, 10 Oct 2024 15:40:11 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=A8=20EFS=20Guardian:=20adding=20size?= =?UTF-8?q?=20monitoring=20(#6502)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/models_library/rabbitmq_messages.py | 18 +++ services/docker-compose.yml | 5 + .../core/rabbitmq.py | 7 ++ .../modules/resource_tracking/_core.py | 22 +++- .../core/application.py | 6 + .../core/settings.py | 7 +- .../services/background_tasks.py | 18 +++ .../services/background_tasks_setup.py | 73 +++++++++++ .../services/efs_manager.py | 38 ++++++ .../services/efs_manager_utils.py | 46 +++++++ .../services/modules/redis.py | 29 +++++ .../services/process_messages.py | 66 ++++++++++ .../services/process_messages_setup.py | 66 ++++++++++ services/efs-guardian/tests/unit/conftest.py | 33 +++++ .../tests/unit/test_api_health.py | 6 +- .../tests/unit/test_efs_guardian_rpc.py | 73 +++++++++++ .../tests/unit/test_efs_manager.py | 119 ++++++++++++++---- 17 files changed, 605 insertions(+), 27 deletions(-) create mode 100644 services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks.py create mode 100644 services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py create mode 100644 services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py create mode 100644 services/efs-guardian/src/simcore_service_efs_guardian/services/modules/redis.py create mode 100644 services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py create mode 100644 services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages_setup.py create mode 100644 services/efs-guardian/tests/unit/test_efs_guardian_rpc.py diff --git a/packages/models-library/src/models_library/rabbitmq_messages.py b/packages/models-library/src/models_library/rabbitmq_messages.py index 355389584d3..69812689baa 100644 --- a/packages/models-library/src/models_library/rabbitmq_messages.py +++ b/packages/models-library/src/models_library/rabbitmq_messages.py @@ -194,6 +194,24 @@ def routing_key(self) -> str | None: return None +class DynamicServiceRunningMessage(RabbitMessageBase): + channel_name: Literal["io.simcore.service.dynamic-service-running"] = Field( + default="io.simcore.service.dynamic-service-running", const=True + ) + + project_id: ProjectID + node_id: NodeID + user_id: UserID + product_name: ProductName | None + created_at: datetime.datetime = Field( + default_factory=lambda: arrow.utcnow().datetime, + description="message creation datetime", + ) + + def routing_key(self) -> str | None: + return None + + class RabbitResourceTrackingStartedMessage(RabbitResourceTrackingBaseMessage): message_type: RabbitResourceTrackingMessageType = Field( default=RabbitResourceTrackingMessageType.TRACKING_STARTED, const=True diff --git a/services/docker-compose.yml b/services/docker-compose.yml index ba8137e0e5a..c5f8e762ee7 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -408,6 +408,11 @@ services: RABBIT_PORT: ${RABBIT_PORT} RABBIT_SECURE: ${RABBIT_SECURE} RABBIT_USER: ${RABBIT_USER} + REDIS_HOST: ${REDIS_HOST} + REDIS_PASSWORD: ${REDIS_PASSWORD} + REDIS_PORT: ${REDIS_PORT} + REDIS_SECURE: ${REDIS_SECURE} + REDIS_USER: ${REDIS_USER} SC_USER_ID: ${SC_USER_ID} SC_USER_NAME: ${SC_USER_NAME} EFS_USER_ID: ${EFS_USER_ID} diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py index b0daf7f881c..0ce567648d9 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py @@ -5,6 +5,7 @@ from fastapi import FastAPI from models_library.progress_bar import ProgressReport from models_library.rabbitmq_messages import ( + DynamicServiceRunningMessage, EventRabbitMessage, LoggerRabbitMessage, ProgressRabbitMessageNode, @@ -34,6 +35,12 @@ async def post_resource_tracking_message( await _post_rabbit_message(app, message) +async def post_dynamic_service_running_message( + app: FastAPI, message: DynamicServiceRunningMessage +): + await _post_rabbit_message(app, message) + + async def post_log_message( app: FastAPI, log: LogMessageStr, *, log_level: LogLevelInt ) -> None: diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py index 031b42ff324..6da3aa3f00c 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py @@ -1,9 +1,11 @@ +import asyncio import logging from typing import Final from fastapi import FastAPI from models_library.generated_models.docker_rest_api import ContainerState from models_library.rabbitmq_messages import ( + DynamicServiceRunningMessage, RabbitResourceTrackingHeartbeatMessage, RabbitResourceTrackingStartedMessage, RabbitResourceTrackingStoppedMessage, @@ -19,7 +21,10 @@ are_all_containers_in_expected_states, get_container_states, ) -from ...core.rabbitmq import post_resource_tracking_message +from ...core.rabbitmq import ( + post_dynamic_service_running_message, + post_resource_tracking_message, +) from ...core.settings import ApplicationSettings, ResourceTrackingSettings from ...models.shared_store import SharedStore from ._models import ResourceTrackingState @@ -70,10 +75,21 @@ async def _heart_beat_task(app: FastAPI): ) if are_all_containers_in_expected_states(container_states.values()): - message = RabbitResourceTrackingHeartbeatMessage( + rut_message = RabbitResourceTrackingHeartbeatMessage( service_run_id=settings.DY_SIDECAR_RUN_ID ) - await post_resource_tracking_message(app, message) + dyn_message = DynamicServiceRunningMessage( + project_id=settings.DY_SIDECAR_PROJECT_ID, + node_id=settings.DY_SIDECAR_NODE_ID, + user_id=settings.DY_SIDECAR_USER_ID, + product_name=settings.DY_SIDECAR_PRODUCT_NAME, + ) + await asyncio.gather( + *[ + post_resource_tracking_message(app, rut_message), + post_dynamic_service_running_message(app, dyn_message), + ] + ) else: _logger.info( "heart beat message skipped: container_states=%s", container_states diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/core/application.py b/services/efs-guardian/src/simcore_service_efs_guardian/core/application.py index 4e3527f7fc3..6bf2833ed02 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/core/application.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/core/application.py @@ -13,8 +13,11 @@ ) from ..api.rest.routes import setup_api_routes from ..api.rpc.routes import setup_rpc_routes +from ..services.background_tasks_setup import setup as setup_background_tasks from ..services.efs_manager_setup import setup as setup_efs_manager from ..services.modules.rabbitmq import setup as setup_rabbitmq +from ..services.modules.redis import setup as setup_redis +from ..services.process_messages_setup import setup as setup_process_messages from .settings import ApplicationSettings logger = logging.getLogger(__name__) @@ -40,11 +43,14 @@ def create_app(settings: ApplicationSettings) -> FastAPI: # PLUGINS SETUP setup_rabbitmq(app) + setup_redis(app) setup_api_routes(app) setup_rpc_routes(app) setup_efs_manager(app) + setup_background_tasks(app) + setup_process_messages(app) # EVENTS async def _on_startup() -> None: diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/core/settings.py b/services/efs-guardian/src/simcore_service_efs_guardian/core/settings.py index 8c71f90aefc..7b630993830 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/core/settings.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/core/settings.py @@ -8,10 +8,11 @@ LogLevel, VersionTag, ) -from pydantic import Field, PositiveInt, validator +from pydantic import ByteSize, Field, PositiveInt, parse_obj_as, validator from settings_library.base import BaseCustomSettings from settings_library.efs import AwsEfsSettings from settings_library.rabbit import RabbitSettings +from settings_library.redis import RedisSettings from settings_library.tracing import TracingSettings from settings_library.utils_logging import MixinLoggingSettings @@ -57,6 +58,9 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): EFS_GROUP_NAME: str = Field( description="Linux group name that the EFS and Simcore linux users are part of" ) + EFS_DEFAULT_USER_SERVICE_SIZE_BYTES: ByteSize = Field( + default=parse_obj_as(ByteSize, "500GiB") + ) # RUNTIME ----------------------------------------------------------- EFS_GUARDIAN_DEBUG: bool = Field( @@ -76,6 +80,7 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): EFS_GUARDIAN_AWS_EFS_SETTINGS: AwsEfsSettings = Field(auto_default_from_env=True) EFS_GUARDIAN_RABBITMQ: RabbitSettings = Field(auto_default_from_env=True) + EFS_GUARDIAN_REDIS: RedisSettings = Field(auto_default_from_env=True) EFS_GUARDIAN_TRACING: TracingSettings | None = Field( auto_default_from_env=True, description="settings for opentelemetry tracing" ) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks.py new file mode 100644 index 00000000000..8edce477cc7 --- /dev/null +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks.py @@ -0,0 +1,18 @@ +import logging + +from fastapi import FastAPI + +from ..core.settings import ApplicationSettings + +_logger = logging.getLogger(__name__) + + +async def removal_policy_task(app: FastAPI) -> None: + _logger.info("FAKE Removal policy task started (not yet implemented)") + + # After X days of inactivity remove data from EFS + # Probably use `last_modified_data` in the project DB table + # Maybe lock project during this time lock_project() + + app_settings: ApplicationSettings = app.state.settings + assert app_settings # nosec diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py new file mode 100644 index 00000000000..0946b82177b --- /dev/null +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/background_tasks_setup.py @@ -0,0 +1,73 @@ +import asyncio +import logging +from collections.abc import Awaitable, Callable +from datetime import timedelta +from typing import TypedDict + +from fastapi import FastAPI +from servicelib.background_task import stop_periodic_task +from servicelib.logging_utils import log_catch, log_context +from servicelib.redis_utils import start_exclusive_periodic_task + +from .background_tasks import removal_policy_task +from .modules.redis import get_redis_lock_client + +_logger = logging.getLogger(__name__) + + +class EfsGuardianBackgroundTask(TypedDict): + name: str + task_func: Callable + + +_EFS_GUARDIAN_BACKGROUND_TASKS = [ + EfsGuardianBackgroundTask( + name="efs_removal_policy_task", task_func=removal_policy_task + ) +] + + +def _on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]: + async def _startup() -> None: + with log_context( + _logger, logging.INFO, msg="Efs Guardian startup.." + ), log_catch(_logger, reraise=False): + app.state.efs_guardian_background_tasks = [] + + # Setup periodic tasks + for task in _EFS_GUARDIAN_BACKGROUND_TASKS: + exclusive_task = start_exclusive_periodic_task( + get_redis_lock_client(app), + task["task_func"], + task_period=timedelta(seconds=60), # 1 minute + retry_after=timedelta(seconds=300), # 5 minutes + task_name=task["name"], + app=app, + ) + app.state.efs_guardian_background_tasks.append(exclusive_task) + + return _startup + + +def _on_app_shutdown( + _app: FastAPI, +) -> Callable[[], Awaitable[None]]: + async def _stop() -> None: + with log_context( + _logger, logging.INFO, msg="Efs Guardian shutdown.." + ), log_catch(_logger, reraise=False): + assert _app # nosec + if _app.state.efs_guardian_background_tasks: + await asyncio.gather( + *[ + stop_periodic_task(task) + for task in _app.state.efs_guardian_background_tasks + ] + ) + + return _stop + + +def setup(app: FastAPI) -> None: + app.add_event_handler("startup", _on_app_startup(app)) + app.add_event_handler("shutdown", _on_app_shutdown(app)) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager.py index 401f38ed1b5..be0460b7e64 100644 --- a/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager.py +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager.py @@ -5,8 +5,10 @@ from fastapi import FastAPI from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID +from pydantic import ByteSize from ..core.settings import ApplicationSettings, get_application_settings +from . import efs_manager_utils @dataclass(frozen=True) @@ -52,3 +54,39 @@ async def create_project_specific_data_dir( _dir_path, 0o770 ) # This gives rwx permissions to user and group, and nothing to others return _dir_path + + async def check_project_node_data_directory_exits( + self, project_id: ProjectID, node_id: NodeID + ) -> bool: + _dir_path = ( + self._efs_mounted_path + / self._project_specific_data_base_directory + / f"{project_id}" + / f"{node_id}" + ) + + return _dir_path.exists() + + async def get_project_node_data_size( + self, project_id: ProjectID, node_id: NodeID + ) -> ByteSize: + _dir_path = ( + self._efs_mounted_path + / self._project_specific_data_base_directory + / f"{project_id}" + / f"{node_id}" + ) + + return await efs_manager_utils.get_size_bash_async(_dir_path) + + async def remove_project_node_data_write_permissions( + self, project_id: ProjectID, node_id: NodeID + ) -> None: + _dir_path = ( + self._efs_mounted_path + / self._project_specific_data_base_directory + / f"{project_id}" + / f"{node_id}" + ) + + await efs_manager_utils.remove_write_permissions_bash_async(_dir_path) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py new file mode 100644 index 00000000000..9418fa733db --- /dev/null +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/efs_manager_utils.py @@ -0,0 +1,46 @@ +import asyncio +import logging + +from pydantic import ByteSize + +_logger = logging.getLogger(__name__) + + +async def get_size_bash_async(path) -> ByteSize: + # Create the subprocess + command = ["du", "-sb", path] + process = await asyncio.create_subprocess_exec( + *command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + # Wait for the subprocess to complete + stdout, stderr = await process.communicate() + + if process.returncode == 0: + # Parse the output + size = ByteSize(stdout.decode().split()[0]) + return size + msg = f"Command {' '.join(command)} failed with error code {process.returncode}: {stderr.decode()}" + _logger.error(msg) + raise RuntimeError(msg) + + +async def remove_write_permissions_bash_async(path) -> None: + # Create the subprocess + command = ["chmod", "-R", "a-w", path] + process = await asyncio.create_subprocess_exec( + *command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + # Wait for the subprocess to complete + _, stderr = await process.communicate() + + if process.returncode == 0: + return + msg = f"Command {' '.join(command)} failed with error code {process.returncode}: {stderr.decode()}" + _logger.error(msg) + raise RuntimeError(msg) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/modules/redis.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/modules/redis.py new file mode 100644 index 00000000000..20cbcc0a4db --- /dev/null +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/modules/redis.py @@ -0,0 +1,29 @@ +import logging +from typing import cast + +from fastapi import FastAPI +from servicelib.redis import RedisClientSDK +from settings_library.redis import RedisDatabase, RedisSettings + +logger = logging.getLogger(__name__) + + +def setup(app: FastAPI) -> None: + async def on_startup() -> None: + app.state.redis_lock_client_sdk = None + settings: RedisSettings = app.state.settings.EFS_GUARDIAN_REDIS + redis_locks_dsn = settings.build_redis_dsn(RedisDatabase.LOCKS) + app.state.redis_lock_client_sdk = lock_client = RedisClientSDK(redis_locks_dsn) + await lock_client.setup() + + async def on_shutdown() -> None: + redis_lock_client_sdk: None | RedisClientSDK = app.state.redis_lock_client_sdk + if redis_lock_client_sdk: + await redis_lock_client_sdk.shutdown() + + app.add_event_handler("startup", on_startup) + app.add_event_handler("shutdown", on_shutdown) + + +def get_redis_lock_client(app: FastAPI) -> RedisClientSDK: + return cast(RedisClientSDK, app.state.redis_lock_client_sdk) diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py new file mode 100644 index 00000000000..11c7781bbae --- /dev/null +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages.py @@ -0,0 +1,66 @@ +import logging + +from fastapi import FastAPI +from models_library.rabbitmq_messages import DynamicServiceRunningMessage +from pydantic import parse_raw_as +from servicelib.logging_utils import log_context +from simcore_service_efs_guardian.services.modules.redis import get_redis_lock_client + +from ..core.settings import get_application_settings +from ..services.efs_manager import EfsManager + +_logger = logging.getLogger(__name__) + + +async def process_dynamic_service_running_message(app: FastAPI, data: bytes) -> bool: + assert app # nosec + rabbit_message: DynamicServiceRunningMessage = parse_raw_as( + DynamicServiceRunningMessage, data + ) + _logger.debug( + "Process dynamic service running msg, project ID: %s node ID: %s, current user: %s", + rabbit_message.project_id, + rabbit_message.node_id, + rabbit_message.user_id, + ) + + settings = get_application_settings(app) + efs_manager: EfsManager = app.state.efs_manager + + dir_exists = await efs_manager.check_project_node_data_directory_exits( + rabbit_message.project_id, node_id=rabbit_message.node_id + ) + if dir_exists is False: + _logger.debug( + "Directory doesn't exists in EFS, project ID: %s node ID: %s, current user: %s", + rabbit_message.project_id, + rabbit_message.node_id, + rabbit_message.user_id, + ) + return True + + size = await efs_manager.get_project_node_data_size( + rabbit_message.project_id, node_id=rabbit_message.node_id + ) + _logger.debug( + "Current directory size: %s, project ID: %s node ID: %s, current user: %s", + size, + rabbit_message.project_id, + rabbit_message.node_id, + rabbit_message.user_id, + ) + + if size > settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES: + msg = f"Removing write permissions inside of EFS starts for project ID: {rabbit_message.project_id}, node ID: {rabbit_message.node_id}, current user: {rabbit_message.user_id}, size: {size}, upper limit: {settings.EFS_DEFAULT_USER_SERVICE_SIZE_BYTES}" + with log_context(_logger, logging.WARNING, msg=msg): + redis = get_redis_lock_client(app) + async with redis.lock_context( + f"efs_remove_write_permissions-{rabbit_message.project_id=}-{rabbit_message.node_id=}", + blocking=True, + blocking_timeout_s=10, + ): + await efs_manager.remove_project_node_data_write_permissions( + project_id=rabbit_message.project_id, node_id=rabbit_message.node_id + ) + + return True diff --git a/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages_setup.py b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages_setup.py new file mode 100644 index 00000000000..d879d189157 --- /dev/null +++ b/services/efs-guardian/src/simcore_service_efs_guardian/services/process_messages_setup.py @@ -0,0 +1,66 @@ +import functools +import logging +from collections.abc import Awaitable, Callable + +from fastapi import FastAPI +from models_library.rabbitmq_messages import DynamicServiceRunningMessage +from servicelib.logging_utils import log_catch, log_context +from servicelib.rabbitmq import RabbitMQClient +from settings_library.rabbit import RabbitSettings + +from ..core.settings import ApplicationSettings +from .modules.rabbitmq import get_rabbitmq_client +from .process_messages import process_dynamic_service_running_message + +_logger = logging.getLogger(__name__) + + +_SEC = 1000 # in ms +_MIN = 60 * _SEC # in ms +_HOUR = 60 * _MIN # in ms + +_EFS_MESSAGE_TTL_IN_MS = 2 * _HOUR + + +async def _subscribe_to_rabbitmq(app) -> str: + with log_context(_logger, logging.INFO, msg="Subscribing to rabbitmq channel"): + rabbit_client: RabbitMQClient = get_rabbitmq_client(app) + subscribed_queue: str = await rabbit_client.subscribe( + DynamicServiceRunningMessage.get_channel_name(), + message_handler=functools.partial( + process_dynamic_service_running_message, app + ), + exclusive_queue=False, + message_ttl=_EFS_MESSAGE_TTL_IN_MS, + ) + return subscribed_queue + + +def _on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]: + async def _startup() -> None: + with log_context( + _logger, logging.INFO, msg="setup resource tracker" + ), log_catch(_logger, reraise=False): + app_settings: ApplicationSettings = app.state.settings + app.state.efs_guardian_rabbitmq_consumer = None + settings: RabbitSettings | None = app_settings.EFS_GUARDIAN_RABBITMQ + if not settings: + _logger.warning("RabbitMQ client is de-activated in the settings") + return + app.state.efs_guardian_rabbitmq_consumer = await _subscribe_to_rabbitmq(app) + + return _startup + + +def _on_app_shutdown( + _app: FastAPI, +) -> Callable[[], Awaitable[None]]: + async def _stop() -> None: + assert _app # nosec + + return _stop + + +def setup(app: FastAPI) -> None: + app.add_event_handler("startup", _on_app_startup(app)) + app.add_event_handler("shutdown", _on_app_shutdown(app)) diff --git a/services/efs-guardian/tests/unit/conftest.py b/services/efs-guardian/tests/unit/conftest.py index 62e4352e1cc..da4196ea859 100644 --- a/services/efs-guardian/tests/unit/conftest.py +++ b/services/efs-guardian/tests/unit/conftest.py @@ -2,7 +2,10 @@ # pylint:disable=unused-argument # pylint:disable=redefined-outer-name +import os import re +import shutil +import stat from collections.abc import AsyncIterator, Callable from pathlib import Path from typing import Awaitable @@ -12,10 +15,13 @@ import simcore_service_efs_guardian import yaml from asgi_lifespan import LifespanManager +from fakeredis.aioredis import FakeRedis from fastapi import FastAPI from httpx import ASGITransport +from pytest_mock import MockerFixture from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict from servicelib.rabbitmq import RabbitMQRPCClient +from settings_library.efs import AwsEfsSettings from settings_library.rabbit import RabbitSettings from simcore_service_efs_guardian.core.application import create_app from simcore_service_efs_guardian.core.settings import ApplicationSettings @@ -26,9 +32,11 @@ "pytest_simcore.docker_registry", "pytest_simcore.docker_swarm", "pytest_simcore.environment_configs", + "pytest_simcore.faker_projects_data", "pytest_simcore.pydantic_models", "pytest_simcore.pytest_global_environs", "pytest_simcore.rabbit_service", + "pytest_simcore.redis_service", "pytest_simcore.repository_paths", "pytest_simcore.aws_s3_service", "pytest_simcore.aws_server", @@ -139,3 +147,28 @@ async def rpc_client( rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]], ) -> RabbitMQRPCClient: return await rabbitmq_rpc_client("client") + + +@pytest.fixture +async def mocked_redis_server(mocker: MockerFixture) -> None: + mock_redis = FakeRedis() + mocker.patch("redis.asyncio.from_url", return_value=mock_redis) + + +@pytest.fixture +async def cleanup(app: FastAPI): + + yield + + aws_efs_settings: AwsEfsSettings = app.state.settings.EFS_GUARDIAN_AWS_EFS_SETTINGS + _dir_path = Path(aws_efs_settings.EFS_MOUNTED_PATH) + if _dir_path.exists(): + for root, dirs, files in os.walk(_dir_path): + for name in dirs + files: + file_path = Path(root, name) + # Get the current permissions of the file or directory + current_permissions = Path.stat(file_path).st_mode + # Add write permission for the owner (user) + Path.chmod(file_path, current_permissions | stat.S_IWUSR) + + shutil.rmtree(_dir_path) diff --git a/services/efs-guardian/tests/unit/test_api_health.py b/services/efs-guardian/tests/unit/test_api_health.py index 621543e2d80..8b42d559e7f 100644 --- a/services/efs-guardian/tests/unit/test_api_health.py +++ b/services/efs-guardian/tests/unit/test_api_health.py @@ -28,7 +28,11 @@ def app_environment( ) -async def test_healthcheck(rabbit_service: RabbitSettings, client: httpx.AsyncClient): +async def test_healthcheck( + rabbit_service: RabbitSettings, + mocked_redis_server: None, + client: httpx.AsyncClient, +): response = await client.get("/") response.raise_for_status() assert response.status_code == status.HTTP_200_OK diff --git a/services/efs-guardian/tests/unit/test_efs_guardian_rpc.py b/services/efs-guardian/tests/unit/test_efs_guardian_rpc.py new file mode 100644 index 00000000000..48474a69d45 --- /dev/null +++ b/services/efs-guardian/tests/unit/test_efs_guardian_rpc.py @@ -0,0 +1,73 @@ +# pylint: disable=protected-access +# pylint: disable=redefined-outer-name +# pylint: disable=too-many-arguments +# pylint: disable=unused-argument +# pylint: disable=unused-variable + +from pathlib import Path +from unittest.mock import patch + +import pytest +from faker import Faker +from fastapi import FastAPI +from models_library.projects import ProjectID +from models_library.projects_nodes import NodeID +from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict +from pytest_simcore.helpers.typing_env import EnvVarsDict +from servicelib.rabbitmq import RabbitMQRPCClient +from servicelib.rabbitmq.rpc_interfaces.efs_guardian import efs_manager +from simcore_service_efs_guardian.core.settings import AwsEfsSettings + +pytest_simcore_core_services_selection = ["rabbit"] +pytest_simcore_ops_services_selection = [] + + +@pytest.fixture +def app_environment( + monkeypatch: pytest.MonkeyPatch, + app_environment: EnvVarsDict, + rabbit_env_vars_dict: EnvVarsDict, # rabbitMQ settings from 'rabbit' service +) -> EnvVarsDict: + return setenvs_from_dict( + monkeypatch, + { + **app_environment, + **rabbit_env_vars_dict, + }, + ) + + +async def test_rpc_create_project_specific_data_dir( + mocked_redis_server: None, + rpc_client: RabbitMQRPCClient, + faker: Faker, + app: FastAPI, + project_id: ProjectID, + node_id: NodeID, + cleanup: None, +): + aws_efs_settings: AwsEfsSettings = app.state.settings.EFS_GUARDIAN_AWS_EFS_SETTINGS + + _storage_directory_name = faker.word() + + with patch( + "simcore_service_efs_guardian.services.efs_manager.os.chown" + ) as mocked_chown: + result = await efs_manager.create_project_specific_data_dir( + rpc_client, + project_id=project_id, + node_id=node_id, + storage_directory_name=_storage_directory_name, + ) + mocked_chown.assert_called_once() + + assert isinstance(result, Path) + _expected_path = ( + aws_efs_settings.EFS_MOUNTED_PATH + / aws_efs_settings.EFS_PROJECT_SPECIFIC_DATA_DIRECTORY + / f"{project_id}" + / f"{node_id}" + / _storage_directory_name + ) + assert _expected_path == result + assert _expected_path.exists diff --git a/services/efs-guardian/tests/unit/test_efs_manager.py b/services/efs-guardian/tests/unit/test_efs_manager.py index 42e22c9386d..5c5c57cf3ab 100644 --- a/services/efs-guardian/tests/unit/test_efs_manager.py +++ b/services/efs-guardian/tests/unit/test_efs_manager.py @@ -4,6 +4,7 @@ # pylint: disable=unused-argument # pylint: disable=unused-variable +import stat from pathlib import Path from unittest.mock import patch @@ -12,9 +13,12 @@ from fastapi import FastAPI from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict -from servicelib.rabbitmq import RabbitMQRPCClient -from servicelib.rabbitmq.rpc_interfaces.efs_guardian import efs_manager from simcore_service_efs_guardian.core.settings import AwsEfsSettings +from simcore_service_efs_guardian.services.efs_manager import ( + EfsManager, + NodeID, + ProjectID, +) pytest_simcore_core_services_selection = ["rabbit"] pytest_simcore_ops_services_selection = [] @@ -24,7 +28,7 @@ def app_environment( monkeypatch: pytest.MonkeyPatch, app_environment: EnvVarsDict, - rabbit_env_vars_dict: EnvVarsDict, # rabbitMQ settings from 'rabbit' service + rabbit_env_vars_dict: EnvVarsDict, ) -> EnvVarsDict: return setenvs_from_dict( monkeypatch, @@ -35,35 +39,106 @@ def app_environment( ) -async def test_rpc_create_project_specific_data_dir( - rpc_client: RabbitMQRPCClient, +def assert_permissions( + file_path: Path, + expected_readable: bool, + expected_writable: bool, + expected_executable: bool, +): + file_stat = Path.stat(file_path) + file_permissions = file_stat.st_mode + is_readable = bool(file_permissions & stat.S_IRUSR) + is_writable = bool(file_permissions & stat.S_IWUSR) + is_executable = bool(file_permissions & stat.S_IXUSR) + + assert ( + is_readable == expected_readable + ), f"Expected readable={expected_readable}, but got readable={is_readable} for {file_path}" + assert ( + is_writable == expected_writable + ), f"Expected writable={expected_writable}, but got writable={is_writable} for {file_path}" + assert ( + is_executable == expected_executable + ), f"Expected executable={expected_executable}, but got executable={is_executable} for {file_path}" + + +async def test_remove_write_access_rights( faker: Faker, + mocked_redis_server: None, app: FastAPI, + cleanup: None, + project_id: ProjectID, + node_id: NodeID, ): aws_efs_settings: AwsEfsSettings = app.state.settings.EFS_GUARDIAN_AWS_EFS_SETTINGS - _project_id = faker.uuid4() - _node_id = faker.uuid4() - _storage_directory_name = faker.name() + _storage_directory_name = faker.word() + _dir_path = ( + aws_efs_settings.EFS_MOUNTED_PATH + / aws_efs_settings.EFS_PROJECT_SPECIFIC_DATA_DIRECTORY + / f"{project_id}" + / f"{node_id}" + / f"{_storage_directory_name}" + ) + + efs_manager: EfsManager = app.state.efs_manager + + assert ( + await efs_manager.check_project_node_data_directory_exits( + project_id=project_id, node_id=node_id + ) + is False + ) with patch( "simcore_service_efs_guardian.services.efs_manager.os.chown" ) as mocked_chown: - result = await efs_manager.create_project_specific_data_dir( - rpc_client, - project_id=_project_id, - node_id=_node_id, + await efs_manager.create_project_specific_data_dir( + project_id=project_id, + node_id=node_id, storage_directory_name=_storage_directory_name, ) - mocked_chown.assert_called_once() + assert mocked_chown.called - assert isinstance(result, Path) - _expected_path = ( - aws_efs_settings.EFS_MOUNTED_PATH - / aws_efs_settings.EFS_PROJECT_SPECIFIC_DATA_DIRECTORY - / _project_id - / _node_id - / _storage_directory_name + assert ( + await efs_manager.check_project_node_data_directory_exits( + project_id=project_id, node_id=node_id + ) + is True + ) + + size_before = await efs_manager.get_project_node_data_size( + project_id=project_id, node_id=node_id + ) + + file_paths = [] + for i in range(3): # Let's create 3 small files for testing + file_path = Path(_dir_path, f"test_file_{i}.txt") + file_path.write_text(f"This is file {i}") + file_paths.append(file_path) + + size_after = await efs_manager.get_project_node_data_size( + project_id=project_id, node_id=node_id ) - assert _expected_path == result - assert _expected_path.exists + assert size_after > size_before + + # Now we will check removal of write permissions + for file_path in file_paths: + assert_permissions( + file_path, + expected_readable=True, + expected_writable=True, + expected_executable=False, + ) + + await efs_manager.remove_project_node_data_write_permissions( + project_id=project_id, node_id=node_id + ) + + for file_path in file_paths: + assert_permissions( + file_path, + expected_readable=True, + expected_writable=False, + expected_executable=False, + )