From 11b46ad6df2159bb895092ffb41cec641f059154 Mon Sep 17 00:00:00 2001 From: ric-evans Date: Tue, 15 Oct 2024 18:44:27 -0500 Subject: [PATCH] light refactor reporter.py --- skymap_scanner/server/collector.py | 11 +-- skymap_scanner/server/reporter.py | 148 +++++++++++++++++++---------- 2 files changed, 103 insertions(+), 56 deletions(-) diff --git a/skymap_scanner/server/collector.py b/skymap_scanner/server/collector.py index c0960da22..93efff8ea 100644 --- a/skymap_scanner/server/collector.py +++ b/skymap_scanner/server/collector.py @@ -1,6 +1,5 @@ """The Skymap Scanner Server.""" - import itertools import logging import time @@ -9,6 +8,7 @@ import numpy +from .reporter import Reporter from .. import config as cfg from ..utils.pixel_classes import ( NSidesDict, @@ -17,7 +17,6 @@ RecoPixelVariation, SentPixelVariation, ) -from .reporter import Reporter LOGGER = logging.getLogger(__name__) @@ -218,7 +217,7 @@ async def register_sent_pixvars( async def collect( self, reco_pixel_variation: RecoPixelVariation, - reco_runtime: float, + on_worker_runtime: float, ) -> None: """Cache RecoPixelVariation until we can save the pixel's best received reco (RecoPixelFinal).""" @@ -276,9 +275,9 @@ async def collect( # report after potential save await self.reporter.record_reco( reco_pixel_variation.nside, - reco_runtime, - roundtrip_start=sent_pixvar.sent_time, - roundtrip_end=time.time(), + on_worker_runtime, + on_server_roundtrip_start=sent_pixvar.sent_time, + on_server_roundtrip_end=time.time(), ) def has_collected_all_sent(self) -> bool: diff --git a/skymap_scanner/server/reporter.py b/skymap_scanner/server/reporter.py index 6ab37496a..539a41572 100644 --- a/skymap_scanner/server/reporter.py +++ b/skymap_scanner/server/reporter.py @@ -9,6 +9,7 @@ import math import statistics import time +from collections import deque from pathlib import Path from typing import Any, Callable, Dict, List, Optional @@ -39,35 +40,39 @@ def __init__( self, worker_runtimes: Optional[List[float]] = None, roundtrips: Optional[List[float]] = None, - roundtrip_start: float = float("inf"), - roundtrip_end: float = float("-inf"), + on_server_roundtrip_start: float = float("inf"), + on_server_roundtrip_end: float = float("-inf"), ends: Optional[List[float]] = None, ) -> None: - self.roundtrip_start = roundtrip_start - self.roundtrip_end = roundtrip_end + self.on_server_roundtrip_start = on_server_roundtrip_start + self.on_server_roundtrip_end = on_server_roundtrip_end - self.worker_runtimes: List[float] = worker_runtimes if worker_runtimes else [] - self.worker_runtimes.sort() # speed up stats + self.on_worker_runtimes: List[float] = ( + worker_runtimes if worker_runtimes else [] + ) + self.on_worker_runtimes.sort() # speed up stats # - self.roundtrips: List[float] = roundtrips if roundtrips else [] - self.roundtrips.sort() # speed up stats + self.on_server_roundtrips: List[float] = roundtrips if roundtrips else [] + self.on_server_roundtrips.sort() # speed up stats # self.ends: List[float] = ends if ends else [] self.ends.sort() # speed up stats - self.fastest_worker = lambda: min(self.worker_runtimes) - self.fastest_roundtrip = lambda: min(self.roundtrips) + self.fastest_worker = lambda: min(self.on_worker_runtimes) + self.fastest_roundtrip = lambda: min(self.on_server_roundtrips) - self.slowest_worker = lambda: max(self.worker_runtimes) - self.slowest_roundtrip = lambda: max(self.roundtrips) + self.slowest_worker = lambda: max(self.on_worker_runtimes) + self.slowest_roundtrip = lambda: max(self.on_server_roundtrips) # Fast, 
floating point arithmetic mean. - self.mean_worker = lambda: statistics.fmean(self.worker_runtimes) - self.mean_roundtrip = lambda: statistics.fmean(self.roundtrips) + self.mean_worker = lambda: statistics.fmean(self.on_worker_runtimes) + self.mean_roundtrip = lambda: statistics.fmean(self.on_server_roundtrips) # Median (middle value) of data. - self.median_worker = lambda: float(statistics.median(self.worker_runtimes)) - self.median_roundtrip = lambda: float(statistics.median(self.roundtrips)) + self.median_worker = lambda: float(statistics.median(self.on_worker_runtimes)) + self.median_roundtrip = lambda: float( + statistics.median(self.on_server_roundtrips) + ) # other statistics functions... # geometric_mean Geometric mean of data. @@ -83,16 +88,29 @@ def __init__( def update( self, - worker_runtime: float, - roundtrip_start: float, - roundtrip_end: float, + on_worker_runtime: float, + on_server_roundtrip_start: float, + on_server_roundtrip_end: float, ) -> "WorkerStats": """Insert the runtime and recalculate round-trip start/end times.""" - bisect.insort(self.worker_runtimes, worker_runtime) - bisect.insort(self.roundtrips, roundtrip_end - roundtrip_start) - bisect.insort(self.ends, roundtrip_end) - self.roundtrip_start = min(self.roundtrip_start, roundtrip_start) - self.roundtrip_end = max(self.roundtrip_end, roundtrip_end) + bisect.insort( + self.on_worker_runtimes, + on_worker_runtime, + ) + bisect.insort( + self.on_server_roundtrips, + on_server_roundtrip_end - on_server_roundtrip_start, + ) + bisect.insort( + self.ends, + on_server_roundtrip_end, + ) + self.on_server_roundtrip_start = min( + self.on_server_roundtrip_start, on_server_roundtrip_start + ) + self.on_server_roundtrip_end = max( + self.on_server_roundtrip_end, on_server_roundtrip_end + ) return self def get_summary(self) -> Dict[str, Dict[str, str]]: @@ -128,16 +146,28 @@ def get_summary(self) -> Dict[str, Dict[str, str]]: ), }, "wall time": { - "start": str(dt.datetime.fromtimestamp(int(self.roundtrip_start))), - "end": str(dt.datetime.fromtimestamp(int(self.roundtrip_end))), + "start": str( + dt.datetime.fromtimestamp(int(self.on_server_roundtrip_start)) + ), + "end": str( + dt.datetime.fromtimestamp(int(self.on_server_roundtrip_end)) + ), "runtime": str( - dt.timedelta(seconds=int(self.roundtrip_end - self.roundtrip_start)) + dt.timedelta( + seconds=int( + self.on_server_roundtrip_end + - self.on_server_roundtrip_start + ) + ) ), "mean reco": str( dt.timedelta( seconds=int( - (self.roundtrip_end - self.roundtrip_start) - / len(self.worker_runtimes) + ( + self.on_server_roundtrip_end + - self.on_server_roundtrip_start + ) + / len(self.on_worker_runtimes) ) ) ), @@ -151,30 +181,35 @@ class WorkerStatsCollection: def __init__(self) -> None: self._worker_stats_by_nside: Dict[int, WorkerStats] = {} self._aggregate: Optional[WorkerStats] = None + self.recent_runtimes = deque(maxlen=100) def ct_by_nside(self, nside: int) -> int: """Get length per given nside.""" try: - return len(self._worker_stats_by_nside[nside].worker_runtimes) + return len(self._worker_stats_by_nside[nside].on_worker_runtimes) except KeyError: return 0 @property def total_ct(self) -> int: """O(n) b/c len is O(1), n < 10.""" - return sum(len(w.worker_runtimes) for w in self._worker_stats_by_nside.values()) + return sum( + len(w.on_worker_runtimes) for w in self._worker_stats_by_nside.values() + ) @property def first_roundtrip_start(self) -> float: """O(n), n < 10.""" - return min(w.roundtrip_start for w in self._worker_stats_by_nside.values()) + return 
min( + w.on_server_roundtrip_start for w in self._worker_stats_by_nside.values() + ) def update( self, nside: int, - runtime: float, - roundtrip_start: float, - roundtrip_end: float, + on_worker_runtime: float, + on_server_roundtrip_start: float, + on_server_roundtrip_end: float, ) -> int: """Return reco-count of nside's list after updating.""" self._aggregate = None # clear @@ -182,23 +217,34 @@ def update( worker_stats = self._worker_stats_by_nside[nside] except KeyError: worker_stats = self._worker_stats_by_nside[nside] = WorkerStats() - worker_stats.update(runtime, roundtrip_start, roundtrip_end) - return len(worker_stats.worker_runtimes) + worker_stats.update( + on_worker_runtime, + on_server_roundtrip_start, + on_server_roundtrip_end, + ) + self.recent_runtimes.append(on_worker_runtime) + return len(worker_stats.on_worker_runtimes) @property def aggregate(self) -> WorkerStats: - """Cached `WorkerStats` aggregate.""" + """An aggregate (`WorkerStats` obj) of all recos (all nsides).""" if not self._aggregate: instances = self._worker_stats_by_nside.values() if not instances: return WorkerStats() self._aggregate = WorkerStats( worker_runtimes=list( - itertools.chain(*[i.worker_runtimes for i in instances]) + itertools.chain(*[i.on_worker_runtimes for i in instances]) + ), + roundtrips=list( + itertools.chain(*[i.on_server_roundtrips for i in instances]) + ), + on_server_roundtrip_start=min( + i.on_server_roundtrip_start for i in instances + ), + on_server_roundtrip_end=max( + i.on_server_roundtrip_end for i in instances ), - roundtrips=list(itertools.chain(*[i.roundtrips for i in instances])), - roundtrip_start=min(i.roundtrip_start for i in instances), - roundtrip_end=max(i.roundtrip_end for i in instances), ends=list(itertools.chain(*[i.ends for i in instances])), ) return self._aggregate @@ -309,26 +355,28 @@ async def precomputing_report(self) -> None: async def record_reco( self, nside: int, - runtime: float, - roundtrip_start: float, - roundtrip_end: float, + on_worker_runtime: float, + on_server_roundtrip_start: float, + on_server_roundtrip_end: float, ) -> None: """Send reports/logs/plots if needed.""" self._check_call_order(self.record_reco) if not self.time_of_first_reco_start_on_client: - # timeline: roundtrip_start -> pre-reco queue time -> (runtime) -> post-reco queue time -> roundtrip_end + # timeline: on_server_roundtrip_start -> pre-reco queue time -> (runtime) -> post-reco queue time -> on_server_roundtrip_end # since worker nodes need to startup & a pixel may fail several times before being reco'd, # we know "pre-reco queue time" >>> "post-reco queue time" # if we assume "post-reco queue time" ~= 0.0, then the reco started here: - self.time_of_first_reco_start_on_client = roundtrip_end - (runtime + 0.0) + self.time_of_first_reco_start_on_client = on_server_roundtrip_end - ( + on_worker_runtime + 0.0 + ) # update stats nside_ct = self.worker_stats_collection.update( nside, - runtime, - roundtrip_start, - roundtrip_end, + on_worker_runtime, + on_server_roundtrip_start, + on_server_roundtrip_end, ) # make report(s)
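
A few illustrative notes follow; none of the code below is part of the patch itself.

The refactor keeps WorkerStats' sorted-insert pattern: each new runtime is placed into
an already-sorted list with bisect.insort, so fastest/slowest/median reads stay cheap.
A minimal standalone sketch of that idea, with made-up runtimes (not the scanner's own):

    import bisect
    import statistics

    # Keep runtimes sorted on insert so order statistics are cheap to read back later.
    on_worker_runtimes: list[float] = []

    for runtime in (3.1, 0.7, 5.4, 2.2):            # made-up per-reco runtimes (seconds)
        bisect.insort(on_worker_runtimes, runtime)  # O(n) insert at the sorted position

    print(on_worker_runtimes[0], on_worker_runtimes[-1])  # fastest / slowest
    print(statistics.median(on_worker_runtimes))          # median of the sorted list
    print(statistics.fmean(on_worker_runtimes))           # fast floating-point mean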
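The new recent_runtimes deque in WorkerStatsCollection is a rolling window of the most
recent per-reco runtimes (capped at 100 in the patch): a bounded deque evicts its oldest
entry once full, so a "recent" average can be computed without the collection growing
for the lifetime of the scan. A small sketch of that behavior, using a tiny cap and
made-up values purely for illustration:

    import statistics
    from collections import deque

    recent_runtimes: deque[float] = deque(maxlen=3)  # tiny cap, illustration only

    for runtime in (1.0, 2.0, 3.0, 4.0):
        recent_runtimes.append(runtime)              # once full, the oldest value is dropped

    print(list(recent_runtimes))                     # [2.0, 3.0, 4.0]
    print(statistics.fmean(recent_runtimes))         # mean over only the recent window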
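Finally, a hedged sketch of a call into the renamed Reporter.record_reco() signature,
mirroring the collector.py call site above; the reporter object, nside, and timing
values here are assumptions for illustration, not taken from the scanner:

    import time

    async def report_one_reco(reporter, nside: int = 8) -> None:
        # Timestamp taken on the server when the pixel variation is handed to a worker.
        on_server_roundtrip_start = time.time()
        # ... the worker runs the reco remotely and reports its own runtime back ...
        on_worker_runtime = 4.2  # seconds spent inside the reco on the worker (made up)
        await reporter.record_reco(
            nside,
            on_worker_runtime,
            on_server_roundtrip_start=on_server_roundtrip_start,
            on_server_roundtrip_end=time.time(),  # result received back on the server
        )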