Skip to content

Commit

Permalink
Implemented support for ethernet transport.
Browse files Browse the repository at this point in the history
  • Loading branch information
davidv1992 committed Oct 11, 2023
1 parent 426f448 commit 807fc16
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 7 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion statime-linux/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ serde = { version = "1.0.188", features = ["derive"] }


clock-steering = { git = "https://github.com/pendulum-project/clock-steering.git", rev = "8ca7481" }
timestamped-socket = { git = "https://github.com/pendulum-project/timestamped-socket.git", rev = "b98af7e", features = ["serde"] }
timestamped-socket = { git = "https://github.com/pendulum-project/timestamped-socket.git", rev = "39d30b1", features = ["serde"] }
1 change: 1 addition & 0 deletions statime-linux/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pub enum NetworkMode {
#[default]
Ipv4,
Ipv6,
Ethernet,
}

impl Config {
Expand Down
179 changes: 176 additions & 3 deletions statime-linux/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ use statime_linux::{
clock::LinuxClock,
config::Config,
socket::{
open_ipv4_event_socket, open_ipv4_general_socket, open_ipv6_event_socket,
open_ipv6_general_socket, timestamp_to_time, PtpTargetAddress,
open_ethernet_socket, open_ipv4_event_socket, open_ipv4_general_socket,
open_ipv6_event_socket, open_ipv6_general_socket, timestamp_to_time, PtpTargetAddress,
},
};
use timestamped_socket::{
interface::InterfaceIterator,
networkaddress::NetworkAddress,
networkaddress::{EthernetAddress, NetworkAddress},
socket::{InterfaceTimestampMode, Open, Socket},
};
use tokio::{
Expand Down Expand Up @@ -366,6 +366,20 @@ async fn actual_main() {
bmca_notify_receiver.clone(),
));
}
statime_linux::config::NetworkMode::Ethernet => {
let socket =
open_ethernet_socket(interface, timestamping).expect("Could not open socket");

tokio::spawn(ethernet_port_task(
port_task_receiver,
port_task_sender,
interface
.get_index()
.expect("Unable to get network interface index") as _,
socket,
bmca_notify_receiver.clone(),
));
}
}
}

Expand Down Expand Up @@ -544,6 +558,99 @@ async fn port_task<A: NetworkAddress + PtpTargetAddress>(
}
}

// the Port task for ethernet transport
//
// This task waits for a new port (in the bmca state) to arrive on its Receiver.
// It will then move the port into the running state, and process actions. When
// the task is notified of a BMCA, it will stop running, move the port into the
// bmca state, and send it on its Sender
async fn ethernet_port_task(
mut port_task_receiver: Receiver<BmcaPort>,
port_task_sender: Sender<BmcaPort>,
interface: libc::c_int,
mut socket: Socket<EthernetAddress, Open>,
mut bmca_notify: tokio::sync::watch::Receiver<bool>,
) {
let mut timers = Timers {
port_sync_timer: pin!(Timer::new()),
port_announce_timer: pin!(Timer::new()),
port_announce_timeout_timer: pin!(Timer::new()),
delay_request_timer: pin!(Timer::new()),
filter_update_timer: pin!(Timer::new()),
};

loop {
let port_in_bmca = port_task_receiver.recv().await.unwrap();

// handle post-bmca actions
let (mut port, actions) = port_in_bmca.end_bmca();

let mut pending_timestamp =
handle_actions_ethernet(actions, interface, &mut socket, &mut timers).await;

while let Some((context, timestamp)) = pending_timestamp {
pending_timestamp = handle_actions_ethernet(
port.handle_send_timestamp(context, timestamp),
interface,
&mut socket,
&mut timers,
)
.await;
}

let mut event_buffer = [0; MAX_DATA_LEN];

loop {
let mut actions = tokio::select! {
result = socket.recv(&mut event_buffer) => match result {
Ok(packet) => {
if let Some(timestamp) = packet.timestamp {
log::trace!("Recv timestamp: {:?}", packet.timestamp);
port.handle_event_receive(&event_buffer[..packet.bytes_read], timestamp_to_time(timestamp))
} else {
port.handle_general_receive(&event_buffer[..packet.bytes_read])
}
}
Err(error) => panic!("Error receiving: {error:?}"),
},
() = &mut timers.port_announce_timer => {
port.handle_announce_timer()
},
() = &mut timers.port_sync_timer => {
port.handle_sync_timer()
},
() = &mut timers.port_announce_timeout_timer => {
port.handle_announce_receipt_timer()
},
() = &mut timers.delay_request_timer => {
port.handle_delay_request_timer()
},
() = &mut timers.filter_update_timer => {
port.handle_filter_update_timer()
},
result = bmca_notify.wait_for(|v| *v) => match result {
Ok(_) => break,
Err(error) => panic!("Error on bmca notify: {error:?}"),
}
};

loop {
let pending_timestamp =
handle_actions_ethernet(actions, interface, &mut socket, &mut timers).await;

// there might be more actions to handle based on the current action
actions = match pending_timestamp {
Some((context, timestamp)) => port.handle_send_timestamp(context, timestamp),
None => break,
};
}
}

let port_in_bmca = port.start_bmca();
port_task_sender.send(port_in_bmca).await.unwrap();
}
}

struct Timers<'a> {
port_sync_timer: Pin<&'a mut Timer>,
port_announce_timer: Pin<&'a mut Timer>,
Expand Down Expand Up @@ -604,6 +711,72 @@ async fn handle_actions<A: NetworkAddress + PtpTargetAddress>(
pending_timestamp
}

