From 975b6b6888125b21a251c99c80534b8b1d664298 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Sun, 11 Aug 2024 12:58:50 +0530 Subject: [PATCH] `emit_request_complete` for web servers --- proxy/http/server/metrics.py | 2 +- proxy/http/server/web.py | 40 ++++++++++++++++++++++++++++++++++-- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/proxy/http/server/metrics.py b/proxy/http/server/metrics.py index 77a9895ff5..55a306cac3 100644 --- a/proxy/http/server/metrics.py +++ b/proxy/http/server/metrics.py @@ -124,7 +124,7 @@ def collect(self) -> Generator[Metric, None, None]: request_complete = CounterMetricFamily( 'proxypy_request_complete', - 'Total requests from whom request object was successfully received', + 'Total requests that sent a request successfully', ) request_complete.add_metric( ['proxypy_request_complete'], diff --git a/proxy/http/server/web.py b/proxy/http/server/web.py index f3899e890c..cec4abbd64 100644 --- a/proxy/http/server/web.py +++ b/proxy/http/server/web.py @@ -17,11 +17,13 @@ from .plugin import HttpWebServerBasePlugin from ..parser import HttpParser, httpParserTypes from ..plugin import HttpProtocolHandlerPlugin +from ..methods import httpMethods from .protocols import httpProtocolTypes from ..exception import HttpProtocolException from ..protocols import httpProtocols from ..responses import NOT_FOUND_RESPONSE_PKT from ..websocket import WebsocketFrame, websocketOpcodes +from ...core.event import eventNames from ...common.flag import flags from ...common.types import Readables, Writables, Descriptors from ...common.utils import text_, build_websocket_handshake_response @@ -139,6 +141,7 @@ def switch_to_websocket(self) -> None: self.switched_protocol = httpProtocolTypes.WEBSOCKET def on_request_complete(self) -> Union[socket.socket, bool]: + self.emit_request_complete() path = self.request.path or b'/' teardown = self._try_route(path) if teardown: @@ -220,8 +223,8 @@ def on_response_chunk(self, chunk: List[memoryview]) -> List[memoryview]: self._response_size += sum(len(c) for c in chunk) return chunk - def on_client_connection_close(self) -> None: - context = { + def _context(self) -> Dict[str, Any]: + return { 'client_ip': None if not self.client.addr else self.client.addr[0], 'client_port': None if not self.client.addr else self.client.addr[1], 'connection_time_ms': '%.2f' % ((time.time() - self.start_time) * 1000), @@ -249,6 +252,9 @@ def on_client_connection_close(self) -> None: # 'response_code': text_(self.response.code), # 'response_reason': text_(self.response.reason), } + + def on_client_connection_close(self) -> None: + context = self._context() log_handled = False if self.route: # May be merge on_client_connection_close and on_access_log??? @@ -304,3 +310,33 @@ def _try_static_or_404(self, path: bytes) -> None: self.flags.min_compression_length, ), ) + + def emit_request_complete(self) -> None: + if not self.flags.enable_events: + return + assert self.request.port and self.event_queue + self.event_queue.publish( + request_id=self.uid, + event_name=eventNames.REQUEST_COMPLETE, + event_payload={ + 'url': 'http://%s%s' + % ( + text_(self.request.header(b'host')), + text_(self.request.path), + ), + 'method': text_(self.request.method), + 'headers': ( + {} + if not self.request.headers + else { + text_(k): text_(v[1]) for k, v in self.request.headers.items() + } + ), + 'body': ( + text_(self.request.body, errors='ignore') + if self.request.method == httpMethods.POST + else None + ), + }, + publisher_id=self.__class__.__qualname__, + )