Skip to content

Commit

Permalink
Add wip QoS-based priority-to-link dispatch impl
Browse files Browse the repository at this point in the history
  • Loading branch information
fuzzypixelz committed Sep 2, 2024
1 parent 7ff82cf commit 90b068f
Show file tree
Hide file tree
Showing 21 changed files with 499 additions and 225 deletions.
5 changes: 5 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@
/// The supported protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream", "vsock"]
/// For example, to only enable "tls" and "quic":
// protocols: ["tls", "quic"],
///
/// Endpoints accept a "priorities" metadata value in the form of a Rust-style half-open range
/// (e.g. `2..4` signifies priorities 2 and 3). This value is used to select the link used
/// for transmission based on the priority of the message in question.
///
/// Configure the zenoh TX parameters of a link
tx: {
/// The resolution in bits to be used for the message sequence numbers.
Expand Down
59 changes: 58 additions & 1 deletion commons/zenoh-protocol/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ use core::{
str::FromStr,
};

use serde::Serialize;
pub use uhlc::{Timestamp, NTP64};
use zenoh_keyexpr::OwnedKeyExpr;
use zenoh_result::{bail, zerror};
use zenoh_result::{bail, zerror, ZResult};

/// The unique Id of the [`HLC`](uhlc::HLC) that generated the concerned [`Timestamp`].
pub type TimestampId = uhlc::ID;
Expand Down Expand Up @@ -308,6 +309,52 @@ pub enum Priority {
Background = 7,
}

#[derive(Debug, Default, Copy, Clone, Eq, Hash, PartialEq, Serialize)]
pub struct PriorityRange {
pub start: u8,
pub end: u8,
}

impl PriorityRange {
pub fn new(start: u8, end: u8) -> ZResult<Self> {
if start >= end || start < Priority::MAX as u8 || end > Priority::MIN as u8 + 1 {
bail!("Invalid priority range: {start}..{end}")
};

Ok(Self { start, end })
}

pub fn includes(&self, priority: Priority) -> bool {
self.start <= (priority as u8) && (priority as u8) < self.end
}

pub fn len(&self) -> usize {
(self.end - self.start) as usize
}

pub fn is_empty(&self) -> bool {
self.end == self.start
}

pub fn start(&self) -> u8 {
self.start
}

pub fn end(&self) -> u8 {
self.end
}

#[cfg(feature = "test")]
pub fn rand() -> Self {
use rand::Rng;
let mut rng = rand::thread_rng();
let start = rng.gen_range(Priority::MAX as u8..Priority::MIN as u8);
let end = rng.gen_range((start + 1)..=Priority::MIN as u8);

Self { start, end }
}
}

impl Priority {
/// Default
pub const DEFAULT: Self = Self::Data;
Expand Down Expand Up @@ -367,6 +414,16 @@ impl Reliability {
}
}

impl From<bool> for Reliability {
fn from(value: bool) -> Self {
if value {
Reliability::Reliable
} else {
Reliability::BestEffort
}
}
}

#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
pub struct Channel {
pub priority: Priority,
Expand Down
14 changes: 7 additions & 7 deletions commons/zenoh-protocol/src/transport/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,13 @@ pub struct InitSyn {
// Extensions
pub mod ext {
use crate::{
common::{ZExtUnit, ZExtZBuf},
zextunit, zextzbuf,
common::{ZExtUnit, ZExtZ64, ZExtZBuf},
zextunit, zextz64, zextzbuf,
};

/// # QoS extension
/// Used to negotiate the use of QoS
pub type QoS = zextunit!(0x1, false);
pub type QoS = zextz64!(0x1, false);

/// # Shm extension
/// Used as challenge for probing shared memory capabilities
Expand Down Expand Up @@ -161,7 +161,7 @@ impl InitSyn {
pub fn rand() -> Self {
use rand::Rng;

use crate::common::{ZExtUnit, ZExtZBuf};
use crate::common::{ZExtUnit, ZExtZ64, ZExtZBuf};

let mut rng = rand::thread_rng();

Expand All @@ -170,7 +170,7 @@ impl InitSyn {
let zid = ZenohIdProto::default();
let resolution = Resolution::rand();
let batch_size: BatchSize = rng.gen();
let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand());
#[cfg(feature = "shared-memory")]
let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
Expand Down Expand Up @@ -217,7 +217,7 @@ impl InitAck {
pub fn rand() -> Self {
use rand::Rng;

use crate::common::{ZExtUnit, ZExtZBuf};
use crate::common::{ZExtUnit, ZExtZ64, ZExtZBuf};

let mut rng = rand::thread_rng();

Expand All @@ -231,7 +231,7 @@ impl InitAck {
};
let batch_size: BatchSize = rng.gen();
let cookie = ZSlice::rand(64);
let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand());
#[cfg(feature = "shared-memory")]
let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
Expand Down
8 changes: 7 additions & 1 deletion io/zenoh-link-commons/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ pub trait LinkManagerUnicastTrait: Send + Sync {
async fn get_listeners(&self) -> Vec<EndPoint>;
async fn get_locators(&self) -> Vec<Locator>;
}
pub type NewLinkChannelSender = flume::Sender<LinkUnicast>;
pub type NewLinkChannelSender = flume::Sender<NewLinkUnicast>;

pub struct NewLinkUnicast {
pub link: LinkUnicast,
pub endpoint: EndPoint,
}

pub trait ConstructibleLinkManagerUnicast<T>: Sized {
fn new(new_link_sender: NewLinkChannelSender, config: T) -> ZResult<Self>;
}
Expand Down
20 changes: 12 additions & 8 deletions io/zenoh-links/zenoh-link-quic/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use x509_parser::prelude::*;
use zenoh_core::zasynclock;
use zenoh_link_commons::{
get_ip_interface_names, LinkAuthId, LinkAuthType, LinkManagerUnicastTrait, LinkUnicast,
LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender,
LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, NewLinkUnicast,
};
use zenoh_protocol::{
core::{EndPoint, Locator},
Expand Down Expand Up @@ -332,11 +332,14 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic {

// Spawn the accept loop for the listener
let token = self.listeners.token.child_token();
let c_token = token.clone();

let c_manager = self.manager.clone();
let task = {
let token = token.clone();
let manager = self.manager.clone();
let endpoint = endpoint.clone();

let task = async move { accept_task(quic_endpoint, c_token, c_manager).await };
async move { accept_task(endpoint, quic_endpoint, token, manager).await }
};

// Initialize the QuicAcceptor
let locator = endpoint.to_locator();
Expand Down Expand Up @@ -364,7 +367,8 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic {
}

async fn accept_task(
endpoint: quinn::Endpoint,
endpoint: EndPoint,
quic_endpoint: quinn::Endpoint,
token: CancellationToken,
manager: NewLinkChannelSender,
) -> ZResult<()> {
Expand All @@ -382,7 +386,7 @@ async fn accept_task(
Ok(conn)
}

let src_addr = endpoint
let src_addr = quic_endpoint
.local_addr()
.map_err(|e| zerror!("Can not accept QUIC connections: {}", e))?;

Expand All @@ -393,7 +397,7 @@ async fn accept_task(
tokio::select! {
_ = token.cancelled() => break,

res = accept(endpoint.accept()) => {
res = accept(quic_endpoint.accept()) => {
match res {
Ok(quic_conn) => {
// Get the bideractional streams. Note that we don't allow unidirectional streams.
Expand Down Expand Up @@ -429,7 +433,7 @@ async fn accept_task(
));

// Communicate the new link to the initial transport manager
if let Err(e) = manager.send_async(LinkUnicast(link)).await {
if let Err(e) = manager.send_async(NewLinkUnicast { link: LinkUnicast(link), endpoint: endpoint.clone() }).await {
tracing::error!("{}-{}: {}", file!(), line!(), e)
}

Expand Down
31 changes: 18 additions & 13 deletions io/zenoh-links/zenoh-link-serial/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use z_serial::ZSerial;
use zenoh_core::{zasynclock, zasyncread, zasyncwrite};
use zenoh_link_commons::{
ConstructibleLinkManagerUnicast, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast,
LinkUnicastTrait, NewLinkChannelSender,
LinkUnicastTrait, NewLinkChannelSender, NewLinkUnicast,
};
use zenoh_protocol::{
core::{EndPoint, Locator},
Expand Down Expand Up @@ -332,19 +332,23 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial {

// Spawn the accept loop for the listener
let token = CancellationToken::new();
let c_token = token.clone();
let mut listeners = zasyncwrite!(self.listeners);

let c_path = path.clone();
let c_manager = self.manager.clone();
let c_listeners = self.listeners.clone();

let task = async move {
// Wait for the accept loop to terminate
let res =
accept_read_task(link, c_token, c_manager, c_path.clone(), is_connected).await;
zasyncwrite!(c_listeners).remove(&c_path);
res
let task = {
let token = token.clone();
let path = path.clone();
let manager = self.manager.clone();
let listeners = self.listeners.clone();
let endpoint = endpoint.clone();

async move {
// Wait for the accept loop to terminate
let res =
accept_read_task(endpoint, link, token, manager, path.clone(), is_connected)
.await;
zasyncwrite!(listeners).remove(&path);
res
}
};
let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task);

Expand Down Expand Up @@ -390,6 +394,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial {
}

async fn accept_read_task(
endpoint: EndPoint,
link: Arc<LinkUnicastSerial>,
token: CancellationToken,
manager: NewLinkChannelSender,
Expand Down Expand Up @@ -423,7 +428,7 @@ async fn accept_read_task(
match res {
Ok(link) => {
// Communicate the new link to the initial transport manager
if let Err(e) = manager.send_async(LinkUnicast(link.clone())).await {
if let Err(e) = manager.send_async(NewLinkUnicast{ link: LinkUnicast(link.clone()), endpoint: endpoint.clone() }).await {
tracing::error!("{}-{}: {}", file!(), line!(), e)
}

Expand Down
15 changes: 10 additions & 5 deletions io/zenoh-links/zenoh-link-tcp/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tokio::{
use tokio_util::sync::CancellationToken;
use zenoh_link_commons::{
get_ip_interface_names, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait,
ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE,
ListenersUnicastIP, NewLinkChannelSender, NewLinkUnicast, BIND_INTERFACE,
};
use zenoh_protocol::{
core::{EndPoint, Locator},
Expand Down Expand Up @@ -354,10 +354,14 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp {
)?;

let token = self.listeners.token.child_token();
let c_token = token.clone();

let c_manager = self.manager.clone();
let task = async move { accept_task(socket, c_token, c_manager).await };
let task = {
let token = token.clone();
let manager = self.manager.clone();
let endpoint = endpoint.clone();

async move { accept_task(endpoint, socket, token, manager).await }
};

let locator = endpoint.to_locator();
self.listeners
Expand Down Expand Up @@ -421,6 +425,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp {
}

async fn accept_task(
endpoint: EndPoint,
socket: TcpListener,
token: CancellationToken,
manager: NewLinkChannelSender,
Expand Down Expand Up @@ -457,7 +462,7 @@ async fn accept_task(
let link = Arc::new(LinkUnicastTcp::new(stream, src_addr, dst_addr));

// Communicate the new link to the initial transport manager
if let Err(e) = manager.send_async(LinkUnicast(link)).await {
if let Err(e) = manager.send_async(NewLinkUnicast{ link: LinkUnicast(link), endpoint: endpoint.clone() }).await {
tracing::error!("{}-{}: {}", file!(), line!(), e)
}
},
Expand Down
17 changes: 11 additions & 6 deletions io/zenoh-links/zenoh-link-tls/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use x509_parser::prelude::*;
use zenoh_core::zasynclock;
use zenoh_link_commons::{
get_ip_interface_names, LinkAuthId, LinkAuthType, LinkManagerUnicastTrait, LinkUnicast,
LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender,
LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, NewLinkUnicast,
};
use zenoh_protocol::{
core::{EndPoint, Locator},
Expand Down Expand Up @@ -362,12 +362,16 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTls {
let local_port = local_addr.port();

// Initialize the TlsAcceptor
let acceptor = TlsAcceptor::from(Arc::new(tls_server_config.server_config));
let token = self.listeners.token.child_token();
let c_token = token.clone();
let c_manager = self.manager.clone();

let task = async move { accept_task(socket, acceptor, c_token, c_manager).await };
let task = {
let acceptor = TlsAcceptor::from(Arc::new(tls_server_config.server_config));
let token = token.clone();
let manager = self.manager.clone();
let endpoint = endpoint.clone();

async move { accept_task(endpoint, socket, acceptor, token, manager).await }
};

// Update the endpoint locator address
let locator = Locator::new(
Expand Down Expand Up @@ -399,6 +403,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTls {
}

async fn accept_task(
endpoint: EndPoint,
socket: TcpListener,
acceptor: TlsAcceptor,
token: CancellationToken,
Expand Down Expand Up @@ -456,7 +461,7 @@ async fn accept_task(
));

// Communicate the new link to the initial transport manager
if let Err(e) = manager.send_async(LinkUnicast(link)).await {
if let Err(e) = manager.send_async(NewLinkUnicast {link: LinkUnicast(link), endpoint: endpoint.clone()}).await {
tracing::error!("{}-{}: {}", file!(), line!(), e)
}
}
Expand Down
Loading

0 comments on commit 90b068f

Please sign in to comment.