Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎨 Add prometheus metrics for logstreaming #5594

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
cf8f3b6
start adding prometheus metrics for logstreaming
bisgaard-itis Apr 2, 2024
938005f
first complete implementation of instrumentation for logstreaming
bisgaard-itis Apr 2, 2024
8d52649
test logstreaming metrics collection
bisgaard-itis Apr 2, 2024
566e684
Merge branch 'master' into 5591-add-prometheus-metrics-for-logstreaming
bisgaard-itis Apr 2, 2024
704b5aa
log_queue-sizes -> get_log_queue_sizes
bisgaard-itis Apr 2, 2024
b1ebdef
Merge branch '5591-add-prometheus-metrics-for-logstreaming' of github…
bisgaard-itis Apr 2, 2024
325f380
fix unit tests
bisgaard-itis Apr 2, 2024
0dfa82f
make pylint happy
bisgaard-itis Apr 2, 2024
a7620c2
@pcrespov @GitHK use tenacity
bisgaard-itis Apr 2, 2024
7abeb1e
@pcrespov protect method
bisgaard-itis Apr 2, 2024
b936a78
ensure task is properly cancelled @pcrespov
bisgaard-itis Apr 2, 2024
5d74c5e
move nosec
bisgaard-itis Apr 2, 2024
21c3926
use backgroundtask @sanderegg @pcrespov
bisgaard-itis Apr 2, 2024
dd9b18b
correct background task
bisgaard-itis Apr 2, 2024
a3fd49f
merge master into 5591-add-prometheus-metrics-for-logstreaming
bisgaard-itis Apr 2, 2024
ed71024
don't preoptimize @pcrespov
bisgaard-itis Apr 3, 2024
8275304
merge master into 5591-add-prometheus-metrics-for-logstreaming
bisgaard-itis Apr 3, 2024
e975d37
mock setup_prometheus when rabbit is not available
bisgaard-itis Apr 3, 2024
889d8b8
correct import of field
bisgaard-itis Apr 3, 2024
a584d65
fix tests
bisgaard-itis Apr 3, 2024
a30c3a3
redo change to dict
bisgaard-itis Apr 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
from typing import Annotated, cast
import asyncio
from time import time
from typing import Annotated, Final, cast

from fastapi import Depends, FastAPI
from pydantic import NonNegativeInt
from servicelib.aiohttp.application_setup import ApplicationSetupError
from servicelib.fastapi.dependencies import get_app
from servicelib.rabbitmq import RabbitMQClient

from ...services.log_streaming import LogDistributor

_MAX_WAIT_FOR_LOG_DISTRIBUTOR_SECONDS: Final[int] = 10


def get_rabbitmq_client(app: Annotated[FastAPI, Depends(get_app)]) -> RabbitMQClient:
assert app.state.rabbitmq_client # nosec
Expand All @@ -18,6 +23,17 @@ def get_log_distributor(app: Annotated[FastAPI, Depends(get_app)]) -> LogDistrib
return cast(LogDistributor, app.state.log_distributor)


async def wait_till_log_distributor_ready(app: "FastAPI") -> None:
    """Block until ``app.state.log_distributor`` has been attached by the rabbitmq setup.

    Raises:
        ApplicationSetupError: if the log distributor does not appear within
            ``_MAX_WAIT_FOR_LOG_DISTRIBUTOR_SECONDS`` seconds.
    """
    start = time()
    while not hasattr(app.state, "log_distributor"):
        if time() - start > _MAX_WAIT_FOR_LOG_DISTRIBUTOR_SECONDS:
            raise ApplicationSetupError(
                f"Api server's log_distributor was not ready within {_MAX_WAIT_FOR_LOG_DISTRIBUTOR_SECONDS=} seconds"
            )
        # poll once per second; the distributor is attached by a concurrent startup task
        await asyncio.sleep(1)
bisgaard-itis marked this conversation as resolved.
Show resolved Hide resolved


def get_log_check_timeout(app: Annotated[FastAPI, Depends(get_app)]) -> NonNegativeInt:
    """FastAPI dependency returning the configured log-check timeout in seconds."""
    settings = app.state.settings
    assert settings  # nosec
    return cast(NonNegativeInt, settings.API_SERVER_LOG_CHECK_TIMEOUT_SECONDS)
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ class MissingWallet(CustomBaseError):
pass


class ApplicationSetupError(CustomBaseError):
    """Raised when a required piece of the application fails to become ready during setup."""


