Skip to content

Commit

Permalink
Add wip QoS-based priority-to-link dispatch impl
Browse files Browse the repository at this point in the history
  • Loading branch information
fuzzypixelz committed Sep 4, 2024
1 parent 7ff82cf commit 2a951e9
Show file tree
Hide file tree
Showing 27 changed files with 545 additions and 287 deletions.
5 changes: 5 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@
/// The supported protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream", "vsock"]
/// For example, to only enable "tls" and "quic":
// protocols: ["tls", "quic"],
///
/// Endpoints accept a "priorities" metadata value in the form of a Rust-style half-open range
/// (e.g. `2..4` signifies priorities 2 and 3). This value is used to select the link used
/// for transmission based on the priority of the message in question.
///
/// Configure the zenoh TX parameters of a link
tx: {
/// The resolution in bits to be used for the message sequence numbers.
Expand Down
61 changes: 59 additions & 2 deletions commons/zenoh-protocol/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ use core::{
str::FromStr,
};

use serde::Serialize;
pub use uhlc::{Timestamp, NTP64};
use zenoh_keyexpr::OwnedKeyExpr;
use zenoh_result::{bail, zerror};
use zenoh_result::{bail, zerror, ZResult};

/// The unique Id of the [`HLC`](uhlc::HLC) that generated the concerned [`Timestamp`].
pub type TimestampId = uhlc::ID;
Expand Down Expand Up @@ -308,6 +309,52 @@ pub enum Priority {
Background = 7,
}

#[derive(Debug, Default, Copy, Clone, Eq, Hash, PartialEq, Serialize)]
pub struct PriorityRange {
pub start: u8,
pub end: u8,
}

impl PriorityRange {
pub fn new(start: u8, end: u8) -> ZResult<Self> {
if start >= end || start < Priority::MAX as u8 || end > Priority::MIN as u8 + 1 {
bail!("Invalid priority range: {start}..{end}")
};

Ok(Self { start, end })
}

pub fn includes(&self, priority: Priority) -> bool {
self.start <= (priority as u8) && (priority as u8) < self.end
}

pub fn len(&self) -> usize {
(self.end - self.start) as usize
}

pub fn is_empty(&self) -> bool {
self.end == self.start
}

pub fn start(&self) -> u8 {
self.start
}

pub fn end(&self) -> u8 {
self.end
}

#[cfg(feature = "test")]
pub fn rand() -> Self {
use rand::Rng;
let mut rng = rand::thread_rng();
let start = rng.gen_range(Priority::MAX as u8..Priority::MIN as u8);
let end = rng.gen_range((start + 1)..=Priority::MIN as u8);

Self { start, end }
}
}

impl Priority {
/// Default
pub const DEFAULT: Self = Self::Data;
Expand Down Expand Up @@ -342,7 +389,7 @@ impl TryFrom<u8> for Priority {
}
}

#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize)]
#[repr(u8)]
pub enum Reliability {
#[default]
Expand All @@ -367,6 +414,16 @@ impl Reliability {
}
}

impl From<bool> for Reliability {
fn from(value: bool) -> Self {
if value {
Reliability::Reliable
} else {
Reliability::BestEffort
}
}
}

#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
pub struct Channel {
pub priority: Priority,
Expand Down
14 changes: 7 additions & 7 deletions commons/zenoh-protocol/src/transport/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,13 @@ pub struct InitSyn {
// Extensions
pub mod ext {
use crate::{
common::{ZExtUnit, ZExtZBuf},
zextunit, zextzbuf,
common::{ZExtUnit, ZExtZ64, ZExtZBuf},
zextunit, zextz64, zextzbuf,
};

/// # QoS extension
/// Used to negotiate the use of QoS
pub type QoS = zextunit!(0x1, false);
pub type QoS = zextz64!(0x1, false);

/// # Shm extension
/// Used as challenge for probing shared memory capabilities
Expand Down Expand Up @@ -161,7 +161,7 @@ impl InitSyn {
pub fn rand() -> Self {
use rand::Rng;

use crate::common::{ZExtUnit, ZExtZBuf};
use crate::common::{ZExtUnit, ZExtZ64, ZExtZBuf};

let mut rng = rand::thread_rng();

Expand All @@ -170,7 +170,7 @@ impl InitSyn {
let zid = ZenohIdProto::default();
let resolution = Resolution::rand();
let batch_size: BatchSize = rng.gen();
let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand());
#[cfg(feature = "shared-memory")]
let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
Expand Down Expand Up @@ -217,7 +217,7 @@ impl InitAck {
pub fn rand() -> Self {
use rand::Rng;

use crate::common::{ZExtUnit, ZExtZBuf};
use crate::common::{ZExtUnit, ZExtZ64, ZExtZBuf};

let mut rng = rand::thread_rng();

Expand All @@ -231,7 +231,7 @@ impl InitAck {
};
let batch_size: BatchSize = rng.gen();
let cookie = ZSlice::rand(64);
let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand());
#[cfg(feature = "shared-memory")]
let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
Expand Down
38 changes: 17 additions & 21 deletions io/zenoh-link-commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ pub use listener::*;
pub use multicast::*;
use serde::Serialize;
pub use unicast::*;
use zenoh_protocol::{core::Locator, transport::BatchSize};
use zenoh_protocol::{
core::{Locator, PriorityRange, Reliability},
transport::BatchSize,
};
use zenoh_result::ZResult;

/*************************************/
Expand All @@ -48,10 +51,11 @@ pub struct Link {
pub dst: Locator,
pub group: Option<Locator>,
pub mtu: BatchSize,
pub is_reliable: bool,
pub is_streamed: bool,
pub interfaces: Vec<String>,
pub auth_identifier: LinkAuthId,
pub priorities: Option<PriorityRange>,
pub reliability: Reliability,
}

#[async_trait]
Expand All @@ -70,48 +74,40 @@ impl fmt::Display for Link {
}
}

impl From<&LinkUnicast> for Link {
fn from(link: &LinkUnicast) -> Link {
impl Link {
pub fn new_unicast(
link: &LinkUnicast,
priorities: Option<PriorityRange>,
reliability: Reliability,
) -> Self {
Link {
src: link.get_src().to_owned(),
dst: link.get_dst().to_owned(),
group: None,
mtu: link.get_mtu(),
is_reliable: link.is_reliable(),
is_streamed: link.is_streamed(),
interfaces: link.get_interface_names(),
auth_identifier: link.get_auth_id().clone(),
priorities,
reliability,
}
}
}

impl From<LinkUnicast> for Link {
fn from(link: LinkUnicast) -> Link {
Link::from(&link)
}
}

impl From<&LinkMulticast> for Link {
fn from(link: &LinkMulticast) -> Link {
pub fn new_multicast(link: &LinkMulticast) -> Self {
Link {
src: link.get_src().to_owned(),
dst: link.get_dst().to_owned(),
group: Some(link.get_dst().to_owned()),
mtu: link.get_mtu(),
is_reliable: link.is_reliable(),
is_streamed: false,
interfaces: vec![],
auth_identifier: LinkAuthId::default(),
priorities: None,
reliability: Reliability::from(link.is_reliable()),
}
}
}

impl From<LinkMulticast> for Link {
fn from(link: LinkMulticast) -> Link {
Link::from(&link)
}
}

impl PartialEq<LinkUnicast> for Link {
fn eq(&self, other: &LinkUnicast) -> bool {
self.src == *other.get_src() && self.dst == *other.get_dst()
Expand Down
13 changes: 12 additions & 1 deletion io/zenoh-link-commons/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,18 @@ pub trait LinkManagerUnicastTrait: Send + Sync {
async fn get_listeners(&self) -> Vec<EndPoint>;
async fn get_locators(&self) -> Vec<Locator>;
}
pub type NewLinkChannelSender = flume::Sender<LinkUnicast>;
pub type NewLinkChannelSender = flume::Sender<NewLinkUnicast>;

/// Notification of a new inbound connection.
///
/// Link implementations should preserve the metadata sections of [`NewLinkUnicast::endpoint`].
pub struct NewLinkUnicast {
/// The link created in response to a new inbound connection.
pub link: LinkUnicast,
/// Endpoint of the listener.
pub endpoint: EndPoint,
}

pub trait ConstructibleLinkManagerUnicast<T>: Sized {
fn new(new_link_sender: NewLinkChannelSender, config: T) -> ZResult<Self>;
}
Expand Down
20 changes: 12 additions & 8 deletions io/zenoh-links/zenoh-link-quic/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use x509_parser::prelude::*;
use zenoh_core::zasynclock;
use zenoh_link_commons::{
get_ip_interface_names, LinkAuthId, LinkAuthType, LinkManagerUnicastTrait, LinkUnicast,
LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender,
LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, NewLinkUnicast,
};
use zenoh_protocol::{
core::{EndPoint, Locator},
Expand Down Expand Up @@ -332,11 +332,14 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic {

// Spawn the accept loop for the listener
let token = self.listeners.token.child_token();
let c_token = token.clone();

let c_manager = self.manager.clone();
let task = {
let token = token.clone();
let manager = self.manager.clone();
let endpoint = endpoint.clone();

let task = async move { accept_task(quic_endpoint, c_token, c_manager).await };
async move { accept_task(endpoint, quic_endpoint, token, manager).await }
};

// Initialize the QuicAcceptor
let locator = endpoint.to_locator();
Expand Down Expand Up @@ -364,7 +367,8 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic {
}

async fn accept_task(
endpoint: quinn::Endpoint,
endpoint: EndPoint,
quic_endpoint: quinn::Endpoint,
token: CancellationToken,
manager: NewLinkChannelSender,
) -> ZResult<()> {
Expand All @@ -382,7 +386,7 @@ async fn accept_task(
Ok(conn)
}

let src_addr = endpoint
let src_addr = quic_endpoint
.local_addr()
.map_err(|e| zerror!("Can not accept QUIC connections: {}", e))?;

Expand All @@ -393,7 +397,7 @@ async fn accept_task(
tokio::select! {
_ = token.cancelled() => break,

res = accept(endpoint.accept()) => {
res = accept(quic_endpoint.accept()) => {
match res {
Ok(quic_conn) => {
// Get the bideractional streams. Note that we don't allow unidirectional streams.
Expand Down Expand Up @@ -429,7 +433,7 @@ async fn accept_task(
));

// Communicate the new link to the initial transport manager
if let Err(e) = manager.send_async(LinkUnicast(link)).await {
if let Err(e) = manager.send_async(NewLinkUnicast { link: LinkUnicast(link), endpoint: endpoint.clone() }).await {
tracing::error!("{}-{}: {}", file!(), line!(), e)
}

Expand Down
31 changes: 18 additions & 13 deletions io/zenoh-links/zenoh-link-serial/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use z_serial::ZSerial;
use zenoh_core::{zasynclock, zasyncread, zasyncwrite};
use zenoh_link_commons::{
ConstructibleLinkManagerUnicast, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast,
LinkUnicastTrait, NewLinkChannelSender,
LinkUnicastTrait, NewLinkChannelSender, NewLinkUnicast,
};
use zenoh_protocol::{
core::{EndPoint, Locator},
Expand Down Expand Up @@ -332,19 +332,23 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial {

// Spawn the accept loop for the listener
let token = CancellationToken::new();
let c_token = token.clone();
let mut listeners = zasyncwrite!(self.listeners);

let c_path = path.clone();
let c_manager = self.manager.clone();
let c_listeners = self.listeners.clone();

let task = async move {
// Wait for the accept loop to terminate
let res =
accept_read_task(link, c_token, c_manager, c_path.clone(), is_connected).await;
zasyncwrite!(c_listeners).remove(&c_path);
res
let task = {
let token = token.clone();
let path = path.clone();
let manager = self.manager.clone();
let listeners = self.listeners.clone();
let endpoint = endpoint.clone();

async move {
// Wait for the accept loop to terminate
let res =
accept_read_task(endpoint, link, token, manager, path.clone(), is_connected)
.await;
zasyncwrite!(listeners).remove(&path);
res
}
};
let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task);

Expand Down Expand Up @@ -390,6 +394,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial {
}

async fn accept_read_task(
endpoint: EndPoint,
link: Arc<LinkUnicastSerial>,
token: CancellationToken,
manager: NewLinkChannelSender,
Expand Down Expand Up @@ -423,7 +428,7 @@ async fn accept_read_task(
match res {
Ok(link) => {
// Communicate the new link to the initial transport manager
if let Err(e) = manager.send_async(LinkUnicast(link.clone())).await {
if let Err(e) = manager.send_async(NewLinkUnicast{ link: LinkUnicast(link.clone()), endpoint: endpoint.clone() }).await {
tracing::error!("{}-{}: {}", file!(), line!(), e)
}

Expand Down
Loading

0 comments on commit 2a951e9

Please sign in to comment.