diff --git a/proxy/core/event/metrics.py b/proxy/core/event/metrics.py index ed2631a988..30a8b21a22 100644 --- a/proxy/core/event/metrics.py +++ b/proxy/core/event/metrics.py @@ -39,7 +39,7 @@ def incr_counter(self, name: str, by: float = 1.0) -> None: def _incr_counter(self, name: str, by: float = 1.0) -> None: current = self._get_counter(name) - path = os.path.join(DEFAULT_METRICS_DIRECTORY_PATH, f"{name}.counter") + path = os.path.join(DEFAULT_METRICS_DIRECTORY_PATH, f'{name}.counter') Path(path).write_text(str(current + by), encoding='utf-8') def get_gauge(self, name: str) -> float: @@ -58,7 +58,7 @@ def set_gauge(self, name: str, value: float) -> None: self._set_gauge(name, value) def _set_gauge(self, name: str, value: float) -> None: - path = os.path.join(DEFAULT_METRICS_DIRECTORY_PATH, f"{name}.gauge") + path = os.path.join(DEFAULT_METRICS_DIRECTORY_PATH, f'{name}.gauge') with open(path, 'w', encoding='utf-8') as g: g.write(str(value)) @@ -77,24 +77,19 @@ def __init__(self, event_queue: EventQueue, metrics_lock: Lock) -> None: callback=lambda event: MetricsEventSubscriber.callback(self.storage, event), ) - def _setup_metrics_directory(self) -> None: - os.makedirs(DEFAULT_METRICS_DIRECTORY_PATH, exist_ok=True) - patterns = ['*.counter', '*.gauge'] - for pattern in patterns: - files = glob.glob(os.path.join(DEFAULT_METRICS_DIRECTORY_PATH, pattern)) - for file_path in files: - try: - os.remove(file_path) - except OSError as e: - print(f'Error deleting file {file_path}: {e}') - - def __enter__(self) -> 'MetricsEventSubscriber': + def setup(self) -> None: self._setup_metrics_directory() self.subscriber.setup() + + def shutdown(self) -> None: + self.subscriber.shutdown() + + def __enter__(self) -> 'MetricsEventSubscriber': + self.setup() return self def __exit__(self, *args: Any) -> None: - self.subscriber.shutdown() + self.shutdown() @staticmethod def callback(storage: MetricsStorage, event: Dict[str, Any]) -> None: @@ -106,3 +101,14 @@ def callback(storage: MetricsStorage, event: Dict[str, Any]) -> None: storage.incr_counter('work_finished') else: print('Unhandled', event) + + def _setup_metrics_directory(self) -> None: + os.makedirs(DEFAULT_METRICS_DIRECTORY_PATH, exist_ok=True) + patterns = ['*.counter', '*.gauge'] + for pattern in patterns: + files = glob.glob(os.path.join(DEFAULT_METRICS_DIRECTORY_PATH, pattern)) + for file_path in files: + try: + os.remove(file_path) + except OSError as e: + print(f'Error deleting file {file_path}: {e}') diff --git a/proxy/proxy.py b/proxy/proxy.py index 188cbd248f..3dbbc9377d 100644 --- a/proxy/proxy.py +++ b/proxy/proxy.py @@ -205,6 +205,7 @@ def __init__(self, input_args: Optional[List[str]] = None, **opts: Any) -> None: self.acceptors: Optional[AcceptorPool] = None self.event_manager: Optional[EventManager] = None self.ssh_tunnel_listener: Optional[BaseSshTunnelListener] = None + self.metrics_subscriber: Optional[MetricsEventSubscriber] = None def __enter__(self) -> 'Proxy': self.setup() @@ -223,9 +224,6 @@ def setup(self) -> None: # TODO: Python shell within running proxy.py environment # https://github.com/abhinavsingh/proxy.py/discussions/1021 # - # TODO: Near realtime resource / stats monitoring - # https://github.com/abhinavsingh/proxy.py/discussions/1023 - # self._write_pid_file() # We setup listeners first because of flags.port override # in case of ephemeral port being used @@ -288,6 +286,12 @@ def setup(self) -> None: flags=self.flags, **self.opts, ) + if event_queue is not None and self.flags.enable_metrics: + self.metrics_subscriber = MetricsEventSubscriber( + event_queue, + self.flags.metrics_lock, + ) + self.metrics_subscriber.setup() # TODO: May be close listener fd as we don't need it now if threading.current_thread() == threading.main_thread(): self._register_signals() @@ -310,6 +314,8 @@ def _setup_tunnel( return tunnel def shutdown(self) -> None: + if self.metrics_subscriber is not None: + self.metrics_subscriber.shutdown() if self.flags.enable_ssh_tunnel: assert self.ssh_tunnel_listener is not None self.ssh_tunnel_listener.shutdown() @@ -389,12 +395,7 @@ def sleep_loop(p: Optional[Proxy] = None) -> None: def main(**opts: Any) -> None: with Proxy(sys.argv[1:], **opts) as p: - event_queue = p.event_manager.queue if p.event_manager is not None else None - if event_queue is not None and p.flags.enable_metrics: - with MetricsEventSubscriber(event_queue, p.flags.metrics_lock): - sleep_loop(p) - else: - sleep_loop(p) + sleep_loop(p) def entry_point() -> None: