Skip to content

Commit

Permalink
Allow handling of general messages through handle_timecritical_receiv…
Browse files Browse the repository at this point in the history
…e. (#275)
  • Loading branch information
davidv1992 authored Oct 5, 2023
1 parent b49e0b9 commit 3310a95
Show file tree
Hide file tree
Showing 2 changed files with 257 additions and 10 deletions.
15 changes: 15 additions & 0 deletions statime/src/datastructures/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,21 @@ pub(crate) struct Message<'a> {
pub(crate) suffix: TlvSet<'a>,
}

impl<'a> Message<'a> {
pub(crate) fn is_event(&self) -> bool {
use MessageBody::*;
match self.body {
Sync(_) | DelayReq(_) | PDelayReq(_) | PDelayResp(_) => true,
FollowUp(_)
| DelayResp(_)
| PDelayRespFollowUp(_)
| Announce(_)
| Signaling(_)
| Management(_) => false,
}
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum MessageBody {
Sync(SyncMessage),
Expand Down
252 changes: 242 additions & 10 deletions statime/src/port/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,16 +244,18 @@ impl<'a, C: Clock, F: Filter, R: Rng> Port<Running<'a>, R, C, F> {
return actions![];
}

let actions = self.port_state.handle_event_receive(
message,
timestamp,
self.config.min_delay_req_interval(),
self.port_identity,
&mut self.clock,
&mut self.packet_buffer,
);

actions
if message.is_event() {
self.port_state.handle_event_receive(
message,
timestamp,
self.config.min_delay_req_interval(),
self.port_identity,
&mut self.clock,
&mut self.packet_buffer,
)
} else {
self.handle_general_internal(message)
}
}

// Handle a general ptp message
Expand All @@ -273,6 +275,10 @@ impl<'a, C: Clock, F: Filter, R: Rng> Port<Running<'a>, R, C, F> {
return actions![];
}

self.handle_general_internal(message)
}

fn handle_general_internal(&mut self, message: Message<'_>) -> PortActionIterator<'_> {
match message.body {
MessageBody::Announce(announce) => {
self.bmca
Expand Down Expand Up @@ -554,3 +560,229 @@ impl<'a, C, F: Filter, R: Rng> Port<InBmca<'a>, R, C, F> {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{
datastructures::messages::{AnnounceMessage, Header, PtpVersion},
BasicFilter, DelayMechanism, InstanceConfig, Interval,
};

struct TestClock;

impl Clock for TestClock {
type Error = ();

fn set_frequency(&mut self, _freq: f64) -> Result<Time, Self::Error> {
Ok(Time::default())
}

fn now(&self) -> Time {
panic!("Shouldn't be called");
}

fn set_properties(
&mut self,
_time_properties_ds: &crate::TimePropertiesDS,
) -> Result<(), Self::Error> {
Ok(())
}

fn step_clock(&mut self, _offset: Duration) -> Result<Time, Self::Error> {
Ok(Time::default())
}
}

fn default_announce_message_header() -> Header {
Header {
sdo_id: Default::default(),
version: PtpVersion::new(2, 1).unwrap(),
domain_number: Default::default(),
alternate_master_flag: false,
two_step_flag: false,
unicast_flag: false,
ptp_profile_specific_1: false,
ptp_profile_specific_2: false,
leap61: false,
leap59: false,
current_utc_offset_valid: false,
ptp_timescale: false,
time_tracable: false,
frequency_tracable: false,
synchronization_uncertain: false,
correction_field: Default::default(),
source_port_identity: Default::default(),
sequence_id: Default::default(),
log_message_interval: Default::default(),
}
}

fn default_announce_message() -> AnnounceMessage {
AnnounceMessage {
header: default_announce_message_header(),
origin_timestamp: Default::default(),
current_utc_offset: Default::default(),
grandmaster_priority_1: Default::default(),
grandmaster_clock_quality: Default::default(),
grandmaster_priority_2: Default::default(),
grandmaster_identity: Default::default(),
steps_removed: Default::default(),
time_source: Default::default(),
}
}

#[test]
fn test_announce_receive() {
let default_ds = DefaultDS::new(InstanceConfig {
clock_identity: Default::default(),
priority_1: 255,
priority_2: 255,
domain_number: 0,
slave_only: false,
sdo_id: Default::default(),
});

let parent_ds = ParentDS::new(default_ds);

let state = AtomicRefCell::new(PtpInstanceState {
default_ds,
current_ds: Default::default(),
parent_ds,
time_properties_ds: Default::default(),
});

let port = Port::<_, _, _, BasicFilter>::new(
&state,
PortConfig {
delay_mechanism: DelayMechanism::E2E {
interval: Interval::from_log_2(1),
},
announce_interval: Interval::from_log_2(1),
announce_receipt_timeout: 3,
sync_interval: Interval::from_log_2(0),
master_only: false,
delay_asymmetry: Duration::ZERO,
},
0.25,
TestClock,
Default::default(),
rand::rngs::mock::StepRng::new(2, 1),
);

let (mut port, _) = port.end_bmca();

let mut announce = default_announce_message();
announce.header.source_port_identity.clock_identity.0 = [1, 2, 3, 4, 5, 6, 7, 8];
let announce_message = Message {
header: announce.header,
body: MessageBody::Announce(announce),
suffix: Default::default(),
};
let mut packet = [0; MAX_DATA_LEN];
let packet_len = announce_message.serialize(&mut packet).unwrap();
let packet = &packet[..packet_len];

let mut actions = port.handle_general_receive(packet);
let Some(PortAction::ResetAnnounceReceiptTimer { .. }) = actions.next() else {
panic!("Unexpected action");
};
assert!(actions.next().is_none());
drop(actions);

let mut actions = port.handle_general_receive(packet);
let Some(PortAction::ResetAnnounceReceiptTimer { .. }) = actions.next() else {
panic!("Unexpected action");
};
assert!(actions.next().is_none());
drop(actions);

let mut actions = port.handle_general_receive(packet);
let Some(PortAction::ResetAnnounceReceiptTimer { .. }) = actions.next() else {
panic!("Unexpected action");
};
assert!(actions.next().is_none());
drop(actions);

let mut port = port.start_bmca();
port.calculate_best_local_announce_message();
assert!(port.best_local_announce_message().is_some());
}

#[test]
fn test_announce_receive_via_timecritical() {
let default_ds = DefaultDS::new(InstanceConfig {
clock_identity: Default::default(),
priority_1: 255,
priority_2: 255,
domain_number: 0,
slave_only: false,
sdo_id: Default::default(),
});

let parent_ds = ParentDS::new(default_ds);

let state = AtomicRefCell::new(PtpInstanceState {
default_ds,
current_ds: Default::default(),
parent_ds,
time_properties_ds: Default::default(),
});

let port = Port::<_, _, _, BasicFilter>::new(
&state,
PortConfig {
delay_mechanism: DelayMechanism::E2E {
interval: Interval::from_log_2(1),
},
announce_interval: Interval::from_log_2(1),
announce_receipt_timeout: 3,
sync_interval: Interval::from_log_2(0),
master_only: false,
delay_asymmetry: Duration::ZERO,
},
0.25,
TestClock,
Default::default(),
rand::rngs::mock::StepRng::new(2, 1),
);

let (mut port, _) = port.end_bmca();

let mut announce = default_announce_message();
announce.header.source_port_identity.clock_identity.0 = [1, 2, 3, 4, 5, 6, 7, 8];
let announce_message = Message {
header: announce.header,
body: MessageBody::Announce(announce),
suffix: Default::default(),
};
let mut packet = [0; MAX_DATA_LEN];
let packet_len = announce_message.serialize(&mut packet).unwrap();
let packet = &packet[..packet_len];

let mut actions = port.handle_timecritical_receive(packet, Time::from_micros(1));
let Some(PortAction::ResetAnnounceReceiptTimer { .. }) = actions.next() else {
panic!("Unexpected action");
};
assert!(actions.next().is_none());
drop(actions);

let mut actions = port.handle_timecritical_receive(packet, Time::from_micros(2));
let Some(PortAction::ResetAnnounceReceiptTimer { .. }) = actions.next() else {
panic!("Unexpected action");
};
assert!(actions.next().is_none());
drop(actions);

let mut actions = port.handle_timecritical_receive(packet, Time::from_micros(3));
let Some(PortAction::ResetAnnounceReceiptTimer { .. }) = actions.next() else {
panic!("Unexpected action");
};
assert!(actions.next().is_none());
drop(actions);

let mut port = port.start_bmca();
port.calculate_best_local_announce_message();
assert!(port.best_local_announce_message().is_some());
}
}

0 comments on commit 3310a95

Please sign in to comment.