Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Select links based on message Reliability #1356

Closed
wants to merge 13 commits into from
14 changes: 12 additions & 2 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,17 @@
/// If not configured, all the supported protocols are automatically whitelisted.
/// The supported protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream", "vsock"]
/// For example, to only enable "tls" and "quic":
// protocols: ["tls", "quic"],
/// protocols: ["tls", "quic"],
///
/// ## Endpoint metadata
///
/// **priorities**: a Rust-style half-open range (e.g. `2..4` signifies priorities 2 and 3).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The range should be inclusive. It doesn't make sense to express priorities=1..9 to set any priority from 1 to 8 given that priority 9 does not exist.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point!

/// This value is used to select the link used for transmission based on the Priority of the
/// message in question.
///
/// **reliability**: either "best_effort" or "reliable". This value is used to select the link
/// used for transmission based on the Reliability of the message in question.
///
/// Configure the zenoh TX parameters of a link
tx: {
/// The resolution in bits to be used for the message sequence numbers.
Expand Down Expand Up @@ -387,7 +397,7 @@
enabled: true,
/// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
time_limit: 1,
}
},
},
},
/// Configure the zenoh RX parameters of a link
Expand Down
118 changes: 115 additions & 3 deletions commons/zenoh-protocol/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ use alloc::{
};
use core::{
convert::{From, TryFrom, TryInto},
fmt,
fmt::{self, Display},
hash::Hash,
str::FromStr,
};
use std::error::Error;

use serde::Serialize;
pub use uhlc::{Timestamp, NTP64};
use zenoh_keyexpr::OwnedKeyExpr;
use zenoh_result::{bail, zerror};
use zenoh_result::{bail, zerror, ZResult};

/// The unique Id of the [`HLC`](uhlc::HLC) that generated the concerned [`Timestamp`].
pub type TimestampId = uhlc::ID;
Expand Down Expand Up @@ -308,6 +310,58 @@ pub enum Priority {
Background = 7,
}

#[derive(Debug, Default, Copy, Clone, Eq, Hash, PartialEq, Serialize)]
pub struct PriorityRange {
pub start: u8,
pub end: u8,
}

impl PriorityRange {
pub fn new(start: u8, end: u8) -> ZResult<Self> {
if start >= end || start < Priority::MAX as u8 || end > Priority::MIN as u8 + 1 {
bail!("Invalid priority range: {start}..{end}")
};

Ok(Self { start, end })
}

/// Returns `true` if `priority` is a member of `self`.
pub fn contains(&self, priority: Priority) -> bool {
self.start <= (priority as u8) && (priority as u8) < self.end
}

/// Returns `true` if `self` is a superset of `other`.
pub fn includes(&self, other: PriorityRange) -> bool {
self.start <= other.start && other.end <= self.end
}

pub fn len(&self) -> usize {
(self.end - self.start) as usize
}

pub fn is_empty(&self) -> bool {
self.end == self.start
}

pub fn start(&self) -> u8 {
self.start
}

pub fn end(&self) -> u8 {
self.end
}

#[cfg(feature = "test")]
pub fn rand() -> Self {
use rand::Rng;
let mut rng = rand::thread_rng();
let start = rng.gen_range(Priority::MAX as u8..Priority::MIN as u8);
let end = rng.gen_range((start + 1)..=Priority::MIN as u8);

Self { start, end }
}
}

impl Priority {
/// Default
pub const DEFAULT: Self = Self::Data;
Expand Down Expand Up @@ -342,7 +396,7 @@ impl TryFrom<u8> for Priority {
}
}

#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize)]
#[repr(u8)]
pub enum Reliability {
#[default]
Expand All @@ -353,6 +407,14 @@ pub enum Reliability {
impl Reliability {
pub const DEFAULT: Self = Self::BestEffort;

/// Returns `true` is `self` implies `other`.
pub fn implies(self, other: Self) -> bool {
!matches!(
(self, other),
(Reliability::Reliable, Reliability::BestEffort)
)
}

#[cfg(feature = "test")]
pub fn rand() -> Self {
use rand::Rng;
Expand All @@ -367,6 +429,56 @@ impl Reliability {
}
}

impl From<bool> for Reliability {
fn from(value: bool) -> Self {
if value {
Reliability::Reliable
} else {
Reliability::BestEffort
}
}
}

impl From<Reliability> for bool {
fn from(value: Reliability) -> Self {
match value {
Reliability::BestEffort => false,
Reliability::Reliable => true,
}
}
}

#[derive(Debug)]
pub struct InvalidReliability {
found: String,
}

impl Display for InvalidReliability {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"invalid Reliability string, expected `best_effort` or `reliable` but found {}",
self.found
)
}
}

impl Error for InvalidReliability {}

impl FromStr for Reliability {
type Err = InvalidReliability;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"reliable" => Ok(Reliability::Reliable),
"best_effort" => Ok(Reliability::BestEffort),
other => Err(InvalidReliability {
found: other.to_string(),
}),
}
}
}

