Skip to content

Commit

Permalink
light refactor reporter.py
Browse files: browse the repository at this point in the history
  • Loading branch information
ric-evans committed Oct 15, 2024
1 parent 451c832 commit 11b46ad
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 56 deletions.
11 changes: 5 additions & 6 deletions skymap_scanner/server/collector.py
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,5 @@
"""The Skymap Scanner Server."""


import itertools
import logging
import time
Expand All @@ -9,6 +8,7 @@

import numpy

from .reporter import Reporter
from .. import config as cfg
from ..utils.pixel_classes import (
NSidesDict,
Expand All @@ -17,7 +17,6 @@
RecoPixelVariation,
SentPixelVariation,
)
from .reporter import Reporter

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -218,7 +217,7 @@ async def register_sent_pixvars(
async def collect(
self,
reco_pixel_variation: RecoPixelVariation,
reco_runtime: float,
on_worker_runtime: float,
) -> None:
"""Cache RecoPixelVariation until we can save the pixel's best received
reco (RecoPixelFinal)."""
Expand Down Expand Up @@ -276,9 +275,9 @@ async def collect(
# report after potential save
await self.reporter.record_reco(
reco_pixel_variation.nside,
reco_runtime,
roundtrip_start=sent_pixvar.sent_time,
roundtrip_end=time.time(),
on_worker_runtime,
on_server_roundtrip_start=sent_pixvar.sent_time,
on_server_roundtrip_end=time.time(),
)

def has_collected_all_sent(self) -> bool:
Expand Down
148 changes: 98 additions & 50 deletions skymap_scanner/server/reporter.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -9,6 +9,7 @@
import math
import statistics
import time
from collections import deque
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional

Expand Down Expand Up @@ -39,35 +40,39 @@ def __init__(
self,
worker_runtimes: Optional[List[float]] = None,
roundtrips: Optional[List[float]] = None,
roundtrip_start: float = float("inf"),
roundtrip_end: float = float("-inf"),
on_server_roundtrip_start: float = float("inf"),
on_server_roundtrip_end: float = float("-inf"),
ends: Optional[List[float]] = None,
) -> None:
self.roundtrip_start = roundtrip_start
self.roundtrip_end = roundtrip_end
self.on_server_roundtrip_start = on_server_roundtrip_start
self.on_server_roundtrip_end = on_server_roundtrip_end

self.worker_runtimes: List[float] = worker_runtimes if worker_runtimes else []
self.worker_runtimes.sort() # speed up stats
self.on_worker_runtimes: List[float] = (
worker_runtimes if worker_runtimes else []
)
self.on_worker_runtimes.sort() # speed up stats
#
self.roundtrips: List[float] = roundtrips if roundtrips else []
self.roundtrips.sort() # speed up stats
self.on_server_roundtrips: List[float] = roundtrips if roundtrips else []
self.on_server_roundtrips.sort() # speed up stats
#
self.ends: List[float] = ends if ends else []
self.ends.sort() # speed up stats

self.fastest_worker = lambda: min(self.worker_runtimes)
self.fastest_roundtrip = lambda: min(self.roundtrips)
self.fastest_worker = lambda: min(self.on_worker_runtimes)
self.fastest_roundtrip = lambda: min(self.on_server_roundtrips)

self.slowest_worker = lambda: max(self.worker_runtimes)
self.slowest_roundtrip = lambda: max(self.roundtrips)
self.slowest_worker = lambda: max(self.on_worker_runtimes)
self.slowest_roundtrip = lambda: max(self.on_server_roundtrips)

# Fast, floating point arithmetic mean.
self.mean_worker = lambda: statistics.fmean(self.worker_runtimes)
self.mean_roundtrip = lambda: statistics.fmean(self.roundtrips)
self.mean_worker = lambda: statistics.fmean(self.on_worker_runtimes)
self.mean_roundtrip = lambda: statistics.fmean(self.on_server_roundtrips)

# Median (middle value) of data.
self.median_worker = lambda: float(statistics.median(self.worker_runtimes))
self.median_roundtrip = lambda: float(statistics.median(self.roundtrips))
self.median_worker = lambda: float(statistics.median(self.on_worker_runtimes))
self.median_roundtrip = lambda: float(
statistics.median(self.on_server_roundtrips)
)

# other statistics functions...
# geometric_mean Geometric mean of data.
Expand All @@ -83,16 +88,29 @@ def __init__(

def update(
self,
worker_runtime: float,
roundtrip_start: float,
roundtrip_end: float,
on_worker_runtime: float,
on_server_roundtrip_start: float,
on_server_roundtrip_end: float,
) -> "WorkerStats":
"""Insert the runtime and recalculate round-trip start/end times."""
bisect.insort(self.worker_runtimes, worker_runtime)
bisect.insort(self.roundtrips, roundtrip_end - roundtrip_start)
bisect.insort(self.ends, roundtrip_end)
self.roundtrip_start = min(self.roundtrip_start, roundtrip_start)
self.roundtrip_end = max(self.roundtrip_end, roundtrip_end)
bisect.insort(
self.on_worker_runtimes,
on_worker_runtime,
)
bisect.insort(
self.on_server_roundtrips,
on_server_roundtrip_end - on_server_roundtrip_start,
)
bisect.insort(
self.ends,
on_server_roundtrip_end,
)
self.on_server_roundtrip_start = min(
self.on_server_roundtrip_start, on_server_roundtrip_start
)
self.on_server_roundtrip_end = max(
self.on_server_roundtrip_end, on_server_roundtrip_end
)
return self

def get_summary(self) -> Dict[str, Dict[str, str]]:
Expand Down Expand Up @@ -128,16 +146,28 @@ def get_summary(self) -> Dict[str, Dict[str, str]]:
),
},
"wall time": {
"start": str(dt.datetime.fromtimestamp(int(self.roundtrip_start))),
"end": str(dt.datetime.fromtimestamp(int(self.roundtrip_end))),
"start": str(
dt.datetime.fromtimestamp(int(self.on_server_roundtrip_start))
),
"end": str(
dt.datetime.fromtimestamp(int(self.on_server_roundtrip_end))
),
"runtime": str(
dt.timedelta(seconds=int(self.roundtrip_end - self.roundtrip_start))
dt.timedelta(
seconds=int(
self.on_server_roundtrip_end
- self.on_server_roundtrip_start
)
)
),
"mean reco": str(
dt.timedelta(
seconds=int(
(self.roundtrip_end - self.roundtrip_start)
/ len(self.worker_runtimes)
(
self.on_server_roundtrip_end
- self.on_server_roundtrip_start
)
/ len(self.on_worker_runtimes)
)
)
),
Expand All @@ -151,54 +181,70 @@ class WorkerStatsCollection:
def __init__(self) -> None:
self._worker_stats_by_nside: Dict[int, WorkerStats] = {}
self._aggregate: Optional[WorkerStats] = None
self.recent_runtimes = deque(maxlen=100)

def ct_by_nside(self, nside: int) -> int:
"""Get length per given nside."""
try:
return len(self._worker_stats_by_nside[nside].worker_runtimes)
return len(self._worker_stats_by_nside[nside].on_worker_runtimes)
except KeyError:
return 0

@property
def total_ct(self) -> int:
"""O(n) b/c len is O(1), n < 10."""
return sum(len(w.worker_runtimes) for w in self._worker_stats_by_nside.values())
return sum(
len(w.on_worker_runtimes) for w in self._worker_stats_by_nside.values()
)

@property
def first_roundtrip_start(self) -> float:
"""O(n), n < 10."""
return min(w.roundtrip_start for w in self._worker_stats_by_nside.values())
return min(
w.on_server_roundtrip_start for w in self._worker_stats_by_nside.values()
)

def update(
self,
nside: int,
runtime: float,
roundtrip_start: float,
roundtrip_end: float,
on_worker_runtime: float,
on_server_roundtrip_start: float,
on_server_roundtrip_end: float,
) -> int:
"""Return reco-count of nside's list after updating."""
self._aggregate = None # clear
try:
worker_stats = self._worker_stats_by_nside[nside]
except KeyError:
worker_stats = self._worker_stats_by_nside[nside] = WorkerStats()
worker_stats.update(runtime, roundtrip_start, roundtrip_end)
return len(worker_stats.worker_runtimes)
worker_stats.update(
on_worker_runtime,
on_server_roundtrip_start,
on_server_roundtrip_end,
)
self.recent_runtimes.append(on_worker_runtime)
return len(worker_stats.on_worker_runtimes)

@property
def aggregate(self) -> WorkerStats:
"""Cached `WorkerStats` aggregate."""
"""An aggregate (`WorkerStats` obj) of all recos (all nsides)."""
if not self._aggregate:
instances = self._worker_stats_by_nside.values()
if not instances:
return WorkerStats()
self._aggregate = WorkerStats(
worker_runtimes=list(
itertools.chain(*[i.worker_runtimes for i in instances])
itertools.chain(*[i.on_worker_runtimes for i in instances])
),
roundtrips=list(
itertools.chain(*[i.on_server_roundtrips for i in instances])
),
on_server_roundtrip_start=min(
i.on_server_roundtrip_start for i in instances
),
on_server_roundtrip_end=max(
i.on_server_roundtrip_end for i in instances
),
roundtrips=list(itertools.chain(*[i.roundtrips for i in instances])),
roundtrip_start=min(i.roundtrip_start for i in instances),
roundtrip_end=max(i.roundtrip_end for i in instances),
ends=list(itertools.chain(*[i.ends for i in instances])),
)
return self._aggregate
Expand Down Expand Up @@ -309,26 +355,28 @@ async def precomputing_report(self) -> None:
async def record_reco(
self,
nside: int,
runtime: float,
roundtrip_start: float,
roundtrip_end: float,
on_worker_runtime: float,
on_server_roundtrip_start: float,
on_server_roundtrip_end: float,
) -> None:
"""Send reports/logs/plots if needed."""
self._check_call_order(self.record_reco)

if not self.time_of_first_reco_start_on_client:
# timeline: roundtrip_start -> pre-reco queue time -> (runtime) -> post-reco queue time -> roundtrip_end
# timeline: on_server_roundtrip_start -> pre-reco queue time -> (runtime) -> post-reco queue time -> on_server_roundtrip_end
# since worker nodes need to startup & a pixel may fail several times before being reco'd,
# we know "pre-reco queue time" >>> "post-reco queue time"
# if we assume "post-reco queue time" ~= 0.0, then the reco started here:
self.time_of_first_reco_start_on_client = roundtrip_end - (runtime + 0.0)
self.time_of_first_reco_start_on_client = on_server_roundtrip_end - (
on_worker_runtime + 0.0
)

# update stats
nside_ct = self.worker_stats_collection.update(
nside,
runtime,
roundtrip_start,
roundtrip_end,
on_worker_runtime,
on_server_roundtrip_start,
on_server_roundtrip_end,
)

# make report(s)
Expand Down

0 comments on commit 11b46ad

Please sign in to comment.