
🎨 Add prometheus metrics for logstreaming #5594

Merged
21 commits (changes shown from 11 commits):
cf8f3b6
start adding prometheus metrics for logstreaming
bisgaard-itis Apr 2, 2024
938005f
first complete implementation of instrumentation for logstreaming
bisgaard-itis Apr 2, 2024
8d52649
test logstreaming metrics collection
bisgaard-itis Apr 2, 2024
566e684
Merge branch 'master' into 5591-add-prometheus-metrics-for-logstreaming
bisgaard-itis Apr 2, 2024
704b5aa
log_queue-sizes -> get_log_queue_sizes
bisgaard-itis Apr 2, 2024
b1ebdef
Merge branch '5591-add-prometheus-metrics-for-logstreaming' of github…
bisgaard-itis Apr 2, 2024
325f380
fix unit tests
bisgaard-itis Apr 2, 2024
0dfa82f
make pylint happy
bisgaard-itis Apr 2, 2024
a7620c2
@pcrespov @GitHK use tenacity
bisgaard-itis Apr 2, 2024
7abeb1e
@pcrespov protect method
bisgaard-itis Apr 2, 2024
b936a78
ensure task is properly cancelled @pcrespov
bisgaard-itis Apr 2, 2024
5d74c5e
move nosec
bisgaard-itis Apr 2, 2024
21c3926
use backgroundtask @sanderegg @pcrespov
bisgaard-itis Apr 2, 2024
dd9b18b
correct background task
bisgaard-itis Apr 2, 2024
a3fd49f
merge master into 5591-add-prometheus-metrics-for-logstreaming
bisgaard-itis Apr 2, 2024
ed71024
dont preomptimize @pcrespov
bisgaard-itis Apr 3, 2024
8275304
mere master into 5591-add-prometheus-metrics-for-logstreaming
bisgaard-itis Apr 3, 2024
e975d37
mock setup_prometheus when rabbit is not available
bisgaard-itis Apr 3, 2024
889d8b8
correct import of field
bisgaard-itis Apr 3, 2024
a584d65
fix tests
bisgaard-itis Apr 3, 2024
a30c3a3
redo change to dict
bisgaard-itis Apr 3, 2024
@@ -1,12 +1,19 @@
from typing import Annotated, cast
import logging
from typing import Annotated, Final, cast

from fastapi import Depends, FastAPI
from pydantic import NonNegativeInt
from servicelib.aiohttp.application_setup import ApplicationSetupError
from servicelib.fastapi.dependencies import get_app
from servicelib.rabbitmq import RabbitMQClient
from tenacity import before_sleep_log, retry, stop_after_delay, wait_fixed

from ...services.log_streaming import LogDistributor

_MAX_WAIT_FOR_LOG_DISTRIBUTOR_SECONDS: Final[int] = 10

_logger = logging.getLogger(__name__)


def get_rabbitmq_client(app: Annotated[FastAPI, Depends(get_app)]) -> RabbitMQClient:
assert app.state.rabbitmq_client # nosec
@@ -18,6 +25,20 @@ def get_log_distributor(app: Annotated[FastAPI, Depends(get_app)]) -> LogDistrib
return cast(LogDistributor, app.state.log_distributor)


@retry(
wait=wait_fixed(2),
stop=stop_after_delay(_MAX_WAIT_FOR_LOG_DISTRIBUTOR_SECONDS),
before_sleep=before_sleep_log(_logger, logging.WARNING),
reraise=True,
)
async def wait_till_log_distributor_ready(app) -> None:
if not hasattr(app.state, "log_distributor"):
raise ApplicationSetupError(
f"Api server's log_distributor was not ready within {_MAX_WAIT_FOR_LOG_DISTRIBUTOR_SECONDS=} seconds"
)
return


def get_log_check_timeout(app: Annotated[FastAPI, Depends(get_app)]) -> NonNegativeInt:
assert app.state.settings # nosec
return cast(NonNegativeInt, app.state.settings.API_SERVER_LOG_CHECK_TIMEOUT_SECONDS)
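For reviewers unfamiliar with tenacity: the decorator re-invokes the wrapped coroutine whenever it raises, pausing two seconds between attempts and giving up after `_MAX_WAIT_FOR_LOG_DISTRIBUTOR_SECONDS`; `reraise=True` surfaces the final `ApplicationSetupError` instead of tenacity's own `RetryError`. A minimal, self-contained sketch of the same pattern (all names below are illustrative, not from this PR):

```python
import asyncio
import logging

from tenacity import before_sleep_log, retry, stop_after_delay, wait_fixed

_logger = logging.getLogger(__name__)


class NotReadyError(Exception):
    """Hypothetical stand-in for ApplicationSetupError."""


@retry(
    wait=wait_fixed(2),  # pause 2s between attempts
    stop=stop_after_delay(10),  # stop retrying after ~10s in total
    before_sleep=before_sleep_log(_logger, logging.WARNING),
    reraise=True,  # raise NotReadyError, not tenacity.RetryError
)
async def wait_until_ready(state: dict) -> None:
    if not state.get("ready"):
        raise NotReadyError("dependency not ready yet")


async def main() -> None:
    state: dict = {"ready": False}
    # flip the flag after 3s; the retry loop then succeeds on a later attempt
    asyncio.get_running_loop().call_later(3, state.update, {"ready": True})
    await wait_until_ready(state)


if __name__ == "__main__":
    asyncio.run(main())
```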
@@ -19,6 +19,10 @@ class MissingWallet(CustomBaseError):
pass


class ApplicationSetupError(CustomBaseError):
pass


async def custom_error_handler(_: Request, exc: CustomBaseError):
if isinstance(exc, InsufficientCredits):
return JSONResponse(
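Because Starlette resolves exception handlers along the exception's MRO, registering `custom_error_handler` for `CustomBaseError` makes it catch the new `ApplicationSetupError` as well. A hedged sketch of that dispatch (the base-class shape and the response body are assumptions, not copied from the service):

```python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse


class CustomBaseError(Exception):  # assumed shape of the service's base error
    pass


class ApplicationSetupError(CustomBaseError):
    pass


app = FastAPI()


@app.exception_handler(CustomBaseError)
async def custom_error_handler(_: Request, exc: CustomBaseError) -> JSONResponse:
    # ApplicationSetupError is not special-cased, so it takes this generic branch
    return JSONResponse(status_code=500, content={"errors": [str(exc)]})
```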
@@ -0,0 +1,86 @@
import asyncio
from dataclasses import dataclass, field
from typing import Final, cast

from fastapi import FastAPI
from prometheus_client import CollectorRegistry, Gauge
from pydantic import PositiveInt
from servicelib.fastapi.prometheus_instrumentation import (
setup_prometheus_instrumentation as setup_rest_instrumentation,
)
from simcore_service_api_server.api.dependencies.rabbitmq import (
get_log_distributor,
wait_till_log_distributor_ready,
)
from simcore_service_api_server.models.schemas.jobs import JobID

from .._meta import PROJECT_NAME

METRICS_NAMESPACE: Final[str] = PROJECT_NAME.replace("-", "_")


@dataclass(slots=True, kw_only=True)
class ApiServerPrometheusInstrumentation:
registry: CollectorRegistry
_logstreaming_queues: Gauge = field(init=False)

def __post_init__(self) -> None:
self._logstreaming_queues = Gauge(
"log_stream_queue_length",
"#Logs in log streaming queue",
["job_id"],
namespace=METRICS_NAMESPACE,
)

def update_metrics(self, log_queue_sizes: dict[JobID, int]):
self._logstreaming_queues.clear()
for job_id, length in log_queue_sizes.items():
self._logstreaming_queues.labels(job_id=job_id).set(length)


async def _collect_prometheus_metrics_task(app: FastAPI):
metrics_collect_seconds: PositiveInt = (
app.state.settings.API_SERVER_PROMETHEUS_INSTRUMENTATION_COLLECT_SECONDS
)
assert (
app.state.instrumentation
), "Instrumentation not setup. Please check the configuration" # nosec
instrumentation = get_instrumentation(app)
await wait_till_log_distributor_ready(app)
log_distributor = get_log_distributor(app)
while True:
await asyncio.sleep(metrics_collect_seconds)
instrumentation.update_metrics(
log_queue_sizes=log_distributor.get_log_queue_sizes()
)


def setup_prometheus_instrumentation(app: FastAPI):
instrumentator = setup_rest_instrumentation(app)

async def on_startup() -> None:
app.state.instrumentation = ApiServerPrometheusInstrumentation(
registry=instrumentator.registry
)
app.state.instrumentation_task = asyncio.create_task(
_collect_prometheus_metrics_task(app)
)

async def on_shutdown() -> None:
assert app.state.instrumentation_task # nosec
app.state.instrumentation_task.cancel()
try:
await app.state.instrumentation_task
except asyncio.CancelledError:
pass

app.add_event_handler("startup", on_startup)
app.add_event_handler("shutdown", on_shutdown)


def get_instrumentation(app: FastAPI) -> ApiServerPrometheusInstrumentation:
assert (
app.state.instrumentation
), "Instrumentation not setup. Please check the configuration" # nosec
return cast(ApiServerPrometheusInstrumentation, app.state.instrumentation)
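The instrumentation keeps one gauge time series per `job_id` label, and the `clear()` at the start of each `update_metrics` call drops series for queues that no longer exist, so finished jobs do not keep reporting their last value. A runnable sketch of that cycle (the namespace and standalone registry are illustrative):

```python
from prometheus_client import CollectorRegistry, Gauge

registry = CollectorRegistry()
queue_gauge = Gauge(
    "log_stream_queue_length",
    "#Logs in log streaming queue",
    ["job_id"],
    namespace="example_namespace",  # stand-in for METRICS_NAMESPACE
    registry=registry,
)


def update_metrics(log_queue_sizes: dict[str, int]) -> None:
    queue_gauge.clear()  # drop every label set from the previous cycle
    for job_id, length in log_queue_sizes.items():
        queue_gauge.labels(job_id=job_id).set(length)


update_metrics({"job-1": 3, "job-2": 0})
update_metrics({"job-2": 1})  # after clear(), the "job-1" series is gone

full_name = "example_namespace_log_stream_queue_length"
assert registry.get_sample_value(full_name, {"job_id": "job-2"}) == 1.0
assert registry.get_sample_value(full_name, {"job_id": "job-1"}) is None
```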
@@ -5,9 +5,6 @@
from fastapi_pagination import add_pagination
from httpx import HTTPError as HttpxException
from models_library.basic_types import BootModeEnum
from servicelib.fastapi.prometheus_instrumentation import (
setup_prometheus_instrumentation,
)
from servicelib.logging_utils import config_all_loggers
from simcore_service_api_server.api.errors.log_handling_error import (
log_handling_error_handler,
@@ -30,6 +27,7 @@
from ..api.routes.health import router as health_router
from ..services import catalog, director_v2, storage, webserver
from ..services.rabbitmq import setup_rabbitmq
from ._prometheus_instrumentation import setup_prometheus_instrumentation
from .events import create_start_app_handler, create_stop_app_handler
from .openapi import override_openapi_method, use_route_names_as_operation_ids
from .settings import ApplicationSettings
Expand Up @@ -4,7 +4,7 @@
from typing import Any

from models_library.basic_types import BootModeEnum, LogLevel
from pydantic import Field, NonNegativeInt, SecretStr, parse_obj_as
from pydantic import Field, NonNegativeInt, PositiveInt, SecretStr, parse_obj_as
from pydantic.class_validators import validator
from settings_library.base import BaseCustomSettings
from settings_library.basic_types import PortInt, VersionTag
@@ -137,6 +137,7 @@ class ApplicationSettings(BasicSettings):
)
API_SERVER_LOG_CHECK_TIMEOUT_SECONDS: NonNegativeInt = 3 * 60
API_SERVER_PROMETHEUS_INSTRUMENTATION_ENABLED: bool = True
API_SERVER_PROMETHEUS_INSTRUMENTATION_COLLECT_SECONDS: PositiveInt = 5
# DEV-TOOLS
API_SERVER_DEV_HTTP_CALLS_LOGS_PATH: Path | None = Field(
default=None,
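As with the other `API_SERVER_*` fields, the new collection interval takes its default (5 seconds) unless an environment variable of the same name overrides it. A minimal pydantic-v1 sketch (the stand-in class below is not the real `ApplicationSettings`):

```python
import os

from pydantic import BaseSettings, PositiveInt  # pydantic v1, as used by this service


class _Settings(BaseSettings):  # reduced stand-in for ApplicationSettings
    API_SERVER_PROMETHEUS_INSTRUMENTATION_ENABLED: bool = True
    API_SERVER_PROMETHEUS_INSTRUMENTATION_COLLECT_SECONDS: PositiveInt = 5


os.environ["API_SERVER_PROMETHEUS_INSTRUMENTATION_COLLECT_SECONDS"] = "10"
assert _Settings().API_SERVER_PROMETHEUS_INSTRUMENTATION_COLLECT_SECONDS == 10
```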
@@ -1,7 +1,7 @@
import asyncio
import logging
from asyncio import Queue
from typing import AsyncIterable, Awaitable, Callable, Final
from typing import AsyncIterable, Final

from models_library.rabbitmq_messages import LoggerRabbitMessage
from models_library.users import UserID
@@ -31,7 +31,7 @@ class LogStreamerRegistionConflict(LogDistributionBaseException):
class LogDistributor:
def __init__(self, rabbitmq_client: RabbitMQClient):
self._rabbit_client = rabbitmq_client
self._log_streamers: dict[JobID, Callable[[JobLog], Awaitable[None]]] = {}
self._log_streamers: dict[JobID, Queue] = {}
self._queue_name: str

async def setup(self):
@@ -72,22 +72,20 @@ async def _distribute_logs(self, data: bytes):
log_level=got.log_level,
messages=got.messages,
)
callback = self._log_streamers.get(item.job_id)
if callback is None:
queue = self._log_streamers.get(item.job_id)
if queue is None:
raise LogStreamerNotRegistered(
f"Could not forward log because a logstreamer associated with job_id={item.job_id} was not registered"
)
await callback(item)
await queue.put(item)
return True

async def register(
self, job_id: JobID, callback: Callable[[JobLog], Awaitable[None]]
):
async def register(self, job_id: JobID, queue: Queue):
if job_id in self._log_streamers:
raise LogStreamerRegistionConflict(
f"A stream was already connected to {job_id=}. Only a single stream can be connected at the time"
)
self._log_streamers[job_id] = callback
self._log_streamers[job_id] = queue
await self._rabbit_client.add_topics(
LoggerRabbitMessage.get_channel_name(), topics=[f"{job_id}.*"]
)
@@ -100,6 +98,11 @@ async def deregister(self, job_id: JobID):
)
del self._log_streamers[job_id]

def get_log_queue_sizes(self) -> dict[JobID, int]:
keys: list[JobID] = list(self._log_streamers.keys())
values: list[int] = list(map(lambda q: q.qsize(), self._log_streamers.values()))
return dict(zip(keys, values))


class LogStreamer:
def __init__(
@@ -120,7 +123,7 @@ def __init__(
self._log_check_timeout: NonNegativeInt = log_check_timeout

async def setup(self):
await self._log_distributor.register(self._job_id, self._queue.put)
await self._log_distributor.register(self._job_id, self._queue)
self._is_registered = True

async def teardown(self):
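The core of this refactoring: instead of holding a `Callable[[JobLog], Awaitable[None]]` per job, the distributor now holds the consumer's `asyncio.Queue` directly, which is what makes queue depths observable through `qsize()`. A reduced sketch of the register/distribute/measure flow (the plain dict stands in for `LogDistributor._log_streamers`; everything else is illustrative):

```python
import asyncio


async def main() -> None:
    # register(): one queue per job id
    log_streamers: dict[str, asyncio.Queue] = {"job-1": asyncio.Queue()}

    # _distribute_logs(): the rabbitmq handler simply enqueues the parsed item
    await log_streamers["job-1"].put({"messages": ["hello"]})
    await log_streamers["job-1"].put({"messages": ["world"]})

    # get_log_queue_sizes(): depths are readable synchronously, no await needed
    sizes = {job_id: queue.qsize() for job_id, queue in log_streamers.items()}
    assert sizes == {"job-1": 2}

    # the LogStreamer consumes from the same queue it registered
    first = await log_streamers["job-1"].get()
    assert first["messages"] == ["hello"]


asyncio.run(main())
```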
57 changes: 34 additions & 23 deletions services/api-server/tests/unit/test_services_rabbitmq.py
@@ -3,6 +3,7 @@
# pylint: disable=too-many-arguments
# pylint: disable=unused-argument
# pylint: disable=unused-variable
# pylint: disable=R6301

import asyncio
import logging
@@ -16,6 +17,7 @@
import httpx
import pytest
import respx
from attr import dataclass
from faker import Faker
from fastapi import FastAPI, status
from fastapi.encoders import jsonable_encoder
@@ -113,12 +115,18 @@ async def test_subscribe_publish_receive_logs(
log_distributor: LogDistributor,
mocker: MockerFixture,
):
async def _consumer_message_handler(job_log: JobLog):
_consumer_message_handler.called = True
_consumer_message_handler.job_log = job_log
assert isinstance(job_log, JobLog)
@dataclass
class MockQueue:
called: bool = False
job_log: JobLog | None = None

await log_distributor.register(project_id, _consumer_message_handler)
async def put(self, job_log: JobLog):
self.called = True
self.job_log = job_log
assert isinstance(job_log, JobLog)

mock_queue = MockQueue()
await log_distributor.register(project_id, mock_queue) # type: ignore

# log producer
rabbitmq_producer = create_rabbitmq_client("pytest_producer")
@@ -128,16 +136,14 @@ async def _consumer_message_handler(job_log: JobLog):
node_id=node_id,
messages=[faker.text() for _ in range(10)],
)
_consumer_message_handler.called = False
_consumer_message_handler.job_log = None
await rabbitmq_producer.publish(log_message.channel_name, log_message)

# check it received
await asyncio.sleep(1)
await log_distributor.deregister(project_id)

assert _consumer_message_handler.called
job_log = _consumer_message_handler.job_log
assert mock_queue.called
job_log = mock_queue.job_log
assert isinstance(job_log, JobLog)
assert job_log.job_id == log_message.project_id

@@ -147,12 +153,13 @@ async def rabbit_consuming_context(
app: FastAPI,
project_id: ProjectID,
) -> AsyncIterable[AsyncMock]:
consumer_message_handler = AsyncMock()

queue = asyncio.Queue()
queue.put = AsyncMock()
log_distributor: LogDistributor = get_log_distributor(app)
await log_distributor.register(project_id, consumer_message_handler)
await log_distributor.register(project_id, queue)

yield consumer_message_handler
yield queue.put

await log_distributor.deregister(project_id)

@@ -233,10 +240,12 @@ async def test_log_distributor_register_deregister(
):
collected_logs: list[str] = []

async def callback(job_log: JobLog):
for msg in job_log.messages:
collected_logs.append(msg)
class MockQueue:
async def put(self, job_log: JobLog):
for msg in job_log.messages:
collected_logs.append(msg)

queue = MockQueue()
published_logs: list[str] = []

async def _log_publisher():
@@ -246,12 +255,12 @@ async def _log_publisher():
await produce_logs("expected", project_id, node_id, [msg], logging.DEBUG)
published_logs.append(msg)

await log_distributor.register(project_id, callback)
await log_distributor.register(project_id, queue) # type: ignore
publisher_task = asyncio.create_task(_log_publisher())
await asyncio.sleep(0.1)
await log_distributor.deregister(project_id)
await asyncio.sleep(0.1)
await log_distributor.register(project_id, callback)
await log_distributor.register(project_id, queue) # type: ignore
await asyncio.gather(publisher_task)
await asyncio.sleep(0.5)
await log_distributor.deregister(project_id)
Expand All @@ -274,12 +283,14 @@ async def test_log_distributor_multiple_streams(

collected_logs: dict[JobID, list[str]] = {id_: [] for id_ in job_ids}

async def callback(job_log: JobLog):
job_id = job_log.job_id
assert (msgs := collected_logs.get(job_id)) is not None
for msg in job_log.messages:
msgs.append(msg)
class MockQueue:
async def put(self, job_log: JobLog):
job_id = job_log.job_id
assert (msgs := collected_logs.get(job_id)) is not None
for msg in job_log.messages:
msgs.append(msg)

queue = MockQueue()
published_logs: dict[JobID, list[str]] = {id_: [] for id_ in job_ids}

async def _log_publisher():
@@ -291,7 +302,7 @@ async def _log_publisher():
published_logs[job_id].append(msg)

for job_id in job_ids:
await log_distributor.register(job_id, callback)
await log_distributor.register(job_id, queue) # type: ignore
publisher_task = asyncio.create_task(_log_publisher())
await asyncio.gather(publisher_task)
await asyncio.sleep(0.5)
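A note on the test changes: the mocks only need to be awaitable at `put`, so duck-typed classes (hence the `# type: ignore` on the `register(...)` calls) or an `AsyncMock` both work. A small sketch of the `AsyncMock` variant used in `rabbit_consuming_context`:

```python
import asyncio
from unittest.mock import AsyncMock


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    # record awaits instead of actually enqueuing
    queue.put = AsyncMock()  # type: ignore[method-assign]

    await queue.put({"messages": ["hello"]})
    queue.put.assert_awaited_once_with({"messages": ["hello"]})


asyncio.run(main())
```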