Skip to content

Commit

Permalink
Changed to new socket interface from network library.
Browse files Browse the repository at this point in the history
  • Loading branch information
davidv1992 authored and folkertdev committed Oct 5, 2023
1 parent 3310a95 commit a9943e8
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 306 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ sync_interval = 0
announce_receipt_timeout = 3
delay_asymetry = 0
delay_mechanism = 0
master_only = false

[[port]]
interface = "enp0s31f6"
Expand All @@ -24,4 +23,3 @@ sync_interval = 0
announce_receipt_timeout = 3
delay_asymetry = 0
delay_mechanism = 0
master_only = false
2 changes: 1 addition & 1 deletion statime-linux/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ serde = { version = "1.0.188", features = ["derive"] }


clock-steering = { git = "https://github.com/pendulum-project/clock-steering.git", rev = "3ab6721" }
timestamped-socket = { git = "https://github.com/pendulum-project/timestamped-socket.git", rev = "95c2472", features = ["serde"] }
timestamped-socket = { git = "https://github.com/pendulum-project/timestamped-socket.git", rev = "b98af7e", features = ["serde"] }
18 changes: 11 additions & 7 deletions statime-linux/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use serde::Deserialize;
use statime::{DelayMechanism, Duration, Interval};
use timestamped_socket::interface::InterfaceName;

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "kebab-case", deny_unknown_fields)]
pub struct Config {
pub loglevel: String,
Expand All @@ -18,12 +18,15 @@ pub struct Config {
pub ports: Vec<PortConfig>,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct PortConfig {
pub interface: InterfaceName,
#[serde(default)]
pub network_mode: NetworkMode,
pub announce_interval: i8,
pub sync_interval: i8,
pub announce_receipt_timeout: u8,
#[serde(default)]
pub master_only: bool,
pub delay_asymetry: i64,
pub delay_mechanism: i8,
Expand All @@ -44,11 +47,12 @@ impl From<PortConfig> for statime::PortConfig {
}
}

#[derive(Deserialize, Debug)]
pub enum PtpMode {
Ordinary,
Boundary,
Transparant,
#[derive(Deserialize, Debug, Clone, Copy, Default, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum NetworkMode {
#[default]
Ipv4,
Ipv6,
}

impl Config {
Expand Down
179 changes: 84 additions & 95 deletions statime-linux/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,21 @@ use clap::Parser;
use fern::colors::Color;
use rand::{rngs::StdRng, SeedableRng};
use statime::{
BasicFilter, Clock, ClockIdentity, InBmca, InstanceConfig, Port, PortAction,
PortActionIterator, PtpInstance, SdoId, Time, TimePropertiesDS, TimeSource, TimestampContext,
MAX_DATA_LEN,
BasicFilter, ClockIdentity, InBmca, InstanceConfig, Port, PortAction, PortActionIterator,
PtpInstance, SdoId, Time, TimePropertiesDS, TimeSource, TimestampContext, MAX_DATA_LEN,
};
use statime_linux::{
clock::LinuxClock,
config::Config,
socket::{EventSocket, GeneralSocket},
socket::{
open_ipv4_event_socket, open_ipv4_general_socket, open_ipv6_event_socket,
open_ipv6_general_socket, timestamp_to_time, PtpTargetAddress,
},
};
use timestamped_socket::{
interface::{InterfaceDescriptor, InterfaceIterator},
raw_udp_socket::TimestampingMode,
interface::InterfaceIterator,
networkaddress::NetworkAddress,
socket::{Open, Socket},
};
use tokio::{
sync::mpsc::{Receiver, Sender},
Expand All @@ -30,12 +33,6 @@ use tokio::{
#[derive(Clone, Copy)]
struct SdoIdParser;

pub struct PortDefinition {
port: BmcaPort,
interface: InterfaceDescriptor,
timestamping_mode: TimestampingMode,
}

impl clap::builder::TypedValueParser for SdoIdParser {
type Value = SdoId;

Expand Down Expand Up @@ -200,72 +197,75 @@ async fn actual_main() {
time_properties_ds,
)));

// for every port in the config file, create a port definition and add it
// to the instance
let ports = config.ports.into_iter().map(|port_config| {
let interface = InterfaceDescriptor::from_str(port_config.interface.as_str()).unwrap();
// NOTE: Hardware timestamping is ignored for now
// let timestamping_mode = if config.hardware_clock.is_some() {
// match interface_descriptor.interface_name {
// Some(interface_name) => TimestampingMode::Hardware(interface_name),
// None => panic!("an interface name is required when using hardware
// timestamping"), }
// } else {
// TimestampingMode::Software
// };
let timestamping_mode = TimestampingMode::Software;
let rng = StdRng::from_entropy();
let port = instance.add_port(port_config.into(), 0.25, local_clock.clone(), rng);

PortDefinition {
port,
interface,
timestamping_mode,
}
});

run(ports, &local_clock, instance).await.unwrap()
}

async fn run(
ports: impl Iterator<Item = PortDefinition> + ExactSizeIterator,
local_clock: &LinuxClock,
instance: &'static PtpInstance<BasicFilter>,
) -> std::io::Result<()> {
let (bmca_notify_sender, bmca_notify_receiver) = tokio::sync::watch::channel(false);

let mut main_task_senders = Vec::with_capacity(ports.len());
let mut main_task_receivers = Vec::with_capacity(ports.len());
let mut main_task_senders = Vec::with_capacity(config.ports.len());
let mut main_task_receivers = Vec::with_capacity(config.ports.len());

for port_definition in ports.into_iter() {
let event_socket = EventSocket::new(
&port_definition.interface,
port_definition.timestamping_mode,
)
.await?;
let general_socket = GeneralSocket::new(&port_definition.interface).await?;
for port_config in config.ports {
let interface = port_config.interface;
let network_mode = port_config.network_mode;
let rng = StdRng::from_entropy();
let port = instance.add_port(port_config.into(), 0.25, local_clock.clone(), rng);

let (main_task_sender, port_task_receiver) = tokio::sync::mpsc::channel(1);
let (port_task_sender, main_task_receiver) = tokio::sync::mpsc::channel(1);

tokio::spawn(port_task(
port_task_receiver,
port_task_sender,
event_socket,
general_socket,
local_clock.clone(),
bmca_notify_receiver.clone(),
));

main_task_sender
.send(port_definition.port)
.send(port)
.await
.expect("space in channel buffer");

main_task_senders.push(main_task_sender);
main_task_receivers.push(main_task_receiver);

match network_mode {
statime_linux::config::NetworkMode::Ipv4 => {
let event_socket =
open_ipv4_event_socket(interface).expect("Could not open event socket");
let general_socket =
open_ipv4_general_socket(interface).expect("Could not open general socket");

tokio::spawn(port_task(
port_task_receiver,
port_task_sender,
event_socket,
general_socket,
bmca_notify_receiver.clone(),
));
}
statime_linux::config::NetworkMode::Ipv6 => {
let event_socket =
open_ipv6_event_socket(interface).expect("Could not open event socket");
let general_socket =
open_ipv6_general_socket(interface).expect("Could not open general socket");

tokio::spawn(port_task(
port_task_receiver,
port_task_sender,
event_socket,
general_socket,
bmca_notify_receiver.clone(),
));
}
}
}

run(
instance,
bmca_notify_sender,
main_task_receivers,
main_task_senders,
)
.await
}

async fn run(
instance: &'static PtpInstance<BasicFilter>,
bmca_notify_sender: tokio::sync::watch::Sender<bool>,
mut main_task_receivers: Vec<Receiver<Port<InBmca<'static>, StdRng, LinuxClock, BasicFilter>>>,
main_task_senders: Vec<Sender<Port<InBmca<'static>, StdRng, LinuxClock, BasicFilter>>>,
) -> ! {
// run bmca over all of the ports at the same time. The ports don't perform
// their normal actions at this time: bmca is stop-the-world!
let mut bmca_timer = pin!(Timer::new());
Expand Down Expand Up @@ -316,12 +316,11 @@ type BmcaPort = Port<InBmca<'static>, StdRng, LinuxClock, BasicFilter>;
// It will then move the port into the running state, and process actions. When
// the task is notified of a BMCA, it will stop running, move the port into the
// bmca state, and send it on its Sender
async fn port_task(
async fn port_task<A: NetworkAddress + PtpTargetAddress>(
mut port_task_receiver: Receiver<BmcaPort>,
port_task_sender: Sender<BmcaPort>,
mut event_socket: EventSocket,
mut general_socket: GeneralSocket,
local_clock: LinuxClock,
mut event_socket: Socket<A, Open>,
mut general_socket: Socket<A, Open>,
mut bmca_notify: tokio::sync::watch::Receiver<bool>,
) {
let mut timers = Timers {
Expand All @@ -338,22 +337,15 @@ async fn port_task(
// handle post-bmca actions
let (mut port, actions) = port_in_bmca.end_bmca();

let mut pending_timestamp = handle_actions(
actions,
&mut event_socket,
&mut general_socket,
&mut timers,
&local_clock,
)
.await;
let mut pending_timestamp =
handle_actions(actions, &mut event_socket, &mut general_socket, &mut timers).await;

while let Some((context, timestamp)) = pending_timestamp {
pending_timestamp = handle_actions(
port.handle_send_timestamp(context, timestamp),
&mut event_socket,
&mut general_socket,
&mut timers,
&local_clock,
)
.await;
}
Expand All @@ -363,12 +355,12 @@ async fn port_task(

loop {
let mut actions = tokio::select! {
result = event_socket.recv(&local_clock, &mut event_buffer) => match result {
Ok(packet) => port.handle_timecritical_receive(packet.data, packet.timestamp),
result = event_socket.recv(&mut event_buffer) => match result {
Ok(packet) => port.handle_timecritical_receive(&event_buffer[..packet.bytes_read], timestamp_to_time(packet.timestamp.expect("Missing timestamp on recv"))),
Err(error) => panic!("Error receiving: {error:?}"),
},
result = general_socket.recv(&mut general_buffer) => match result {
Ok(packet) => port.handle_general_receive(packet.data),
Ok(packet) => port.handle_general_receive(&general_buffer[..packet.bytes_read]),
Err(error) => panic!("Error receiving: {error:?}"),
},
() = &mut timers.port_announce_timer => {
Expand All @@ -393,14 +385,9 @@ async fn port_task(
};

loop {
let pending_timestamp = handle_actions(
actions,
&mut event_socket,
&mut general_socket,
&mut timers,
&local_clock,
)
.await;
let pending_timestamp =
handle_actions(actions, &mut event_socket, &mut general_socket, &mut timers)
.await;

// there might be more actions to handle based on the current action
actions = match pending_timestamp {
Expand All @@ -423,12 +410,11 @@ struct Timers<'a> {
filter_update_timer: Pin<&'a mut Timer>,
}

async fn handle_actions(
async fn handle_actions<A: NetworkAddress + PtpTargetAddress>(
actions: PortActionIterator<'_>,
event_socket: &mut EventSocket,
general_socket: &mut GeneralSocket,
event_socket: &mut Socket<A, Open>,
general_socket: &mut Socket<A, Open>,
timers: &mut Timers<'_>,
local_clock: &LinuxClock,
) -> Option<(TimestampContext, Time)> {
let mut pending_timestamp = None;

Expand All @@ -437,16 +423,19 @@ async fn handle_actions(
PortAction::SendTimeCritical { context, data } => {
// send timestamp of the send
let time = event_socket
.send(data)
.send_to(data, A::PRIMARY_EVENT)
.await
.unwrap()
.unwrap_or(local_clock.now());
.unwrap();

// anything we send later will have a later pending (send) timestamp
pending_timestamp = Some((context, time));
pending_timestamp = Some((context, timestamp_to_time(time)));
}
PortAction::SendGeneral { data } => {
general_socket.send(data).await.unwrap();
general_socket
.send_to(data, A::PRIMARY_GENERAL)
.await
.unwrap();
}
PortAction::ResetAnnounceTimer { duration } => {
timers.port_announce_timer.as_mut().reset(duration);
Expand Down
Loading

0 comments on commit a9943e8

Please sign in to comment.