From cda78f3dfd70336bbe434f553df055b3bd52a380 Mon Sep 17 00:00:00 2001 From: David Venhoek Date: Fri, 13 Dec 2024 15:32:11 +0100 Subject: [PATCH] Algorithm support for periodic sources. --- ntp-proto/src/algorithm/kalman/combiner.rs | 2 + ntp-proto/src/algorithm/kalman/mod.rs | 49 +- ntp-proto/src/algorithm/kalman/select.rs | 106 +++- ntp-proto/src/algorithm/kalman/source.rs | 626 ++++++++++++++++++--- ntp-proto/src/algorithm/mod.rs | 1 + ntp-proto/src/system.rs | 2 +- ntpd/src/force_sync/algorithm.rs | 10 +- 7 files changed, 688 insertions(+), 108 deletions(-) diff --git a/ntp-proto/src/algorithm/kalman/combiner.rs b/ntp-proto/src/algorithm/kalman/combiner.rs index 5b41f7568..5c6905fc9 100644 --- a/ntp-proto/src/algorithm/kalman/combiner.rs +++ b/ntp-proto/src/algorithm/kalman/combiner.rs @@ -101,6 +101,7 @@ mod tests { }, wander: 0.0, delay: 0.0, + period: None, source_uncertainty: NtpDuration::from_seconds(source_uncertainty), source_delay: NtpDuration::from_seconds(0.01), leap_indicator: NtpLeapIndicator::NoWarning, @@ -227,6 +228,7 @@ mod tests { }, wander: 0.0, delay: 0.0, + period: None, source_uncertainty: NtpDuration::from_seconds(0.0), source_delay: NtpDuration::from_seconds(0.0), leap_indicator: leap, diff --git a/ntp-proto/src/algorithm/kalman/mod.rs b/ntp-proto/src/algorithm/kalman/mod.rs index 690fa1ef4..d831c5e31 100644 --- a/ntp-proto/src/algorithm/kalman/mod.rs +++ b/ntp-proto/src/algorithm/kalman/mod.rs @@ -34,6 +34,7 @@ struct SourceSnapshot { state: KalmanState, wander: f64, delay: f64, + period: Option, source_uncertainty: NtpDuration, source_delay: NtpDuration, @@ -113,7 +114,10 @@ impl KalmanClockController KalmanClockController KalmanClockController TimeSyncC KalmanSourceController::new( id, self.algo_config, + None, self.source_defaults_config, AveragingBuffer::default(), ) @@ -391,11 +398,13 @@ impl TimeSyncC &mut self, id: SourceId, measurement_noise_estimate: f64, + period: Option, ) -> Self::OneWaySourceController { self.sources.insert(id, (None, false)); KalmanSourceController::new( id, self.algo_config, + period, self.source_defaults_config, measurement_noise_estimate, ) @@ -638,6 +647,29 @@ mod tests { }, wander: 0.0, delay: 0.0, + period: None, + source_uncertainty: NtpDuration::ZERO, + source_delay: NtpDuration::ZERO, + leap_indicator: NtpLeapIndicator::NoWarning, + last_update: NtpTimestamp::from_fixed_int(0), + }), + true, + ), + ); + + algo.sources.insert( + 1, + ( + Some(SourceSnapshot { + index: 0, + state: KalmanState { + state: Vector::new_vector([0.0, 0.0]), + uncertainty: Matrix::new([[1e-18, 0.0], [0.0, 1e-18]]), + time: NtpTimestamp::from_fixed_int(0), + }, + wander: 0.0, + delay: 0.0, + period: Some(3.0), source_uncertainty: NtpDuration::ZERO, source_delay: NtpDuration::ZERO, leap_indicator: NtpLeapIndicator::NoWarning, @@ -652,6 +684,10 @@ mod tests { algo.sources.get(&0).unwrap().0.unwrap().state.offset(), -100.0 ); + assert_eq!( + algo.sources.get(&1).unwrap().0.unwrap().state.offset(), + -1.0 + ); assert_eq!( algo.sources.get(&0).unwrap().0.unwrap().state.time, NtpTimestamp::from_seconds_nanos_since_ntp_era(100, 0) @@ -686,6 +722,7 @@ mod tests { }, wander: 0.0, delay: 0.0, + period: None, source_uncertainty: NtpDuration::ZERO, source_delay: NtpDuration::ZERO, leap_indicator: NtpLeapIndicator::NoWarning, diff --git a/ntp-proto/src/algorithm/kalman/select.rs b/ntp-proto/src/algorithm/kalman/select.rs index 4ab175ae9..4efb3bcf1 100644 --- a/ntp-proto/src/algorithm/kalman/select.rs +++ b/ntp-proto/src/algorithm/kalman/select.rs @@ -24,6 +24,11 @@ pub(super) fn select( let mut bounds: Vec<(f64, BoundType)> = Vec::with_capacity(2 * candidates.len()); for snapshot in candidates.iter() { + if snapshot.period.is_some() { + // Do not let periodic sources be part of the vote for correct time + continue; + } + let radius = snapshot.offset_uncertainty() * algo_config.range_statistical_weight + snapshot.delay * algo_config.range_delay_weight; if radius > algo_config.maximum_source_uncertainty @@ -38,21 +43,35 @@ pub(super) fn select( bounds.sort_by(|a, b| a.0.total_cmp(&b.0)); - let mut max: usize = 0; - let mut maxt: f64 = 0.0; + let mut maxlow: usize = 0; + let mut maxhigh: usize = 0; + let mut maxtlow: f64 = 0.0; + let mut maxthigh: f64 = 0.0; let mut cur: usize = 0; for (time, boundtype) in bounds.iter() { match boundtype { - BoundType::Start => cur += 1, - BoundType::End => cur -= 1, - } - if cur > max { - max = cur; - maxt = *time; + BoundType::Start => { + cur += 1; + if cur > maxlow { + maxlow = cur; + maxtlow = *time; + } + } + BoundType::End => { + if cur > maxhigh { + maxhigh = cur; + maxthigh = *time; + } + cur -= 1; + } } } + // Catch programming errors. If this ever fails there is high risk of missteering, better fail hard in that case + assert_eq!(maxlow, maxhigh); + let max = maxlow; + if max >= synchronization_config.minimum_agreeing_sources && max * 4 > bounds.len() { candidates .iter() @@ -60,8 +79,8 @@ pub(super) fn select( let radius = snapshot.offset_uncertainty() * algo_config.range_statistical_weight + snapshot.delay * algo_config.range_delay_weight; radius <= algo_config.maximum_source_uncertainty - && snapshot.offset() - radius <= maxt - && snapshot.offset() + radius >= maxt + && snapshot.offset() - radius <= maxthigh + && snapshot.offset() + radius >= maxtlow && snapshot.leap_indicator.is_synchronized() }) .cloned() @@ -86,7 +105,12 @@ mod tests { use super::*; - fn snapshot_for_range(center: f64, uncertainty: f64, delay: f64) -> SourceSnapshot { + fn snapshot_for_range( + center: f64, + uncertainty: f64, + delay: f64, + period: Option, + ) -> SourceSnapshot { SourceSnapshot { index: 0, state: KalmanState { @@ -96,6 +120,7 @@ mod tests { }, wander: 0.0, delay, + period, source_uncertainty: NtpDuration::from_seconds(0.01), source_delay: NtpDuration::from_seconds(0.01), leap_indicator: NtpLeapIndicator::NoWarning, @@ -108,10 +133,10 @@ mod tests { // Test that there only is sufficient overlap in the below set when // both statistical and delay based errors are considered. let candidates = vec![ - snapshot_for_range(0.0, 0.01, 0.09), - snapshot_for_range(0.0, 0.09, 0.01), - snapshot_for_range(0.05, 0.01, 0.09), - snapshot_for_range(0.05, 0.09, 0.01), + snapshot_for_range(0.0, 0.01, 0.09, None), + snapshot_for_range(0.0, 0.09, 0.01, None), + snapshot_for_range(0.05, 0.01, 0.09, None), + snapshot_for_range(0.05, 0.09, 0.01, None), ]; let sysconfig = SynchronizationConfig { minimum_agreeing_sources: 4, @@ -151,9 +176,9 @@ mod tests { fn test_rejection() { // Test sources get properly rejected as rejection bound gets tightened. let candidates = vec![ - snapshot_for_range(0.0, 1.0, 1.0), - snapshot_for_range(0.0, 0.1, 0.1), - snapshot_for_range(0.0, 0.01, 0.01), + snapshot_for_range(0.0, 1.0, 1.0, None), + snapshot_for_range(0.0, 0.1, 0.1, None), + snapshot_for_range(0.0, 0.01, 0.01, None), ]; let sysconfig = SynchronizationConfig { minimum_agreeing_sources: 1, @@ -201,11 +226,11 @@ mod tests { fn test_min_survivors() { // Test that minimum number of survivors is correctly tested for. let candidates = vec![ - snapshot_for_range(0.0, 0.1, 0.1), - snapshot_for_range(0.0, 0.1, 0.1), - snapshot_for_range(0.0, 0.1, 0.1), - snapshot_for_range(0.5, 0.1, 0.1), - snapshot_for_range(0.5, 0.1, 0.1), + snapshot_for_range(0.0, 0.1, 0.1, None), + snapshot_for_range(0.0, 0.1, 0.1, None), + snapshot_for_range(0.0, 0.1, 0.1, None), + snapshot_for_range(0.5, 0.1, 0.1, None), + snapshot_for_range(0.5, 0.1, 0.1, None), ]; let algconfig = AlgorithmConfig { maximum_source_uncertainty: 3.0, @@ -233,10 +258,10 @@ mod tests { fn test_tie() { // Test that in the case of a tie no group is chosen. let candidates = vec![ - snapshot_for_range(0.0, 0.1, 0.1), - snapshot_for_range(0.0, 0.1, 0.1), - snapshot_for_range(0.5, 0.1, 0.1), - snapshot_for_range(0.5, 0.1, 0.1), + snapshot_for_range(0.0, 0.1, 0.1, None), + snapshot_for_range(0.0, 0.1, 0.1, None), + snapshot_for_range(0.5, 0.1, 0.1, None), + snapshot_for_range(0.5, 0.1, 0.1, None), ]; let algconfig = AlgorithmConfig { maximum_source_uncertainty: 3.0, @@ -251,4 +276,31 @@ mod tests { let result = select(&sysconfig, &algconfig, candidates); assert_eq!(result.len(), 0); } + + #[test] + fn test_periodic_is_ignored() { + let candidates = vec![ + snapshot_for_range(0.0, 0.01, 0.01, None), + snapshot_for_range(0.0, 0.01, 0.01, Some(1.0)), + snapshot_for_range(0.0, 0.01, 0.01, Some(1.0)), + snapshot_for_range(0.0, 0.01, 0.01, Some(1.0)), + snapshot_for_range(0.5, 0.01, 0.01, None), + snapshot_for_range(0.5, 0.01, 0.01, None), + snapshot_for_range(0.5, 0.01, 0.01, Some(1.0)), + ]; + let algconfig = AlgorithmConfig::default(); + let sysconfig = SynchronizationConfig { + minimum_agreeing_sources: 2, + ..Default::default() + }; + let result = select(&sysconfig, &algconfig, candidates.clone()); + assert_eq!(result.len(), 3); + assert_eq!(result[0].offset(), 0.5); + let sysconfig = SynchronizationConfig { + minimum_agreeing_sources: 3, + ..Default::default() + }; + let result = select(&sysconfig, &algconfig, candidates); + assert_eq!(result.len(), 0); + } } diff --git a/ntp-proto/src/algorithm/kalman/source.rs b/ntp-proto/src/algorithm/kalman/source.rs index f93c28358..3e1e8db69 100644 --- a/ntp-proto/src/algorithm/kalman/source.rs +++ b/ntp-proto/src/algorithm/kalman/source.rs @@ -108,7 +108,26 @@ pub(super) struct MeasurementStats { impl KalmanState { #[must_use] - pub fn progress_time(&self, time: NtpTimestamp, wander: f64) -> KalmanState { + fn correct_periodicity(mut self, period: Option) -> KalmanState { + if let Some(period) = period { + while self.state.ventry(0) > period / 2.0 { + self.state = self.state - Vector::new_vector([period, 0.0]); + } + while self.state.ventry(0) < -period / 2.0 { + self.state = self.state + Vector::new_vector([period, 0.0]); + } + } + + self + } + + #[must_use] + pub fn progress_time( + &self, + time: NtpTimestamp, + wander: f64, + period: Option, + ) -> KalmanState { debug_assert!(!time.is_before(self.time)); if time.is_before(self.time) { return *self; @@ -131,6 +150,7 @@ impl KalmanState { uncertainty: update * self.uncertainty * update.transpose() + process_noise, time, } + .correct_periodicity(period) } #[must_use] @@ -139,8 +159,12 @@ impl KalmanState { measurement: Matrix<1, 2>, value: Vector<1>, noise: Matrix<1, 1>, + period: Option, + measurement_period_correction: impl Fn(Vector<1>, Vector<1>, Option) -> Vector<1>, ) -> (KalmanState, MeasurementStats) { - let difference = value - measurement * self.state; + let prediction = measurement * self.state; + let corrected_value = measurement_period_correction(value, prediction, period); + let difference = corrected_value - prediction; let difference_covariance = measurement * self.uncertainty * measurement.transpose() + noise; let update_strength = @@ -160,7 +184,8 @@ impl KalmanState { uncertainty: ((Matrix::unit() - update_strength * measurement) * self.uncertainty) .symmetrize(), time: self.time, - }, + } + .correct_periodicity(period), MeasurementStats { observe_probability, weight, @@ -211,12 +236,13 @@ impl KalmanState { } #[must_use] - pub fn process_offset_steering(&self, steer: f64) -> KalmanState { + pub fn process_offset_steering(&self, steer: f64, period: Option) -> KalmanState { KalmanState { state: self.state - Vector::new_vector([steer, 0.0]), uncertainty: self.uncertainty, time: self.time + NtpDuration::from_seconds(steer), } + .correct_periodicity(period) } #[must_use] @@ -225,8 +251,9 @@ impl KalmanState { time: NtpTimestamp, steer: f64, wander: f64, + period: Option, ) -> KalmanState { - let mut result = self.progress_time(time, wander); + let mut result = self.progress_time(time, wander, period); result.state = result.state - Vector::new_vector([0.0, steer]); result } @@ -373,18 +400,60 @@ struct InitialSourceFilter< impl + Clone> InitialSourceFilter { - fn update(&mut self, measurement: Measurement) { + fn correct_period(&mut self, period: Option) { + if self.samples == 0 { + return; + } + if let Some(period) = period { + while self.cur_avg() > period / 2.0 { + for sample in self.init_offset.data.iter_mut() { + *sample -= period; + } + } + + while self.cur_avg() < -period / 2.0 { + for sample in self.init_offset.data.iter_mut() { + *sample += period; + } + } + } + } + + fn cur_avg(&self) -> f64 { + if self.samples == 0 { + 0.0 + } else { + self.init_offset.data[0..self.samples as usize] + .iter() + .sum::() + / (self.samples as f64) + } + } + + fn update(&mut self, measurement: Measurement, period: Option) { + let mut offset = measurement.offset.to_seconds(); + if let Some(period) = period { + while offset - self.cur_avg() > period / 2.0 { + offset -= period; + } + while offset - self.cur_avg() < -period / 2.0 { + offset += period; + } + } + self.noise_estimator.update(measurement.delay); - self.init_offset.update(measurement.offset.to_seconds()); + self.init_offset.update(offset); self.samples += 1; self.last_measurement = Some(measurement); + self.correct_period(period); debug!(samples = self.samples, "Initial source update"); } - fn process_offset_steering(&mut self, steer: f64) { + fn process_offset_steering(&mut self, steer: f64, period: Option) { for sample in self.init_offset.data.iter_mut() { *sample -= steer; } + self.correct_period(period); } } @@ -410,14 +479,18 @@ impl SourceFilter { /// Move the filter forward to reflect the situation at a new, later timestamp - fn progress_filtertime(&mut self, time: NtpTimestamp) { - self.state = self.state.progress_time(time, self.clock_wander); + fn progress_filtertime(&mut self, time: NtpTimestamp, period: Option) { + self.state = self.state.progress_time(time, self.clock_wander, period); trace!(?time, "Filter progressed"); } /// Absorb knowledge from a measurement - fn absorb_measurement(&mut self, measurement: Measurement) -> (f64, f64, f64) { + fn absorb_measurement( + &mut self, + measurement: Measurement, + period: Option, + ) -> (f64, f64, f64) { // Measurement parameters let m_delta_t = (measurement.localtime - self.last_measurement.localtime).to_seconds(); @@ -429,6 +502,19 @@ impl measurement_transform, measurement_vec, measurement_noise, + period, + |mut value, prediction, period| { + if let Some(period) = period { + while (value - prediction).ventry(0) > period / 2.0 { + value = value - Vector::new_vector([period]) + } + while (value - prediction).ventry(0) < -period / 2.0 { + value = value + Vector::new_vector([period]) + } + } + + value + }, ); self.state = new_state; @@ -528,6 +614,7 @@ impl source_defaults_config: &SourceDefaultsConfig, algo_config: &AlgorithmConfig, measurement: Measurement, + period: Option, ) -> bool { // Always update the root_delay, root_dispersion, leap second status and stratum, as they always represent the most accurate state. self.last_measurement.root_delay = measurement.root_delay; @@ -555,10 +642,10 @@ impl } // Environment update - self.progress_filtertime(measurement.localtime); + self.progress_filtertime(measurement.localtime, period); self.noise_estimator.update(measurement.delay); - let (p, weight, measurement_period) = self.absorb_measurement(measurement); + let (p, weight, measurement_period) = self.absorb_measurement(measurement, period); self.update_wander_estimate(algo_config, p, weight); self.update_desired_poll( @@ -583,16 +670,16 @@ impl true } - fn process_offset_steering(&mut self, steer: f64) { - self.state = self.state.process_offset_steering(steer); + fn process_offset_steering(&mut self, steer: f64, period: Option) { + self.state = self.state.process_offset_steering(steer, period); self.last_measurement.offset -= NtpDuration::from_seconds(steer); self.last_measurement.localtime += NtpDuration::from_seconds(steer); } - fn process_frequency_steering(&mut self, time: NtpTimestamp, steer: f64) { + fn process_frequency_steering(&mut self, time: NtpTimestamp, steer: f64, period: Option) { self.state = self .state - .process_frequency_steering(time, steer, self.clock_wander); + .process_frequency_steering(time, steer, self.clock_wander, period); self.last_measurement.offset += NtpDuration::from_seconds( steer * (time - self.last_measurement.localtime).to_seconds(), ); @@ -635,6 +722,7 @@ impl source_defaults_config: &SourceDefaultsConfig, algo_config: &AlgorithmConfig, mut measurement: Measurement, + period: Option, ) -> bool { // preprocessing let noise_estimator = match self { @@ -643,7 +731,12 @@ impl }; measurement.delay = noise_estimator.preprocess(measurement.delay); - self.update_self_using_raw_measurement(source_defaults_config, algo_config, measurement) + self.update_self_using_raw_measurement( + source_defaults_config, + algo_config, + measurement, + period, + ) } fn update_self_using_raw_measurement( @@ -651,10 +744,11 @@ impl source_defaults_config: &SourceDefaultsConfig, algo_config: &AlgorithmConfig, measurement: Measurement, + period: Option, ) -> bool { match &mut self.0 { SourceStateInner::Initial(filter) => { - filter.update(measurement); + filter.update(measurement, period); if filter.samples == 8 { *self = SourceState(SourceStateInner::Stable(SourceFilter { state: KalmanState { @@ -664,7 +758,8 @@ impl [0., sqr(algo_config.initial_frequency_uncertainty)], ]), time: measurement.localtime, - }, + } + .correct_periodicity(period), clock_wander: sqr(algo_config.initial_wander), noise_estimator: filter.noise_estimator.clone(), precision_score: 0, @@ -704,7 +799,7 @@ impl false } else { - filter.update(source_defaults_config, algo_config, measurement) + filter.update(source_defaults_config, algo_config, measurement, period) } } } @@ -714,6 +809,7 @@ impl &self, index: Index, config: &AlgorithmConfig, + period: Option, ) -> Option> { match &self.0 { SourceStateInner::Initial(InitialSourceFilter { @@ -730,6 +826,7 @@ impl leap_indicator: last_measurement.leap, last_update: last_measurement.localtime, delay: max_roundtrip, + period, state: KalmanState { state: Vector::new_vector([ init_offset.data[..*samples as usize] @@ -753,6 +850,7 @@ impl state: filter.state, wander: filter.clock_wander, delay: filter.noise_estimator.get_delay_mean(), + period, source_uncertainty: filter.last_measurement.root_dispersion, source_delay: filter.last_measurement.root_delay, leap_indicator: filter.last_measurement.leap, @@ -769,17 +867,28 @@ impl } } - pub fn process_offset_steering(&mut self, steer: f64) { + pub fn process_offset_steering(&mut self, mut steer: f64, period: Option) { + if let Some(period) = period { + // Ensure the fine correction code doesn't make too many iterations + steer %= period; + } match &mut self.0 { - SourceStateInner::Initial(filter) => filter.process_offset_steering(steer), - SourceStateInner::Stable(filter) => filter.process_offset_steering(steer), + SourceStateInner::Initial(filter) => filter.process_offset_steering(steer, period), + SourceStateInner::Stable(filter) => filter.process_offset_steering(steer, period), } } - pub fn process_frequency_steering(&mut self, time: NtpTimestamp, steer: f64) { + pub fn process_frequency_steering( + &mut self, + time: NtpTimestamp, + steer: f64, + period: Option, + ) { match &mut self.0 { SourceStateInner::Initial(_) => {} - SourceStateInner::Stable(filter) => filter.process_frequency_steering(time, steer), + SourceStateInner::Stable(filter) => { + filter.process_frequency_steering(time, steer, period) + } } } } @@ -792,6 +901,7 @@ pub struct KalmanSourceController< > { index: SourceId, state: SourceState, + period: Option, algo_config: AlgorithmConfig, source_defaults_config: SourceDefaultsConfig, } @@ -810,12 +920,14 @@ impl< pub(super) fn new( index: SourceId, algo_config: AlgorithmConfig, + period: Option, source_defaults_config: SourceDefaultsConfig, noise_estimator: N, ) -> Self { KalmanSourceController { index, state: SourceState::new(noise_estimator), + period, algo_config, source_defaults_config, } @@ -835,11 +947,11 @@ impl< fn handle_message(&mut self, message: Self::ControllerMessage) { match message.inner { super::KalmanControllerMessageInner::Step { steer } => { - self.state.process_offset_steering(steer); - } - super::KalmanControllerMessageInner::FreqChange { steer, time } => { - self.state.process_frequency_steering(time, steer) + self.state.process_offset_steering(steer, self.period); } + super::KalmanControllerMessageInner::FreqChange { steer, time } => self + .state + .process_frequency_steering(time, steer, self.period), } } @@ -851,9 +963,10 @@ impl< &self.source_defaults_config, &self.algo_config, measurement, + self.period, ) { self.state - .snapshot(self.index, &self.algo_config) + .snapshot(self.index, &self.algo_config, self.period) .map(|snapshot| KalmanSourceMessage { inner: snapshot }) } else { None @@ -867,7 +980,7 @@ impl< fn observe(&self) -> super::super::ObservableSourceTimedata { self.state - .snapshot(&self.index, &self.algo_config) + .snapshot(&self.index, &self.algo_config, self.period) .map(|snapshot| snapshot.observe()) .unwrap_or(ObservableSourceTimedata { offset: NtpDuration::ZERO, @@ -935,6 +1048,7 @@ mod tests { leap: NtpLeapIndicator::NoWarning, precision: 0, }, + None, ); assert!(matches!(source, SourceState(SourceStateInner::Initial(_)))); @@ -967,7 +1081,7 @@ mod tests { prev_was_outlier: false, last_iter: base, })); - source.process_offset_steering(-1800.0); + source.process_offset_steering(-1800.0, None); source.update_self_using_measurement( &SourceDefaultsConfig::default(), &AlgorithmConfig::default(), @@ -983,6 +1097,7 @@ mod tests { leap: NtpLeapIndicator::NoWarning, precision: 0, }, + None, ); assert!(matches!(source, SourceState(SourceStateInner::Stable(_)))); @@ -1015,7 +1130,7 @@ mod tests { prev_was_outlier: false, last_iter: base, })); - source.process_offset_steering(1800.0); + source.process_offset_steering(1800.0, None); source.update_self_using_measurement( &SourceDefaultsConfig::default(), &AlgorithmConfig::default(), @@ -1031,6 +1146,7 @@ mod tests { leap: NtpLeapIndicator::NoWarning, precision: 0, }, + None, ); assert!(matches!(source, SourceState(SourceStateInner::Stable(_)))); } @@ -1071,10 +1187,10 @@ mod tests { last_iter: base, })); - source.process_offset_steering(20e-3); + source.process_offset_steering(20e-3, None); assert!( source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .offset() @@ -1109,10 +1225,10 @@ mod tests { last_iter: base, })); - source.process_offset_steering(20e-3); + source.process_offset_steering(20e-3, None); assert!( source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .offset() @@ -1135,11 +1251,12 @@ mod tests { leap: NtpLeapIndicator::NoWarning, precision: 0, }, + None, ); assert!( dbg!((source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .offset() @@ -1149,7 +1266,7 @@ mod tests { ); assert!( (source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .frequency() @@ -1185,10 +1302,10 @@ mod tests { last_iter: base, })); - source.process_offset_steering(-20e-3); + source.process_offset_steering(-20e-3, None); assert!( source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .offset() @@ -1211,11 +1328,12 @@ mod tests { leap: NtpLeapIndicator::NoWarning, precision: 0, }, + None, ); assert!( dbg!((source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .offset() @@ -1225,7 +1343,7 @@ mod tests { ); assert!( (source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .frequency() @@ -1251,6 +1369,352 @@ mod tests { test_offset_steering_and_measurements(1e-9, ()); } + #[test] + fn test_offset_steering_periodic() { + let base = NtpTimestamp::from_fixed_int(0); + let basei = NtpInstant::now(); + let mut source = SourceState(SourceStateInner::Stable(SourceFilter { + state: KalmanState { + state: Vector::new_vector([0.4, 0.]), + uncertainty: Matrix::new([[1e-6, 0.], [0., 1e-8]]), + time: base, + }, + clock_wander: 1e-8, + noise_estimator: AveragingBuffer { + data: [0.0, 0.0, 0.0, 0.0, 0.875e-6, 0.875e-6, 0.875e-6, 0.875e-6], + next_idx: 0, + }, + precision_score: 0, + poll_score: 0, + desired_poll_interval: PollIntervalLimits::default().min, + last_measurement: Measurement { + delay: NtpDuration::from_seconds(0.0), + offset: NtpDuration::from_seconds(0.4), + localtime: base, + monotime: basei, + + stratum: 0, + root_delay: NtpDuration::default(), + root_dispersion: NtpDuration::default(), + leap: NtpLeapIndicator::NoWarning, + precision: 0, + }, + prev_was_outlier: false, + last_iter: base, + })); + + source.process_offset_steering(-0.2, Some(1.0)); + assert!( + source + .snapshot(0_usize, &AlgorithmConfig::default(), Some(1.0)) + .unwrap() + .offset() + + 0.4 + < 0.001 + ); + + source.process_offset_steering(100.5, Some(1.0)); + assert!( + source + .snapshot(0_usize, &AlgorithmConfig::default(), Some(1.0)) + .unwrap() + .offset() + - 0.1 + < 0.001 + ); + } + + #[test] + fn test_periodic_measurement() { + let base = NtpTimestamp::from_fixed_int(0); + let basei = NtpInstant::now(); + let mut source = SourceState(SourceStateInner::Stable(SourceFilter { + state: KalmanState { + state: Vector::new_vector([0.4, 0.]), + uncertainty: Matrix::new([ + [ + AveragingBuffer { + data: [0.0, 0.0, 0.0, 0.0, 1e-6, 1e-6, 1e-6, 1e-6], + next_idx: 0, + } + .get_noise_estimate(), + 0., + ], + [0., 1e-8], + ]), + time: base, + }, + clock_wander: 1e-8, + noise_estimator: AveragingBuffer { + data: [0.0, 0.0, 0.0, 0.0, 1e-6, 1e-6, 1e-6, 1e-6], + next_idx: 0, + }, + precision_score: 0, + poll_score: 0, + desired_poll_interval: PollIntervalLimits::default().min, + last_measurement: Measurement { + delay: NtpDuration::from_seconds(0.0), + offset: NtpDuration::from_seconds(0.4), + localtime: base, + monotime: basei, + + stratum: 0, + root_delay: NtpDuration::default(), + root_dispersion: NtpDuration::default(), + leap: NtpLeapIndicator::NoWarning, + precision: 0, + }, + prev_was_outlier: false, + last_iter: base, + })); + + source.update_self_using_raw_measurement( + &SourceDefaultsConfig::default(), + &AlgorithmConfig::default(), + Measurement { + delay: NtpDuration::ZERO, + offset: NtpDuration::from_seconds(-0.3), + localtime: base, + monotime: basei, + + stratum: 0, + root_delay: NtpDuration::default(), + root_dispersion: NtpDuration::default(), + leap: NtpLeapIndicator::NoWarning, + precision: 0, + }, + Some(1.0), + ); + assert!( + source + .snapshot(0_usize, &AlgorithmConfig::default(), Some(1.0)) + .unwrap() + .offset() + + 0.45 + < 0.001 + ); + } + + #[test] + fn test_periodic_measurement_init() { + let base = NtpTimestamp::from_fixed_int(0); + let basei = NtpInstant::now(); + let mut source = SourceState::new(AveragingBuffer { + data: [0.0, 0.0, 0.0, 0.0, 0.875e-6, 0.875e-6, 0.875e-6, 0.875e-6], + next_idx: 0, + }); + assert!(source + .snapshot(0_usize, &AlgorithmConfig::default(), None) + .is_none()); + source.update_self_using_measurement( + &SourceDefaultsConfig::default(), + &AlgorithmConfig::default(), + Measurement { + delay: NtpDuration::ZERO, + offset: NtpDuration::from_seconds(0.48), + localtime: base + NtpDuration::from_seconds(1.0), + monotime: basei + std::time::Duration::from_secs(1), + + stratum: 0, + root_delay: NtpDuration::default(), + root_dispersion: NtpDuration::default(), + leap: NtpLeapIndicator::NoWarning, + precision: 0, + }, + Some(1.0), + ); + assert!( + source + .snapshot(0_usize, &AlgorithmConfig::default(), Some(1.0)) + .unwrap() + .offset() + .abs() + < 0.5001 + ); + source.update_self_using_measurement( + &SourceDefaultsConfig::default(), + &AlgorithmConfig::default(), + Measurement { + delay: NtpDuration::ZERO, + offset: NtpDuration::from_seconds(0.49), + localtime: base + NtpDuration::from_seconds(2.0), + monotime: basei + std::time::Duration::from_secs(2), + + stratum: 0, + root_delay: NtpDuration::default(), + root_dispersion: NtpDuration::default(), + leap: NtpLeapIndicator::NoWarning, + precision: 0, + }, + Some(1.0), + ); + assert!( + source + .snapshot(0_usize, &AlgorithmConfig::default(), Some(1.0)) + .unwrap() + .offset() + .abs() + < 0.5001 + ); + source.update_self_using_measurement( + &SourceDefaultsConfig::default(), + &AlgorithmConfig::default(), + Measurement { + delay: NtpDuration::ZERO, + offset: NtpDuration::from_seconds(0.50), + localtime: base + NtpDuration::from_seconds(3.0), + monotime: basei + std::time::Duration::from_secs(3), + + stratum: 0, + root_delay: NtpDuration::default(), + root_dispersion: NtpDuration::default(), + leap: NtpLeapIndicator::NoWarning, + precision: 0, + }, + Some(1.0), + ); + assert!( + source + .snapshot(0_usize, &AlgorithmConfig::default(), Some(1.0)) + .unwrap() + .offset() + .abs() + < 0.5001 + ); + source.update_self_using_measurement( + &SourceDefaultsConfig::default(), + &AlgorithmConfig::default(), + Measurement { + delay: NtpDuration::ZERO, + offset: NtpDuration::from_seconds(-0.49), + localtime: base + NtpDuration::from_seconds(4.0), + monotime: basei + std::time::Duration::from_secs(4), + + stratum: 0, + root_delay: NtpDuration::default(), + root_dispersion: NtpDuration::default(), + leap: NtpLeapIndicator::NoWarning, + precision: 0, + }, + Some(1.0), + ); + assert!( + source + .snapshot(0_usize, &AlgorithmConfig::default(), Some(1.0)) + .unwrap() + .offset() + .abs() + < 0.5001 + ); + source.update_self_using_measurement( + &SourceDefaultsConfig::default(), + &AlgorithmConfig::default(), + Measurement { + delay: NtpDuration::ZERO, + offset: NtpDuration::from_seconds(-0.48), + localtime: base + NtpDuration::from_seconds(5.0), + monotime: basei + std::time::Duration::from_secs(5), + + stratum: 0, + root_delay: NtpDuration::default(), + root_dispersion: NtpDuration::default(), + leap: NtpLeapIndicator::NoWarning, + precision: 0, + }, + Some(1.0), + ); + assert!( + source + .snapshot(0_usize, &AlgorithmConfig::default(), Some(1.0)) + .unwrap() + .offset() + .abs() + < 0.5001 + ); + source.update_self_using_measurement( + &SourceDefaultsConfig::default(), + &AlgorithmConfig::default(), + Measurement { + delay: NtpDuration::ZERO, + offset: NtpDuration::from_seconds(-0.47), + localtime: base + NtpDuration::from_seconds(6.0), + monotime: basei + std::time::Duration::from_secs(6), + + stratum: 0, + root_delay: NtpDuration::default(), + root_dispersion: NtpDuration::default(), + leap: NtpLeapIndicator::NoWarning, + precision: 0, + }, + Some(1.0), + ); + assert!( + source + .snapshot(0_usize, &AlgorithmConfig::default(), Some(1.0)) + .unwrap() + .offset() + .abs() + < 0.5001 + ); + source.update_self_using_measurement( + &SourceDefaultsConfig::default(), + &AlgorithmConfig::default(), + Measurement { + delay: NtpDuration::ZERO, + offset: NtpDuration::from_seconds(-0.46), + localtime: base + NtpDuration::from_seconds(7.0), + monotime: basei + std::time::Duration::from_secs(7), + + stratum: 0, + root_delay: NtpDuration::default(), + root_dispersion: NtpDuration::default(), + leap: NtpLeapIndicator::NoWarning, + precision: 0, + }, + Some(1.0), + ); + assert!( + source + .snapshot(0_usize, &AlgorithmConfig::default(), Some(1.0)) + .unwrap() + .offset() + .abs() + < 0.5001 + ); + source.update_self_using_measurement( + &SourceDefaultsConfig::default(), + &AlgorithmConfig::default(), + Measurement { + delay: NtpDuration::ZERO, + offset: NtpDuration::from_seconds(-0.45), + localtime: base + NtpDuration::from_seconds(8.0), + monotime: basei + std::time::Duration::from_secs(8), + + stratum: 0, + root_delay: NtpDuration::default(), + root_dispersion: NtpDuration::default(), + leap: NtpLeapIndicator::NoWarning, + precision: 0, + }, + None, + ); + assert!( + source + .snapshot(0_usize, &AlgorithmConfig::default(), Some(1.0)) + .unwrap() + .offset() + .abs() + < 0.5001 + ); + assert!( + source + .snapshot(0_usize, &AlgorithmConfig::default(), Some(1.0)) + .unwrap() + .offset_uncertainty() + < 0.1 + ); + } + #[test] fn test_freq_steering() { let noise_estimator = AveragingBuffer { @@ -1288,11 +1752,11 @@ mod tests { last_iter: base, }; - source.process_frequency_steering(base + NtpDuration::from_seconds(5.0), 200e-6); + source.process_frequency_steering(base + NtpDuration::from_seconds(5.0), 200e-6, None); assert!((source.state.frequency() - -200e-6).abs() < 1e-10); assert!(source.state.offset().abs() < 1e-8); assert!((source.last_measurement.offset.to_seconds() - 1e-3).abs() < 1e-8); - source.process_frequency_steering(base + NtpDuration::from_seconds(10.0), -200e-6); + source.process_frequency_steering(base + NtpDuration::from_seconds(10.0), -200e-6, None); assert!(source.state.frequency().abs() < 1e-10); assert!((source.state.offset() - -1e-3).abs() < 1e-8); assert!((source.last_measurement.offset.to_seconds() - -1e-3).abs() < 1e-8); @@ -1324,10 +1788,10 @@ mod tests { last_iter: base, })); - source.process_frequency_steering(base + NtpDuration::from_seconds(5.0), 200e-6); + source.process_frequency_steering(base + NtpDuration::from_seconds(5.0), 200e-6, None); assert!( (source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .frequency() @@ -1337,17 +1801,17 @@ mod tests { ); assert!( source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .offset() .abs() < 1e-8 ); - source.process_frequency_steering(base + NtpDuration::from_seconds(10.0), -200e-6); + source.process_frequency_steering(base + NtpDuration::from_seconds(10.0), -200e-6, None); assert!( source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .frequency() @@ -1356,7 +1820,7 @@ mod tests { ); assert!( (source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .offset() @@ -1377,7 +1841,7 @@ mod tests { let basei = NtpInstant::now(); let mut source = SourceState::new(noise_estimator); assert!(source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .is_none()); source.update_self_using_measurement( &SourceDefaultsConfig::default(), @@ -1394,10 +1858,11 @@ mod tests { leap: NtpLeapIndicator::NoWarning, precision: 0, }, + None, ); assert!( source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .frequency_variance() @@ -1418,10 +1883,11 @@ mod tests { leap: NtpLeapIndicator::NoWarning, precision: 0, }, + None, ); assert!( source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .frequency_variance() @@ -1442,10 +1908,11 @@ mod tests { leap: NtpLeapIndicator::NoWarning, precision: 0, }, + None, ); assert!( source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .frequency_variance() @@ -1466,10 +1933,11 @@ mod tests { leap: NtpLeapIndicator::NoWarning, precision: 0, }, + None, ); assert!( source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .frequency_variance() @@ -1490,10 +1958,11 @@ mod tests { leap: NtpLeapIndicator::NoWarning, precision: 0, }, + None, ); assert!( source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .frequency_variance() @@ -1514,10 +1983,11 @@ mod tests { leap: NtpLeapIndicator::NoWarning, precision: 0, }, + None, ); assert!( source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .frequency_variance() @@ -1538,10 +2008,11 @@ mod tests { leap: NtpLeapIndicator::NoWarning, precision: 0, }, + None, ); assert!( source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .frequency_variance() @@ -1562,10 +2033,11 @@ mod tests { leap: NtpLeapIndicator::NoWarning, precision: 0, }, + None, ); assert!( (source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .offset() @@ -1575,7 +2047,7 @@ mod tests { ); assert!( (source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .offset_variance() @@ -1606,7 +2078,7 @@ mod tests { let basei = NtpInstant::now(); let mut source = SourceState::new(AveragingBuffer::default()); assert!(source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .is_none()); source.update_self_using_measurement( &SourceDefaultsConfig::default(), @@ -1623,10 +2095,11 @@ mod tests { leap: NtpLeapIndicator::NoWarning, precision: 0, }, + None, ); assert!( source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .frequency_variance() @@ -1647,10 +2120,11 @@ mod tests { leap: NtpLeapIndicator::NoWarning, precision: 0, }, + None, ); assert!( source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .frequency_variance() @@ -1671,10 +2145,11 @@ mod tests { leap: NtpLeapIndicator::NoWarning, precision: 0, }, + None, ); assert!( source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .frequency_variance() @@ -1695,11 +2170,12 @@ mod tests { leap: NtpLeapIndicator::NoWarning, precision: 0, }, + None, ); - source.process_offset_steering(4e-3); + source.process_offset_steering(4e-3, None); assert!( source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .frequency_variance() @@ -1720,10 +2196,11 @@ mod tests { leap: NtpLeapIndicator::NoWarning, precision: 0, }, + None, ); assert!( source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .frequency_variance() @@ -1744,10 +2221,11 @@ mod tests { leap: NtpLeapIndicator::NoWarning, precision: 0, }, + None, ); assert!( source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .frequency_variance() @@ -1768,10 +2246,11 @@ mod tests { leap: NtpLeapIndicator::NoWarning, precision: 0, }, + None, ); assert!( source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .frequency_variance() @@ -1792,10 +2271,11 @@ mod tests { leap: NtpLeapIndicator::NoWarning, precision: 0, }, + None, ); assert!( (source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .offset() @@ -1805,7 +2285,7 @@ mod tests { ); assert!( (source - .snapshot(0_usize, &AlgorithmConfig::default()) + .snapshot(0_usize, &AlgorithmConfig::default(), None) .unwrap() .state .offset_variance() diff --git a/ntp-proto/src/algorithm/mod.rs b/ntp-proto/src/algorithm/mod.rs index 0af89e098..87578363b 100644 --- a/ntp-proto/src/algorithm/mod.rs +++ b/ntp-proto/src/algorithm/mod.rs @@ -84,6 +84,7 @@ pub trait TimeSyncController: Sized + Send + 'static { &mut self, id: Self::SourceId, measurement_noise_estimate: f64, + period: Option, ) -> Self::OneWaySourceController; /// Notify the controller that a previous source has gone fn remove_source(&mut self, id: Self::SourceId); diff --git a/ntp-proto/src/system.rs b/ntp-proto/src/system.rs index 32b64f673..48a1a1602 100644 --- a/ntp-proto/src/system.rs +++ b/ntp-proto/src/system.rs @@ -259,7 +259,7 @@ impl { delay_type: PhantomData, min_poll_interval: PollInterval, done: bool, + ignore: bool, } #[derive(Debug, Copy, Clone)] @@ -155,6 +156,7 @@ impl TimeSyncController for SingleShotController { delay_type: PhantomData, min_poll_interval: self.min_poll_interval, done: false, + ignore: false, } } @@ -162,11 +164,13 @@ impl TimeSyncController for SingleShotController { &mut self, _id: Self::SourceId, _measurement_noise_estimate: f64, + period: Option, ) -> Self::OneWaySourceController { SingleShotSourceController::<()> { delay_type: PhantomData, min_poll_interval: self.min_poll_interval, done: false, + ignore: period.is_some(), } } @@ -214,7 +218,11 @@ where measurement: Measurement, ) -> Option { self.done = true; - Some(measurement.wrap()) + if self.ignore { + None + } else { + Some(measurement.wrap()) + } } fn desired_poll_interval(&self) -> ntp_proto::PollInterval {