diff --git a/src/vse_sync_pp/analyzers/analyzer.py b/src/vse_sync_pp/analyzers/analyzer.py index 84fb1fc..452521c 100644 --- a/src/vse_sync_pp/analyzers/analyzer.py +++ b/src/vse_sync_pp/analyzers/analyzer.py @@ -13,6 +13,34 @@ from ..requirements import REQUIREMENTS +def calculate_limit(accuracy, limit_percentage, tau): + """Calculate upper limit based on tau + + `accuracy` is the list of functions to calculate upper limits + `limit_percentage` is the unaccuracy percentage + `tau` is the observation window interval + + Return the upper limit value based on `tau` + """ + for (low, high), f in accuracy.items(): + if ((low is None or tau > low) and (tau <= high)): + return f(tau) * (limit_percentage / 100) + +def out_of_range(taus, samples, accuracy, limit): + """Check if the input samples are out of range. + + `taus` list of observation windows intervals + `samples` are input samples + `accuracy` contains the list of upper bound limit functions + `limit` is the percentage to apply the upper limit + + Return `True` if any value in `samples` is out of range + """ + for tau, sample in zip(taus, samples): + mask = calculate_limit(accuracy, limit, tau) + if mask <= sample: + return True + return False class Config(): """Analyzer configuration""" @@ -286,34 +314,16 @@ def __init__(self, config): self._transient = config.parameter('transient-period/s') # minimum test duration for a valid test self._duration = config.parameter('min-test-duration/s') - # limit initial tau observation windows from 1 to 9k taus - taus_below_tenkey = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, 60, 70, 80, 90, + # limit initial tau observation windows from 1 to 10k taus + taus_below_10k = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 2000, 3000, - 4000, 5000, 6000, 7000, 8000, 10000]) + 4000, 5000, 6000, 7000, 8000, 9000, 10000]) # observation window upper bound limited to 100k samples in 5k increments - taus_above_tenkey = np.arange(10001, 100000, 5000) + taus_above_10k = np.arange(10001, 100000, 5000) # `taus_list` contains range limit of obervation window intervals for which to compute the statistic - self._taus_list = np.concatenate((taus_below_tenkey, taus_above_tenkey)) + self._taus_list = np.concatenate((taus_below_10k, taus_above_10k)) self._rate = None self._lpf_signal = None - # atributtes to be initialized in derived class - self._accuracy = None - - def calculate_limit(self, tau): - # return limit based on `tau` - if self._accuracy is None: - raise AttributeError('atributte not initialized') - for (low, high), f in self._accuracy.items(): - if ((low is None or tau > low) and (tau <= high)): - return f(tau) * (self._limit / 100) - - def out_of_range(self, taus, samples): - # return true if any value in `tdevs` is out of range - for tau, sample in zip(taus, samples): - mask = self.calculate_limit(tau) - if mask <= sample: - return True - return False def prepare(self, rows): idx = 0 @@ -353,7 +363,7 @@ def calculate_rate(self, data): cumdelta = cumdelta + data.iloc[i].timestamp - data.iloc[i - 1].timestamp return round((1 / (cumdelta / 100))) - def test_common(self, data): + def _test_common(self, data): if len(data) == 0: return (False, "no data") if frozenset(data.state.unique()).difference(self.locked): @@ -368,7 +378,7 @@ def test_common(self, data): self._lpf_signal = self.calculate_filter(data) return None - def explain_common(self, data): + def _explain_common(self, data): if len(data) == 0: return {} if self._rate is None: @@ -398,17 +408,17 @@ def __init__(self, config): self._ns = None def test(self, data): - result = super().test_common(data) + result = super()._test_common(data) if result is None: if self._samples is None: self._taus, self._samples, self._errors, self._ns = allantools.tdev(self._lpf_signal, rate=self._rate, data_type="phase", taus=self._taus_list) # noqa - if self.out_of_range(self._taus, self._samples): + if out_of_range(self._taus, self._samples, self._accuracy, self._limit): return (False, "unacceptable time deviation") return (True, None) return result def explain(self, data): - analysis = super().explain_common(data) + analysis = super()._explain_common(data) if analysis is None: if self._samples is None: self._taus, self._samples, self._errors, self._ns = allantools.tdev(self._lpf_signal, rate=self._rate, data_type="phase", taus=self._taus_list) # noqa @@ -433,24 +443,24 @@ def __init__(self, config): # limit of inaccuracy at observation point self._limit = config.parameter('maximum-time-interval-error-limit/%') # `taus` contains the list of observation windows intervals for which the metric will be actually computed - # `taus` is a subset of `taus_list` + # not that `taus` will be a subset of `taus_list` self._taus = None self._samples = None self._errors = None self._ns = None def test(self, data): - result = super().test_common(data) + result = super()._test_common(data) if result is None: if self._samples is None: self._taus, self._samples, self._errors, self._ns = allantools.mtie(self._lpf_signal, rate=self._rate, data_type="phase", taus=self._taus_list) # noqa - if self.out_of_range(self._taus, self._samples): + if out_of_range(self._taus, self._samples, self._accuracy, self._limit): return (False, "unacceptable mtie") return (True, None) return result def explain(self, data): - analysis = super().explain_common(data) + analysis = super()._explain_common(data) if analysis is None: if self._samples is None: self._taus, self._samples, self._errors, self._ns = allantools.mtie(self._lpf_signal, rate=self._rate, data_type="phase", taus=self._taus_list) # noqa