Skip to content

Commit

Permalink
Startup MetricsEventSubscriber as part of proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
abhinavsingh committed Aug 11, 2024
1 parent faea962 commit 4e0bcfe
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 24 deletions.
36 changes: 21 additions & 15 deletions proxy/core/event/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def incr_counter(self, name: str, by: float = 1.0) -> None:

def _incr_counter(self, name: str, by: float = 1.0) -> None:
current = self._get_counter(name)
path = os.path.join(DEFAULT_METRICS_DIRECTORY_PATH, f"{name}.counter")
path = os.path.join(DEFAULT_METRICS_DIRECTORY_PATH, f'{name}.counter')
Path(path).write_text(str(current + by), encoding='utf-8')

Check warning on line 43 in proxy/core/event/metrics.py

View check run for this annotation

Codecov / codecov/patch

proxy/core/event/metrics.py#L41-L43

Added lines #L41 - L43 were not covered by tests

def get_gauge(self, name: str) -> float:
Expand All @@ -58,7 +58,7 @@ def set_gauge(self, name: str, value: float) -> None:
self._set_gauge(name, value)

Check warning on line 58 in proxy/core/event/metrics.py

View check run for this annotation

Codecov / codecov/patch

proxy/core/event/metrics.py#L58

Added line #L58 was not covered by tests

def _set_gauge(self, name: str, value: float) -> None:
path = os.path.join(DEFAULT_METRICS_DIRECTORY_PATH, f"{name}.gauge")
path = os.path.join(DEFAULT_METRICS_DIRECTORY_PATH, f'{name}.gauge')

Check warning on line 61 in proxy/core/event/metrics.py

View check run for this annotation

Codecov / codecov/patch

proxy/core/event/metrics.py#L61

Added line #L61 was not covered by tests
with open(path, 'w', encoding='utf-8') as g:
g.write(str(value))

Check warning on line 63 in proxy/core/event/metrics.py

View check run for this annotation

Codecov / codecov/patch

proxy/core/event/metrics.py#L63

Added line #L63 was not covered by tests

Expand All @@ -77,24 +77,19 @@ def __init__(self, event_queue: EventQueue, metrics_lock: Lock) -> None:
callback=lambda event: MetricsEventSubscriber.callback(self.storage, event),
)

def _setup_metrics_directory(self) -> None:
os.makedirs(DEFAULT_METRICS_DIRECTORY_PATH, exist_ok=True)
patterns = ['*.counter', '*.gauge']
for pattern in patterns:
files = glob.glob(os.path.join(DEFAULT_METRICS_DIRECTORY_PATH, pattern))
for file_path in files:
try:
os.remove(file_path)
except OSError as e:
print(f'Error deleting file {file_path}: {e}')

def __enter__(self) -> 'MetricsEventSubscriber':
def setup(self) -> None:
self._setup_metrics_directory()
self.subscriber.setup()

def shutdown(self) -> None:
self.subscriber.shutdown()

def __enter__(self) -> 'MetricsEventSubscriber':
self.setup()
return self

Check warning on line 89 in proxy/core/event/metrics.py

View check run for this annotation

Codecov / codecov/patch

proxy/core/event/metrics.py#L88-L89

Added lines #L88 - L89 were not covered by tests

def __exit__(self, *args: Any) -> None:
self.subscriber.shutdown()
self.shutdown()

Check warning on line 92 in proxy/core/event/metrics.py

View check run for this annotation

Codecov / codecov/patch

proxy/core/event/metrics.py#L92

Added line #L92 was not covered by tests

@staticmethod
def callback(storage: MetricsStorage, event: Dict[str, Any]) -> None:
Expand All @@ -106,3 +101,14 @@ def callback(storage: MetricsStorage, event: Dict[str, Any]) -> None:
storage.incr_counter('work_finished')

Check warning on line 101 in proxy/core/event/metrics.py

View check run for this annotation

Codecov / codecov/patch

proxy/core/event/metrics.py#L101

Added line #L101 was not covered by tests
else:
print('Unhandled', event)

Check warning on line 103 in proxy/core/event/metrics.py

View check run for this annotation

Codecov / codecov/patch

proxy/core/event/metrics.py#L103

Added line #L103 was not covered by tests

def _setup_metrics_directory(self) -> None:
os.makedirs(DEFAULT_METRICS_DIRECTORY_PATH, exist_ok=True)
patterns = ['*.counter', '*.gauge']
for pattern in patterns:
files = glob.glob(os.path.join(DEFAULT_METRICS_DIRECTORY_PATH, pattern))
for file_path in files:
try:
os.remove(file_path)
except OSError as e:
print(f'Error deleting file {file_path}: {e}')

Check warning on line 114 in proxy/core/event/metrics.py

View check run for this annotation

Codecov / codecov/patch

proxy/core/event/metrics.py#L111-L114

Added lines #L111 - L114 were not covered by tests
19 changes: 10 additions & 9 deletions proxy/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ def __init__(self, input_args: Optional[List[str]] = None, **opts: Any) -> None:
self.acceptors: Optional[AcceptorPool] = None
self.event_manager: Optional[EventManager] = None
self.ssh_tunnel_listener: Optional[BaseSshTunnelListener] = None
self.metrics_subscriber: Optional[MetricsEventSubscriber] = None

def __enter__(self) -> 'Proxy':
self.setup()
Expand All @@ -223,9 +224,6 @@ def setup(self) -> None:
# TODO: Python shell within running proxy.py environment
# https://github.com/abhinavsingh/proxy.py/discussions/1021
#
# TODO: Near realtime resource / stats monitoring
# https://github.com/abhinavsingh/proxy.py/discussions/1023
#
self._write_pid_file()
# We setup listeners first because of flags.port override
# in case of ephemeral port being used
Expand Down Expand Up @@ -288,6 +286,12 @@ def setup(self) -> None:
flags=self.flags,
**self.opts,
)
if event_queue is not None and self.flags.enable_metrics:
self.metrics_subscriber = MetricsEventSubscriber(
event_queue,
self.flags.metrics_lock,
)
self.metrics_subscriber.setup()
# TODO: May be close listener fd as we don't need it now
if threading.current_thread() == threading.main_thread():
self._register_signals()
Expand All @@ -310,6 +314,8 @@ def _setup_tunnel(
return tunnel

def shutdown(self) -> None:
if self.metrics_subscriber is not None:
self.metrics_subscriber.shutdown()
if self.flags.enable_ssh_tunnel:
assert self.ssh_tunnel_listener is not None
self.ssh_tunnel_listener.shutdown()
Expand Down Expand Up @@ -389,12 +395,7 @@ def sleep_loop(p: Optional[Proxy] = None) -> None:

def main(**opts: Any) -> None:
with Proxy(sys.argv[1:], **opts) as p:
event_queue = p.event_manager.queue if p.event_manager is not None else None
if event_queue is not None and p.flags.enable_metrics:
with MetricsEventSubscriber(event_queue, p.flags.metrics_lock):
sleep_loop(p)
else:
sleep_loop(p)
sleep_loop(p)


def entry_point() -> None:
Expand Down

0 comments on commit 4e0bcfe

Please sign in to comment.