diff --git a/src/prefect/events/clients.py b/src/prefect/events/clients.py index 70a9bf38861e..a67dac3ef0d7 100644 --- a/src/prefect/events/clients.py +++ b/src/prefect/events/clients.py @@ -19,6 +19,7 @@ import orjson import pendulum from cachetools import TTLCache +from prometheus_client import Counter from typing_extensions import Self from websockets import Subprotocol from websockets.client import WebSocketClientProtocol, connect @@ -36,6 +37,30 @@ if TYPE_CHECKING: from prefect.events.filters import EventFilter +EVENTS_EMITTED = Counter( + "prefect_events_emitted", + "The number of events emitted by Prefect event clients", + labelnames=["client"], +) +EVENTS_OBSERVED = Counter( + "prefect_events_observed", + "The number of events observed by Prefect event subscribers", + labelnames=["client"], +) +EVENT_WEBSOCKET_CONNECTIONS = Counter( + "prefect_event_websocket_connections", + ( + "The number of times Prefect event clients have connected to an event stream, " + "broken down by direction (in/out) and connection (initial/reconnect)" + ), + labelnames=["client", "direction", "connection"], +) +EVENT_WEBSOCKET_CHECKPOINTS = Counter( + "prefect_event_websocket_checkpoints", + "The number of checkpoints performed by Prefect event clients", + labelnames=["client"], +) + logger = get_logger(__name__) @@ -82,6 +107,10 @@ def get_events_subscriber( class EventsClient(abc.ABC): """The abstract interface for all Prefect Events clients""" + @property + def client_name(self) -> str: + return self.__class__.__name__ + async def emit(self, event: Event) -> None: """Emit a single event""" if not hasattr(self, "_in_context"): @@ -89,7 +118,11 @@ async def emit(self, event: Event) -> None: "Events may only be emitted while this client is being used as a " "context manager" ) - return await self._emit(event) + + try: + return await self._emit(event) + finally: + EVENTS_EMITTED.labels(self.client_name).inc() @abc.abstractmethod async def _emit(self, event: Event) -> None: # pragma: no cover @@ -299,6 +332,8 @@ async def _checkpoint(self, event: Event) -> None: # don't clear the list, just the ones that we are sure of. self._unconfirmed_events = self._unconfirmed_events[unconfirmed_count:] + EVENT_WEBSOCKET_CHECKPOINTS.labels(self.client_name).inc() + async def _emit(self, event: Event) -> None: for i in range(self._reconnection_attempts + 1): try: @@ -426,10 +461,17 @@ def __init__( if self._reconnection_attempts < 0: raise ValueError("reconnection_attempts must be a non-negative integer") + @property + def client_name(self) -> str: + return self.__class__.__name__ + async def __aenter__(self) -> Self: # Don't handle any errors in the initial connection, because these are most # likely a permission or configuration issue that should propagate - await self._reconnect() + try: + await self._reconnect() + finally: + EVENT_WEBSOCKET_CONNECTIONS.labels(self.client_name, "out", "initial") return self async def _reconnect(self) -> None: @@ -503,7 +545,12 @@ async def __anext__(self) -> Event: # Otherwise, after the first time through this loop, we're recovering # from a ConnectionClosed, so reconnect now. if not self._websocket or i > 0: - await self._reconnect() + try: + await self._reconnect() + finally: + EVENT_WEBSOCKET_CONNECTIONS.labels( + self.client_name, "out", "reconnect" + ) assert self._websocket while True: @@ -514,7 +561,10 @@ async def __anext__(self) -> Event: continue self._seen_events[event.id] = True - return event + try: + return event + finally: + EVENTS_OBSERVED.labels(self.client_name).inc() except ConnectionClosedOK: logger.debug('Connection closed with "OK" status') raise StopAsyncIteration