Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎨 EFS Guardian: adding size monitoring #6502

Merged
merged 18 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions packages/models-library/src/models_library/rabbitmq_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,24 @@ def routing_key(self) -> str | None:
return None


class DynamicServiceRunningMessage(RabbitMessageBase):
    """Rabbit message identifying a dynamic service by project, node and user.

    The channel name is fixed (``const=True``) to
    ``io.simcore.service.dynamic-service-running``; ``created_at`` defaults to
    the current UTC time when the message is instantiated.
    """

    channel_name: Literal["io.simcore.service.dynamic-service-running"] = Field(
        default="io.simcore.service.dynamic-service-running", const=True
    )

    # Identifiers of the service this message refers to
    project_id: ProjectID
    node_id: NodeID
    user_id: UserID
    product_name: ProductName | None

    # Filled in automatically unless the caller supplies a value
    created_at: datetime.datetime = Field(
        default_factory=lambda: arrow.utcnow().datetime,
        description="message creation datetime",
    )

    def routing_key(self) -> str | None:
        """Return ``None``; this message type defines no routing key."""
        return None


class RabbitResourceTrackingStartedMessage(RabbitResourceTrackingBaseMessage):
message_type: RabbitResourceTrackingMessageType = Field(
default=RabbitResourceTrackingMessageType.TRACKING_STARTED, const=True
Expand Down
5 changes: 5 additions & 0 deletions services/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,11 @@ services:
RABBIT_PORT: ${RABBIT_PORT}
RABBIT_SECURE: ${RABBIT_SECURE}
RABBIT_USER: ${RABBIT_USER}
REDIS_HOST: ${REDIS_HOST}
REDIS_PASSWORD: ${REDIS_PASSWORD}
REDIS_PORT: ${REDIS_PORT}
REDIS_SECURE: ${REDIS_SECURE}
REDIS_USER: ${REDIS_USER}
SC_USER_ID: ${SC_USER_ID}
SC_USER_NAME: ${SC_USER_NAME}
EFS_USER_ID: ${EFS_USER_ID}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from fastapi import FastAPI
from models_library.progress_bar import ProgressReport
from models_library.rabbitmq_messages import (
DynamicServiceRunningMessage,
EventRabbitMessage,
LoggerRabbitMessage,
ProgressRabbitMessageNode,
Expand Down Expand Up @@ -34,6 +35,12 @@ async def post_resource_tracking_message(
await _post_rabbit_message(app, message)


async def post_dynamic_service_running_message(
    app: FastAPI, message: DynamicServiceRunningMessage
) -> None:
    """Publish a ``DynamicServiceRunningMessage`` through ``_post_rabbit_message``.

    Args:
        app: application passed on to ``_post_rabbit_message``.
        message: the message to publish.
    """
    await _post_rabbit_message(app, message)


async def post_log_message(
app: FastAPI, log: LogMessageStr, *, log_level: LogLevelInt
) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import asyncio
import logging
from typing import Final

from fastapi import FastAPI
from models_library.generated_models.docker_rest_api import ContainerState
from models_library.rabbitmq_messages import (
DynamicServiceRunningMessage,
RabbitResourceTrackingHeartbeatMessage,
RabbitResourceTrackingStartedMessage,
RabbitResourceTrackingStoppedMessage,
Expand All @@ -19,7 +21,10 @@
are_all_containers_in_expected_states,
get_container_states,
)
from ...core.rabbitmq import post_resource_tracking_message
from ...core.rabbitmq import (
post_dynamic_service_running_message,
post_resource_tracking_message,
)
from ...core.settings import ApplicationSettings, ResourceTrackingSettings
from ...models.shared_store import SharedStore
from ._models import ResourceTrackingState
Expand Down Expand Up @@ -70,10 +75,21 @@ async def _heart_beat_task(app: FastAPI):
)

if are_all_containers_in_expected_states(container_states.values()):
message = RabbitResourceTrackingHeartbeatMessage(
rut_message = RabbitResourceTrackingHeartbeatMessage(
service_run_id=settings.DY_SIDECAR_RUN_ID
)
await post_resource_tracking_message(app, message)
dyn_message = DynamicServiceRunningMessage(
project_id=settings.DY_SIDECAR_PROJECT_ID,
node_id=settings.DY_SIDECAR_NODE_ID,
user_id=settings.DY_SIDECAR_USER_ID,
product_name=settings.DY_SIDECAR_PRODUCT_NAME,
)
await asyncio.gather(
*[
post_resource_tracking_message(app, rut_message),
post_dynamic_service_running_message(app, dyn_message),
]
)
else:
_logger.info(
"heart beat message skipped: container_states=%s", container_states
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
)
from ..api.rest.routes import setup_api_routes
from ..api.rpc.routes import setup_rpc_routes
from ..services.background_tasks_setup import setup as setup_background_tasks
from ..services.efs_manager_setup import setup as setup_efs_manager
from ..services.modules.rabbitmq import setup as setup_rabbitmq
from ..services.modules.redis import setup as setup_redis
from ..services.process_messages_setup import setup as setup_process_messages
from .settings import ApplicationSettings

logger = logging.getLogger(__name__)
Expand All @@ -40,11 +43,14 @@ def create_app(settings: ApplicationSettings) -> FastAPI:

# PLUGINS SETUP
setup_rabbitmq(app)
setup_redis(app)

setup_api_routes(app)
setup_rpc_routes(app)

setup_efs_manager(app)
setup_background_tasks(app)
setup_process_messages(app)

# EVENTS
async def _on_startup() -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
LogLevel,
VersionTag,
)
from pydantic import Field, PositiveInt, validator
from pydantic import ByteSize, Field, PositiveInt, validator
from settings_library.base import BaseCustomSettings
from settings_library.efs import AwsEfsSettings
from settings_library.rabbit import RabbitSettings
from settings_library.redis import RedisSettings
from settings_library.tracing import TracingSettings
from settings_library.utils_logging import MixinLoggingSettings

Expand Down Expand Up @@ -57,6 +58,9 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
EFS_GROUP_NAME: str = Field(
description="Linux group name that the EFS and Simcore linux users are part of"
)
EFS_DEFAULT_USER_SERVICE_SIZE_BYTES: ByteSize = Field(
default=536870912000 # 500 GiB (~536.9 GB)
matusdrobuliak66 marked this conversation as resolved.
Show resolved Hide resolved
)

# RUNTIME -----------------------------------------------------------
EFS_GUARDIAN_DEBUG: bool = Field(
Expand All @@ -76,6 +80,7 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):

EFS_GUARDIAN_AWS_EFS_SETTINGS: AwsEfsSettings = Field(auto_default_from_env=True)
EFS_GUARDIAN_RABBITMQ: RabbitSettings = Field(auto_default_from_env=True)
EFS_GUARDIAN_REDIS: RedisSettings = Field(auto_default_from_env=True)
EFS_GUARDIAN_TRACING: TracingSettings | None = Field(
auto_default_from_env=True, description="settings for opentelemetry tracing"
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import logging

from fastapi import FastAPI

from ..core.settings import ApplicationSettings

_logger = logging.getLogger(__name__)


async def removal_policy_task(app: FastAPI) -> None:
    """Placeholder for the EFS data removal policy.

    For now it only logs that it started and checks that the application
    settings are available on ``app.state``; no data is removed yet.
    """
    _logger.info("Removal policy task started")

    # Planned behaviour (not implemented yet):
    #  - after X days of inactivity remove data from EFS
    #  - probably use `last_modified_data` in the project DB table
    #  - maybe lock the project during this time: lock_project()

    app_settings: ApplicationSettings = app.state.settings
    assert app_settings  # nosec
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import asyncio
import logging
from collections.abc import Awaitable, Callable
from datetime import timedelta
from typing import TypedDict

from fastapi import FastAPI
from servicelib.background_task import stop_periodic_task
from servicelib.logging_utils import log_catch, log_context
from servicelib.redis_utils import start_exclusive_periodic_task

from .background_tasks import removal_policy_task
from .modules.redis import get_redis_lock_client

_logger = logging.getLogger(__name__)


class EfsGuardianBackgroundTask(TypedDict):
    """Description of one periodic background task."""

    # Name under which the task is registered
    name: str
    # Callable executed on every period
    task_func: Callable


# Periodic tasks that `on_app_startup` schedules; add new tasks here.
_EFS_GUARDIAN_BACKGROUND_TASKS: list[EfsGuardianBackgroundTask] = [
    EfsGuardianBackgroundTask(
        name="efs_removal_policy_task",
        task_func=removal_policy_task,
    ),
]


def on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]:
    """Build the startup handler that launches the periodic background tasks.

    The returned coroutine function starts one exclusive periodic task per
    entry of ``_EFS_GUARDIAN_BACKGROUND_TASKS`` and stores the results in
    ``app.state.efs_guardian_background_tasks``. Exceptions raised while doing
    so are handed to ``log_catch(..., reraise=False)``.
    """

    async def _startup() -> None:
        with log_context(_logger, logging.INFO, msg="Efs Guardian startup.."):
            with log_catch(_logger, reraise=False):
                # Same list object is kept on app.state and filled below
                started_tasks = app.state.efs_guardian_background_tasks = []

                # Setup periodic tasks
                for task_spec in _EFS_GUARDIAN_BACKGROUND_TASKS:
                    started_tasks.append(
                        start_exclusive_periodic_task(
                            get_redis_lock_client(app),
                            task_spec["task_func"],
                            task_period=timedelta(minutes=1),
                            retry_after=timedelta(minutes=5),
                            task_name=task_spec["name"],
                            app=app,
                        )
                    )

    return _startup


def on_app_shutdown(
    _app: FastAPI,
) -> Callable[[], Awaitable[None]]:
    """Build the shutdown handler that stops the periodic background tasks.

    The returned coroutine function stops, concurrently, every task found in
    ``_app.state.efs_guardian_background_tasks``. Exceptions are handed to
    ``log_catch(..., reraise=False)``.
    """

    async def _stop() -> None:
        with log_context(_logger, logging.INFO, msg="Efs Guardian shutdown.."):
            with log_catch(_logger, reraise=False):
                assert _app  # nosec
                if not _app.state.efs_guardian_background_tasks:
                    # Nothing was started, nothing to stop
                    return
                await asyncio.gather(
                    *(
                        stop_periodic_task(running_task)
                        for running_task in _app.state.efs_guardian_background_tasks
                    )
                )

    return _stop


def setup(app: FastAPI) -> None:
    """Register the background-task startup and shutdown handlers on ``app``."""
    handler_factories = (
        ("startup", on_app_startup),
        ("shutdown", on_app_shutdown),
    )
    for event_name, make_handler in handler_factories:
        app.add_event_handler(event_name, make_handler(app))
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
from fastapi import FastAPI
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from pydantic import ByteSize

from ..core.settings import ApplicationSettings, get_application_settings
from . import efs_manager_utils


@dataclass(frozen=True)
Expand Down Expand Up @@ -52,3 +54,40 @@ async def create_project_specific_data_dir(
_dir_path, 0o770
) # This gives rwx permissions to user and group, and nothing to others
return _dir_path

async def check_project_node_data_directory_exits(
    self, project_id: ProjectID, node_id: NodeID
) -> bool:
    """Tell whether the data directory of the given project node exists.

    The directory is
    ``<efs mounted path>/<project specific base dir>/<project_id>/<node_id>``.
    """
    project_dir = (
        self._efs_mounted_path
        / self._project_specific_data_base_directory
        / f"{project_id}"
    )
    node_dir = project_dir / f"{node_id}"
    return node_dir.exists()

async def get_project_node_data_size(
    self, project_id: ProjectID, node_id: NodeID
) -> ByteSize:
    """Return the size of the given project node's data directory.

    The size of
    ``<efs mounted path>/<project specific base dir>/<project_id>/<node_id>``
    is obtained from ``efs_manager_utils.get_size_bash_async``.
    """
    project_dir = (
        self._efs_mounted_path
        / self._project_specific_data_base_directory
        / f"{project_id}"
    )
    node_dir = project_dir / f"{node_id}"
    return await efs_manager_utils.get_size_bash_async(node_dir)

async def remove_project_node_data_write_permissions(
    self, project_id: ProjectID, node_id: NodeID
) -> None:
    """Remove write permissions from the given project node's data directory.

    Delegates to ``efs_manager_utils.remove_write_permissions_bash_async`` for
    ``<efs mounted path>/<project specific base dir>/<project_id>/<node_id>``.
    """
    project_dir = (
        self._efs_mounted_path
        / self._project_specific_data_base_directory
        / f"{project_id}"
    )
    node_dir = project_dir / f"{node_id}"
    await efs_manager_utils.remove_write_permissions_bash_async(node_dir)
matusdrobuliak66 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import asyncio
import logging

from pydantic import ByteSize

_logger = logging.getLogger(__name__)


async def get_size_bash_async(path) -> ByteSize:
    """Return the total size in bytes of ``path`` as reported by ``du -sb``.

    Args:
        path: file or directory to measure (``str`` or path-like object).

    Returns:
        The first field of the ``du`` output converted to ``ByteSize``.

    Raises:
        RuntimeError: if ``du`` exits with a non-zero return code; the message
            contains the command, the return code and the captured stderr.
    """
    command = ["du", "-sb", path]
    process = await asyncio.create_subprocess_exec(
        *command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    # Wait for the subprocess to complete
    stdout, stderr = await process.communicate()

    if process.returncode != 0:
        # `path` may be a path-like object: stringify every argument, otherwise
        # str.join raises TypeError and hides the real failure.
        command_str = " ".join(str(arg) for arg in command)
        msg = f"Command {command_str} failed with error code {process.returncode}: {stderr.decode()}"
        _logger.error(msg)
        raise RuntimeError(msg)

    # The size is the first whitespace-separated field of the output
    return ByteSize(stdout.decode().split()[0])


async def remove_write_permissions_bash_async(path) -> None:
    """Recursively remove write permissions for everyone via ``chmod -R a-w``.

    Args:
        path: file or directory to change (``str`` or path-like object).

    Raises:
        RuntimeError: if ``chmod`` exits with a non-zero return code; the
            message contains the command, the return code and the captured
            stderr.
    """
    command = ["chmod", "-R", "a-w", path]
    process = await asyncio.create_subprocess_exec(
        *command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    # Wait for the subprocess to complete
    _, stderr = await process.communicate()

    if process.returncode == 0:
        return

    # `path` may be a path-like object: stringify every argument, otherwise
    # str.join raises TypeError and hides the real failure.
    command_str = " ".join(str(arg) for arg in command)
    msg = f"Command {command_str} failed with error code {process.returncode}: {stderr.decode()}"
    _logger.error(msg)
    raise RuntimeError(msg)
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import logging
from typing import cast

from fastapi import FastAPI
from servicelib.redis import RedisClientSDK
from settings_library.redis import RedisDatabase, RedisSettings

logger = logging.getLogger(__name__)


def setup(app: FastAPI) -> None:
    """Register startup/shutdown handlers managing the Redis lock client.

    On startup a ``RedisClientSDK`` for the ``RedisDatabase.LOCKS`` database is
    created from ``app.state.settings.EFS_GUARDIAN_REDIS``, stored in
    ``app.state.redis_lock_client_sdk`` and set up. On shutdown the stored
    client, if any, is shut down.
    """

    async def on_startup() -> None:
        # Make sure the attribute exists even if the lines below fail
        app.state.redis_lock_client_sdk = None

        redis_settings: RedisSettings = app.state.settings.EFS_GUARDIAN_REDIS
        locks_dsn = redis_settings.build_redis_dsn(RedisDatabase.LOCKS)

        client = RedisClientSDK(locks_dsn)
        app.state.redis_lock_client_sdk = client
        await client.setup()

    async def on_shutdown() -> None:
        client: None | RedisClientSDK = app.state.redis_lock_client_sdk
        if not client:
            return
        await client.shutdown()

    for event_name, handler in (("startup", on_startup), ("shutdown", on_shutdown)):
        app.add_event_handler(event_name, handler)


def get_redis_lock_client(app: FastAPI) -> RedisClientSDK:
    """Return the object stored in ``app.state.redis_lock_client_sdk``.

    The value is only ``cast`` for the type checker; no runtime check is done.
    """
    stored_client = app.state.redis_lock_client_sdk
    return cast(RedisClientSDK, stored_client)
Loading
Loading