async fn handle_actions_ethernet(
actions: PortActionIterator<'_>,
interface: libc::c_int,
socket: &mut Socket<EthernetAddress, Open>,
timers: &mut Timers<'_>,
) -> Option<(TimestampContext, Time)> {
let mut pending_timestamp = None;

for action in actions {
match action {
PortAction::SendEvent { context, data } => {
// send timestamp of the send
let time = socket
.send_to(
data,
EthernetAddress::new(
EthernetAddress::PRIMARY_EVENT.protocol(),
EthernetAddress::PRIMARY_EVENT.mac(),
interface,
),
)
.await
.expect("Failed to send event message");

// anything we send later will have a later pending (send) timestamp
if let Some(time) = time {
log::trace!("Send timestamp {:?}", time);
pending_timestamp = Some((context, timestamp_to_time(time)));
} else {
log::error!("Missing send timestamp");
}
}
PortAction::SendGeneral { data } => {
socket
.send_to(
data,
EthernetAddress::new(
EthernetAddress::PRIMARY_GENERAL.protocol(),
EthernetAddress::PRIMARY_GENERAL.mac(),
interface,
),
)
.await
.expect("Failed to send general message");
}
PortAction::ResetAnnounceTimer { duration } => {
timers.port_announce_timer.as_mut().reset(duration);
}
PortAction::ResetSyncTimer { duration } => {
timers.port_sync_timer.as_mut().reset(duration);
}
PortAction::ResetDelayRequestTimer { duration } => {
timers.delay_request_timer.as_mut().reset(duration);
}
PortAction::ResetAnnounceReceiptTimer { duration } => {
timers.port_announce_timeout_timer.as_mut().reset(duration);
}
PortAction::ResetFilterUpdateTimer { duration } => {
timers.filter_update_timer.as_mut().reset(duration);
}
}
}

pending_timestamp
}

fn get_clock_id() -> Option<[u8; 8]> {
let candidates = InterfaceIterator::new()
.unwrap()
Expand Down
33 changes: 32 additions & 1 deletion statime-linux/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6};
use statime::Time;
use timestamped_socket::{
interface::InterfaceName,
socket::{open_interface_udp4, open_interface_udp6, InterfaceTimestampMode, Open, Socket},
networkaddress::{EthernetAddress, MacAddress},
socket::{
open_interface_ethernet, open_interface_udp4, open_interface_udp6, InterfaceTimestampMode,
Open, Socket,
},
};

const IPV6_PRIMARY_MULTICAST: Ipv6Addr = Ipv6Addr::new(0xff, 0x0e, 0, 0, 0, 0, 0x01, 0x81);
Expand All @@ -19,6 +23,8 @@ const IPV4_PDELAY_MULTICAST: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 107);
const EVENT_PORT: u16 = 319;
const GENERAL_PORT: u16 = 320;

const PTP_ETHERTYPE: u16 = 0x88f7;

pub trait PtpTargetAddress {
const PRIMARY_EVENT: Self;
const PRIMARY_GENERAL: Self;
Expand All @@ -40,6 +46,21 @@ impl PtpTargetAddress for SocketAddrV6 {
const PDELAY_GENERAL: Self = SocketAddrV6::new(IPV6_PDELAY_MULTICAST, GENERAL_PORT, 0, 0);
}

impl PtpTargetAddress for EthernetAddress {
const PRIMARY_EVENT: Self = EthernetAddress::new(
PTP_ETHERTYPE,
MacAddress::new([0x01, 0x1b, 0x19, 0x00, 0x00, 0x00]),
0,
);
const PRIMARY_GENERAL: Self = Self::PRIMARY_EVENT;
const PDELAY_EVENT: Self = EthernetAddress::new(
PTP_ETHERTYPE,
MacAddress::new([0x01, 0x80, 0xc2, 0x00, 0x00, 0x0e]),
0,
);
const PDELAY_GENERAL: Self = Self::PDELAY_EVENT;
}

pub fn open_ipv4_event_socket(
interface: InterfaceName,
timestamping: InterfaceTimestampMode,
Expand Down Expand Up @@ -85,6 +106,16 @@ pub fn open_ipv6_general_socket(
Ok(socket)
}

pub fn open_ethernet_socket(
interface: InterfaceName,
timestamping: InterfaceTimestampMode,
) -> std::io::Result<Socket<EthernetAddress, Open>> {
let socket = open_interface_ethernet(interface, PTP_ETHERTYPE, timestamping)?;
socket.join_multicast(EthernetAddress::PRIMARY_EVENT, interface)?;
socket.join_multicast(EthernetAddress::PDELAY_EVENT, interface)?;
Ok(socket)
}

pub fn timestamp_to_time(ts: timestamped_socket::socket::Timestamp) -> Time {
Time::from_fixed_nanos(ts.seconds as i128 * 1_000_000_000i128 + ts.nanos as i128)
}
10 changes: 9 additions & 1 deletion statime/src/datastructures/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use self::{
use super::{
common::{PortIdentity, TimeInterval, TlvSet, WireTimestamp},
datasets::DefaultDS,
WireFormatError,
};
use crate::{ptp_instance::PtpInstanceState, Interval, LeapIndicator, Time};

Expand Down Expand Up @@ -404,8 +405,15 @@ impl<'a> Message<'a> {
pub(crate) fn deserialize(buffer: &'a [u8]) -> Result<Self, super::WireFormatError> {
let header_data = Header::deserialize_header(buffer)?;

if header_data.message_length < 34 {
return Err(WireFormatError::Invalid);
}

// Ensure we have the entire message and ignore potential padding
// Skip the header bytes and only keep the content
let content_buffer = &buffer[34..];
let content_buffer = buffer
.get(34..(header_data.message_length as usize))
.ok_or(WireFormatError::BufferTooShort)?;

let body = MessageBody::deserialize(
header_data.message_type,
Expand Down

0 comments on commit 807fc16

Please sign in to comment.