Skip to content

Commit

Permalink
Adds metrics for client-side event emission and subscription
Browse files Browse the repository at this point in the history
Following the addition of the `prometheus_client` in #14783, this adds counters
for measuring the performance of Prefect event emissions and subscription.

A note about unit tests: I don't traditionally add unit tests for
"leaf-level" instrumentation like this.  Leaf-level here means it is measuring
something about the system and doesn't form part of a measurement API (like if
there were middleware for measuring HTTP latency, for example). Unless it is
particularly complex to calculate, I generally skip extra unit tests and let the
standard test suite and coverage inform me if the instrumentation might be a
problem or not executed.
  • Loading branch information
chrisguidry committed Jul 29, 2024
1 parent 9a4c901 commit 79fed45
Showing 1 changed file with 54 additions and 4 deletions.
58 changes: 54 additions & 4 deletions src/prefect/events/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import orjson
import pendulum
from cachetools import TTLCache
from prometheus_client import Counter
from typing_extensions import Self
from websockets import Subprotocol
from websockets.client import WebSocketClientProtocol, connect
Expand All @@ -36,6 +37,30 @@
if TYPE_CHECKING:
from prefect.events.filters import EventFilter

EVENTS_EMITTED = Counter(
"prefect_events_emitted",
"The number of events emitted by Prefect event clients",
labelnames=["client"],
)
EVENTS_OBSERVED = Counter(
"prefect_events_observed",
"The number of events observed by Prefect event subscribers",
labelnames=["client"],
)
EVENT_WEBSOCKET_CONNECTIONS = Counter(
"prefect_event_websocket_connections",
(
"The number of times Prefect event clients have connected to an event stream, "
"broken down by direction (in/out) and connection (initial/reconnect)"
),
labelnames=["client", "direction", "connection"],
)
EVENT_WEBSOCKET_CHECKPOINTS = Counter(
"prefect_event_websocket_checkpoints",
"The number of checkpoints performed by Prefect event clients",
labelnames=["client"],
)

logger = get_logger(__name__)


Expand Down Expand Up @@ -82,14 +107,22 @@ def get_events_subscriber(
class EventsClient(abc.ABC):
"""The abstract interface for all Prefect Events clients"""

@property
def client_name(self) -> str:
return self.__class__.__name__

async def emit(self, event: Event) -> None:
"""Emit a single event"""
if not hasattr(self, "_in_context"):
raise TypeError(
"Events may only be emitted while this client is being used as a "
"context manager"
)
return await self._emit(event)

try:
return await self._emit(event)
finally:
EVENTS_EMITTED.labels(self.client_name).inc()

@abc.abstractmethod
async def _emit(self, event: Event) -> None: # pragma: no cover
Expand Down Expand Up @@ -299,6 +332,8 @@ async def _checkpoint(self, event: Event) -> None:
# don't clear the list, just the ones that we are sure of.
self._unconfirmed_events = self._unconfirmed_events[unconfirmed_count:]

EVENT_WEBSOCKET_CHECKPOINTS.labels(self.client_name).inc()

async def _emit(self, event: Event) -> None:
for i in range(self._reconnection_attempts + 1):
try:
Expand Down Expand Up @@ -426,10 +461,17 @@ def __init__(
if self._reconnection_attempts < 0:
raise ValueError("reconnection_attempts must be a non-negative integer")

@property
def client_name(self) -> str:
return self.__class__.__name__

async def __aenter__(self) -> Self:
# Don't handle any errors in the initial connection, because these are most
# likely a permission or configuration issue that should propagate
await self._reconnect()
try:
await self._reconnect()
finally:
EVENT_WEBSOCKET_CONNECTIONS.labels(self.client_name, "out", "initial")
return self

async def _reconnect(self) -> None:
Expand Down Expand Up @@ -503,7 +545,12 @@ async def __anext__(self) -> Event:
# Otherwise, after the first time through this loop, we're recovering
# from a ConnectionClosed, so reconnect now.
if not self._websocket or i > 0:
await self._reconnect()
try:
await self._reconnect()
finally:
EVENT_WEBSOCKET_CONNECTIONS.labels(
self.client_name, "out", "reconnect"
)
assert self._websocket

while True:
Expand All @@ -514,7 +561,10 @@ async def __anext__(self) -> Event:
continue
self._seen_events[event.id] = True

return event
try:
return event
finally:
EVENTS_OBSERVED.labels(self.client_name).inc()
except ConnectionClosedOK:
logger.debug('Connection closed with "OK" status')
raise StopAsyncIteration
Expand Down

0 comments on commit 79fed45

Please sign in to comment.