#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
pub struct Channel {
pub priority: Priority,
Expand Down
14 changes: 7 additions & 7 deletions commons/zenoh-protocol/src/transport/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,13 @@ pub struct InitSyn {
// Extensions
pub mod ext {
use crate::{
common::{ZExtUnit, ZExtZBuf},
zextunit, zextzbuf,
common::{ZExtUnit, ZExtZ64, ZExtZBuf},
zextunit, zextz64, zextzbuf,
};

/// # QoS extension
/// Used to negotiate the use of QoS
pub type QoS = zextunit!(0x1, false);
pub type QoS = zextz64!(0x1, false);

/// # Shm extension
/// Used as challenge for probing shared memory capabilities
Expand Down Expand Up @@ -161,7 +161,7 @@ impl InitSyn {
pub fn rand() -> Self {
use rand::Rng;

use crate::common::{ZExtUnit, ZExtZBuf};
use crate::common::{ZExtUnit, ZExtZ64, ZExtZBuf};

let mut rng = rand::thread_rng();

Expand All @@ -170,7 +170,7 @@ impl InitSyn {
let zid = ZenohIdProto::default();
let resolution = Resolution::rand();
let batch_size: BatchSize = rng.gen();
let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand());
#[cfg(feature = "shared-memory")]
let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
Expand Down Expand Up @@ -217,7 +217,7 @@ impl InitAck {
pub fn rand() -> Self {
use rand::Rng;

use crate::common::{ZExtUnit, ZExtZBuf};
use crate::common::{ZExtUnit, ZExtZ64, ZExtZBuf};

let mut rng = rand::thread_rng();

Expand All @@ -231,7 +231,7 @@ impl InitAck {
};
let batch_size: BatchSize = rng.gen();
let cookie = ZSlice::rand(64);
let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand());
#[cfg(feature = "shared-memory")]
let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
Expand Down
38 changes: 17 additions & 21 deletions io/zenoh-link-commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ pub use listener::*;
pub use multicast::*;
use serde::Serialize;
pub use unicast::*;
use zenoh_protocol::{core::Locator, transport::BatchSize};
use zenoh_protocol::{
core::{Locator, PriorityRange, Reliability},
transport::BatchSize,
};
use zenoh_result::ZResult;

/*************************************/
Expand All @@ -48,10 +51,11 @@ pub struct Link {
pub dst: Locator,
pub group: Option<Locator>,
pub mtu: BatchSize,
pub is_reliable: bool,
pub is_streamed: bool,
pub interfaces: Vec<String>,
pub auth_identifier: LinkAuthId,
pub priorities: Option<PriorityRange>,
pub reliability: Reliability,
}

#[async_trait]
Expand All @@ -70,48 +74,40 @@ impl fmt::Display for Link {
}
}

impl From<&LinkUnicast> for Link {
fn from(link: &LinkUnicast) -> Link {
impl Link {
pub fn new_unicast(
link: &LinkUnicast,
priorities: Option<PriorityRange>,
reliability: Reliability,
) -> Self {
Link {
src: link.get_src().to_owned(),
dst: link.get_dst().to_owned(),
group: None,
mtu: link.get_mtu(),
is_reliable: link.is_reliable(),
is_streamed: link.is_streamed(),
interfaces: link.get_interface_names(),
auth_identifier: link.get_auth_id().clone(),
priorities,
reliability,
}
}
}

impl From<LinkUnicast> for Link {
fn from(link: LinkUnicast) -> Link {
Link::from(&link)
}
}

impl From<&LinkMulticast> for Link {
fn from(link: &LinkMulticast) -> Link {
pub fn new_multicast(link: &LinkMulticast) -> Self {
Link {
src: link.get_src().to_owned(),
dst: link.get_dst().to_owned(),
group: Some(link.get_dst().to_owned()),
mtu: link.get_mtu(),
is_reliable: link.is_reliable(),
is_streamed: false,
interfaces: vec![],
auth_identifier: LinkAuthId::default(),
priorities: None,
reliability: Reliability::from(link.is_reliable()),
}
}
}

impl From<LinkMulticast> for Link {
fn from(link: LinkMulticast) -> Link {
Link::from(&link)
}
}

impl PartialEq<LinkUnicast> for Link {
fn eq(&self, other: &LinkUnicast) -> bool {
self.src == *other.get_src() && self.dst == *other.get_dst()
Expand Down
13 changes: 12 additions & 1 deletion io/zenoh-link-commons/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,18 @@ pub trait LinkManagerUnicastTrait: Send + Sync {
async fn get_listeners(&self) -> Vec<EndPoint>;
async fn get_locators(&self) -> Vec<Locator>;
}
pub type NewLinkChannelSender = flume::Sender<LinkUnicast>;
pub type NewLinkChannelSender = flume::Sender<NewLinkUnicast>;

/// Notification of a new inbound connection.
///
/// Link implementations should preserve the metadata sections of [`NewLinkUnicast::endpoint`].
pub struct NewLinkUnicast {
/// The link created in response to a new inbound connection.
pub link: LinkUnicast,
/// Endpoint of the listener.
pub endpoint: EndPoint,
}

pub trait ConstructibleLinkManagerUnicast<T>: Sized {
fn new(new_link_sender: NewLinkChannelSender, config: T) -> ZResult<Self>;
}
Expand Down
Loading
Loading