From 3310a9577ea3ae126f19693c25db8082ade102f2 Mon Sep 17 00:00:00 2001 From: David Venhoek Date: Thu, 5 Oct 2023 13:36:08 +0200 Subject: [PATCH] Allow handling of general messages through handle_timecritical_receive. (#275) --- statime/src/datastructures/messages/mod.rs | 15 ++ statime/src/port/mod.rs | 252 ++++++++++++++++++++- 2 files changed, 257 insertions(+), 10 deletions(-) diff --git a/statime/src/datastructures/messages/mod.rs b/statime/src/datastructures/messages/mod.rs index 20a2149c6..e37fe7323 100644 --- a/statime/src/datastructures/messages/mod.rs +++ b/statime/src/datastructures/messages/mod.rs @@ -111,6 +111,21 @@ pub(crate) struct Message<'a> { pub(crate) suffix: TlvSet<'a>, } +impl<'a> Message<'a> { + pub(crate) fn is_event(&self) -> bool { + use MessageBody::*; + match self.body { + Sync(_) | DelayReq(_) | PDelayReq(_) | PDelayResp(_) => true, + FollowUp(_) + | DelayResp(_) + | PDelayRespFollowUp(_) + | Announce(_) + | Signaling(_) + | Management(_) => false, + } + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) enum MessageBody { Sync(SyncMessage), diff --git a/statime/src/port/mod.rs b/statime/src/port/mod.rs index fc23fc8f8..b6f3daeb1 100644 --- a/statime/src/port/mod.rs +++ b/statime/src/port/mod.rs @@ -244,16 +244,18 @@ impl<'a, C: Clock, F: Filter, R: Rng> Port, R, C, F> { return actions![]; } - let actions = self.port_state.handle_event_receive( - message, - timestamp, - self.config.min_delay_req_interval(), - self.port_identity, - &mut self.clock, - &mut self.packet_buffer, - ); - - actions + if message.is_event() { + self.port_state.handle_event_receive( + message, + timestamp, + self.config.min_delay_req_interval(), + self.port_identity, + &mut self.clock, + &mut self.packet_buffer, + ) + } else { + self.handle_general_internal(message) + } } // Handle a general ptp message @@ -273,6 +275,10 @@ impl<'a, C: Clock, F: Filter, R: Rng> Port, R, C, F> { return actions![]; } + self.handle_general_internal(message) + } + + fn handle_general_internal(&mut self, message: Message<'_>) -> PortActionIterator<'_> { match message.body { MessageBody::Announce(announce) => { self.bmca @@ -554,3 +560,229 @@ impl<'a, C, F: Filter, R: Rng> Port, R, C, F> { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + datastructures::messages::{AnnounceMessage, Header, PtpVersion}, + BasicFilter, DelayMechanism, InstanceConfig, Interval, + }; + + struct TestClock; + + impl Clock for TestClock { + type Error = (); + + fn set_frequency(&mut self, _freq: f64) -> Result { + Ok(Time::default()) + } + + fn now(&self) -> Time { + panic!("Shouldn't be called"); + } + + fn set_properties( + &mut self, + _time_properties_ds: &crate::TimePropertiesDS, + ) -> Result<(), Self::Error> { + Ok(()) + } + + fn step_clock(&mut self, _offset: Duration) -> Result { + Ok(Time::default()) + } + } + + fn default_announce_message_header() -> Header { + Header { + sdo_id: Default::default(), + version: PtpVersion::new(2, 1).unwrap(), + domain_number: Default::default(), + alternate_master_flag: false, + two_step_flag: false, + unicast_flag: false, + ptp_profile_specific_1: false, + ptp_profile_specific_2: false, + leap61: false, + leap59: false, + current_utc_offset_valid: false, + ptp_timescale: false, + time_tracable: false, + frequency_tracable: false, + synchronization_uncertain: false, + correction_field: Default::default(), + source_port_identity: Default::default(), + sequence_id: Default::default(), + log_message_interval: Default::default(), + } + } + + fn default_announce_message() -> AnnounceMessage { + AnnounceMessage { + header: default_announce_message_header(), + origin_timestamp: Default::default(), + current_utc_offset: Default::default(), + grandmaster_priority_1: Default::default(), + grandmaster_clock_quality: Default::default(), + grandmaster_priority_2: Default::default(), + grandmaster_identity: Default::default(), + steps_removed: Default::default(), + time_source: Default::default(), + } + } + + #[test] + fn test_announce_receive() { + let default_ds = DefaultDS::new(InstanceConfig { + clock_identity: Default::default(), + priority_1: 255, + priority_2: 255, + domain_number: 0, + slave_only: false, + sdo_id: Default::default(), + }); + + let parent_ds = ParentDS::new(default_ds); + + let state = AtomicRefCell::new(PtpInstanceState { + default_ds, + current_ds: Default::default(), + parent_ds, + time_properties_ds: Default::default(), + }); + + let port = Port::<_, _, _, BasicFilter>::new( + &state, + PortConfig { + delay_mechanism: DelayMechanism::E2E { + interval: Interval::from_log_2(1), + }, + announce_interval: Interval::from_log_2(1), + announce_receipt_timeout: 3, + sync_interval: Interval::from_log_2(0), + master_only: false, + delay_asymmetry: Duration::ZERO, + }, + 0.25, + TestClock, + Default::default(), + rand::rngs::mock::StepRng::new(2, 1), + ); + + let (mut port, _) = port.end_bmca(); + + let mut announce = default_announce_message(); + announce.header.source_port_identity.clock_identity.0 = [1, 2, 3, 4, 5, 6, 7, 8]; + let announce_message = Message { + header: announce.header, + body: MessageBody::Announce(announce), + suffix: Default::default(), + }; + let mut packet = [0; MAX_DATA_LEN]; + let packet_len = announce_message.serialize(&mut packet).unwrap(); + let packet = &packet[..packet_len]; + + let mut actions = port.handle_general_receive(packet); + let Some(PortAction::ResetAnnounceReceiptTimer { .. }) = actions.next() else { + panic!("Unexpected action"); + }; + assert!(actions.next().is_none()); + drop(actions); + + let mut actions = port.handle_general_receive(packet); + let Some(PortAction::ResetAnnounceReceiptTimer { .. }) = actions.next() else { + panic!("Unexpected action"); + }; + assert!(actions.next().is_none()); + drop(actions); + + let mut actions = port.handle_general_receive(packet); + let Some(PortAction::ResetAnnounceReceiptTimer { .. }) = actions.next() else { + panic!("Unexpected action"); + }; + assert!(actions.next().is_none()); + drop(actions); + + let mut port = port.start_bmca(); + port.calculate_best_local_announce_message(); + assert!(port.best_local_announce_message().is_some()); + } + + #[test] + fn test_announce_receive_via_timecritical() { + let default_ds = DefaultDS::new(InstanceConfig { + clock_identity: Default::default(), + priority_1: 255, + priority_2: 255, + domain_number: 0, + slave_only: false, + sdo_id: Default::default(), + }); + + let parent_ds = ParentDS::new(default_ds); + + let state = AtomicRefCell::new(PtpInstanceState { + default_ds, + current_ds: Default::default(), + parent_ds, + time_properties_ds: Default::default(), + }); + + let port = Port::<_, _, _, BasicFilter>::new( + &state, + PortConfig { + delay_mechanism: DelayMechanism::E2E { + interval: Interval::from_log_2(1), + }, + announce_interval: Interval::from_log_2(1), + announce_receipt_timeout: 3, + sync_interval: Interval::from_log_2(0), + master_only: false, + delay_asymmetry: Duration::ZERO, + }, + 0.25, + TestClock, + Default::default(), + rand::rngs::mock::StepRng::new(2, 1), + ); + + let (mut port, _) = port.end_bmca(); + + let mut announce = default_announce_message(); + announce.header.source_port_identity.clock_identity.0 = [1, 2, 3, 4, 5, 6, 7, 8]; + let announce_message = Message { + header: announce.header, + body: MessageBody::Announce(announce), + suffix: Default::default(), + }; + let mut packet = [0; MAX_DATA_LEN]; + let packet_len = announce_message.serialize(&mut packet).unwrap(); + let packet = &packet[..packet_len]; + + let mut actions = port.handle_timecritical_receive(packet, Time::from_micros(1)); + let Some(PortAction::ResetAnnounceReceiptTimer { .. }) = actions.next() else { + panic!("Unexpected action"); + }; + assert!(actions.next().is_none()); + drop(actions); + + let mut actions = port.handle_timecritical_receive(packet, Time::from_micros(2)); + let Some(PortAction::ResetAnnounceReceiptTimer { .. }) = actions.next() else { + panic!("Unexpected action"); + }; + assert!(actions.next().is_none()); + drop(actions); + + let mut actions = port.handle_timecritical_receive(packet, Time::from_micros(3)); + let Some(PortAction::ResetAnnounceReceiptTimer { .. }) = actions.next() else { + panic!("Unexpected action"); + }; + assert!(actions.next().is_none()); + drop(actions); + + let mut port = port.start_bmca(); + port.calculate_best_local_announce_message(); + assert!(port.best_local_announce_message().is_some()); + } +}