async def custom_error_handler(_: Request, exc: CustomBaseError):
if isinstance(exc, InsufficientCredits):
return JSONResponse(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import asyncio
import dataclasses
from dataclasses import dataclass
from typing import Final, cast

from attr import field
from fastapi import FastAPI
from prometheus_client import CollectorRegistry, Gauge
from pydantic import PositiveInt
from servicelib.fastapi.prometheus_instrumentation import (
    setup_prometheus_instrumentation as setup_rest_instrumentation,
)
from simcore_service_api_server.api.dependencies.rabbitmq import (
    get_log_distributor,
    wait_till_log_distributor_ready,
)
from simcore_service_api_server.models.schemas.jobs import JobID

from .._meta import PROJECT_NAME

METRICS_NAMESPACE: Final[str] = PROJECT_NAME.replace("-", "_")
mrnicegyu11 marked this conversation as resolved.
Show resolved Hide resolved


@dataclass(slots=True, kw_only=True)
class ApiServerPrometheusInstrumentation:
    """Holds the api-server's custom prometheus collectors (log-streaming queue sizes)."""

    registry: CollectorRegistry
    # NOTE: must be dataclasses.field, module-qualified: the module-level `field`
    # name is imported from `attr`, whose init=False marker is ignored by stdlib
    # dataclasses and would leak this private attribute into __init__.
    _logstreaming_queues: Gauge = dataclasses.field(init=False)

    def __post_init__(self) -> None:
        # one gauge labelled per job_id, tracking queued log-message counts
        self._logstreaming_queues = Gauge(
            "log_stream_queue_length",
            "#Logs in log streaming queue",
            ["job_id"],
            namespace=METRICS_NAMESPACE,
        )

    def update_metrics(self, log_queue_sizes: dict[JobID, int]):
        """Drop stale samples and republish the current queue length per job."""
        self._logstreaming_queues.clear()
        for job_id, length in log_queue_sizes.items():
            self._logstreaming_queues.labels(job_id=job_id).set(length)


async def collect_prometheus_metrics_task(app: FastAPI):
    """Periodically copy the log distributor's queue sizes into prometheus gauges.

    Runs forever; intended to be wrapped in a task that is cancelled on shutdown.
    """
    interval_seconds: PositiveInt = (
        app.state.settings.API_SERVER_PROMETHEUS_INSTRUMENTATION_COLLECT_SECONDS
    )
    assert (
        app.state.instrumentation
    ), "Instrumentation not setup. Please check the configuration"  # nosec
    instrumentation = get_instrumentation(app)
    # the log distributor is attached by a concurrent startup task -- wait for it
    await wait_till_log_distributor_ready(app)
    log_distributor = get_log_distributor(app)
    while True:
        await asyncio.sleep(interval_seconds)
        queue_sizes = log_distributor.get_log_queue_sizes()
        instrumentation.update_metrics(log_queue_sizes=queue_sizes)


def setup_prometheus_instrumentation(app: FastAPI):
    """Wire up REST instrumentation and start/stop the metrics-collection background task."""
    instrumentator = setup_rest_instrumentation(app)

    async def on_startup() -> None:
        app.state.instrumentation = ApiServerPrometheusInstrumentation(
            registry=instrumentator.registry
        )
        app.state.instrumentation_task = asyncio.create_task(
            collect_prometheus_metrics_task(app)
        )

    async def on_shutdown() -> None:
        assert app.state.instrumentation_task  # nosec
        task = app.state.instrumentation_task
        task.cancel()
        # await the task so cancellation actually completes before shutdown
        # continues; cancel() alone only *requests* cancellation
        try:
            await task
        except asyncio.CancelledError:
            pass

    app.add_event_handler("startup", on_startup)
    app.add_event_handler("shutdown", on_shutdown)


def get_instrumentation(app: FastAPI) -> ApiServerPrometheusInstrumentation:
    """Return the instrumentation object previously stored on the application state."""
    instrumentation = app.state.instrumentation
    assert (
        instrumentation
    ), "Instrumentation not setup. Please check the configuration"  # nosec
    return cast(ApiServerPrometheusInstrumentation, instrumentation)
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@
from fastapi_pagination import add_pagination
from httpx import HTTPError as HttpxException
from models_library.basic_types import BootModeEnum
from servicelib.fastapi.prometheus_instrumentation import (
setup_prometheus_instrumentation,
)
from servicelib.logging_utils import config_all_loggers
from simcore_service_api_server.api.errors.log_handling_error import (
log_handling_error_handler,
Expand All @@ -30,6 +27,7 @@
from ..api.routes.health import router as health_router
from ..services import catalog, director_v2, storage, webserver
from ..services.rabbitmq import setup_rabbitmq
from ._prometheus_instrumentation import setup_prometheus_instrumentation
from .events import create_start_app_handler, create_stop_app_handler
from .openapi import override_openapi_method, use_route_names_as_operation_ids
from .settings import ApplicationSettings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Any

from models_library.basic_types import BootModeEnum, LogLevel
from pydantic import Field, NonNegativeInt, SecretStr, parse_obj_as
from pydantic import Field, NonNegativeInt, PositiveInt, SecretStr, parse_obj_as
from pydantic.class_validators import validator
from settings_library.base import BaseCustomSettings
from settings_library.basic_types import PortInt, VersionTag
Expand Down Expand Up @@ -137,6 +137,7 @@ class ApplicationSettings(BasicSettings):
)
API_SERVER_LOG_CHECK_TIMEOUT_SECONDS: NonNegativeInt = 3 * 60
API_SERVER_PROMETHEUS_INSTRUMENTATION_ENABLED: bool = True
API_SERVER_PROMETHEUS_INSTRUMENTATION_COLLECT_SECONDS: PositiveInt = 5
bisgaard-itis marked this conversation as resolved.
Show resolved Hide resolved
# DEV-TOOLS
API_SERVER_DEV_HTTP_CALLS_LOGS_PATH: Path | None = Field(
default=None,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import logging
from asyncio import Queue
from typing import AsyncIterable, Awaitable, Callable, Final
from typing import AsyncIterable, Final

from models_library.rabbitmq_messages import LoggerRabbitMessage
from models_library.users import UserID
Expand Down Expand Up @@ -31,7 +31,7 @@ class LogStreamerRegistionConflict(LogDistributionBaseException):
class LogDistributor:
def __init__(self, rabbitmq_client: RabbitMQClient):
    """Store the rabbitmq client and start with no registered log streams."""
    self._rabbit_client = rabbitmq_client
    # one asyncio Queue per registered job, filled by _distribute_logs
    self._log_streamers: dict[JobID, Queue] = {}
    # set during setup(); declared here for the type only
    self._queue_name: str

async def setup(self):
Expand Down Expand Up @@ -72,22 +72,20 @@ async def _distribute_logs(self, data: bytes):
log_level=got.log_level,
messages=got.messages,
)
callback = self._log_streamers.get(item.job_id)
if callback is None:
queue = self._log_streamers.get(item.job_id)
if queue is None:
raise LogStreamerNotRegistered(
f"Could not forward log because a logstreamer associated with job_id={item.job_id} was not registered"
)
await callback(item)
await queue.put(item)
return True

async def register(self, job_id: JobID, queue: Queue):
    """Attach *queue* to *job_id* and subscribe to that job's log topics.

    Raises ``LogStreamerRegistionConflict`` when a stream is already attached.
    """
    if job_id not in self._log_streamers:
        self._log_streamers[job_id] = queue
        await self._rabbit_client.add_topics(
            LoggerRabbitMessage.get_channel_name(), topics=[f"{job_id}.*"]
        )
        return
    raise LogStreamerRegistionConflict(
        f"A stream was already connected to {job_id=}. Only a single stream can be connected at the time"
    )
Expand All @@ -100,6 +98,11 @@ async def deregister(self, job_id: JobID):
)
del self._log_streamers[job_id]

def get_log_queue_sizes(self) -> dict[JobID, int]:
    """Return the current number of queued log messages per registered job."""
    # single dict comprehension instead of zip-ping two separate iterations
    # (keys() and map over values()) of the same dict back together
    return {job_id: queue.qsize() for job_id, queue in self._log_streamers.items()}
bisgaard-itis marked this conversation as resolved.
Show resolved Hide resolved


class LogStreamer:
def __init__(
Expand All @@ -120,7 +123,7 @@ def __init__(
self._log_check_timeout: NonNegativeInt = log_check_timeout

async def setup(self):
    """Register this streamer's queue with the log distributor.

    The flag is set only after registration succeeds, so teardown knows
    whether a deregister is needed.
    """
    await self._log_distributor.register(self._job_id, self._queue)
    self._is_registered = True

async def teardown(self):
Expand Down
Loading