Skip to content

Commit

Permalink
Set socket prioriy on unicast
Browse files Browse the repository at this point in the history
  • Loading branch information
jerry73204 committed Jan 12, 2024
1 parent cdaced7 commit 7b401ae
Show file tree
Hide file tree
Showing 14 changed files with 103 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ libc = "0.2.139"
libloading = "0.8"
log = "0.4.17"
lz4_flex = "0.11"
nix = { version = "0.27", features = ["fs"] }
nix = { version = "0.27", features = ["fs", "net", "socket"] }
num_cpus = "1.15.0"
ordered-float = "4.1.1"
panic-message = "0.3.0"
Expand Down
3 changes: 2 additions & 1 deletion io/zenoh-link-commons/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use core::{
hash::{Hash, Hasher},
ops::Deref,
};
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_protocol::core::{EndPoint, Locator, Priority};
use zenoh_result::ZResult;

pub type LinkManagerUnicast = Arc<dyn LinkManagerUnicastTrait>;
Expand All @@ -45,6 +45,7 @@ pub trait LinkUnicastTrait: Send + Sync {
fn get_dst(&self) -> &Locator;
fn is_reliable(&self) -> bool;
fn is_streamed(&self) -> bool;
fn set_priority(&self, priority: Priority) -> ZResult<()>;
async fn write(&self, buffer: &[u8]) -> ZResult<usize>;
async fn write_all(&self, buffer: &[u8]) -> ZResult<()>;
async fn read(&self, buffer: &mut [u8]) -> ZResult<usize>;
Expand Down
6 changes: 6 additions & 0 deletions io/zenoh-links/zenoh-link-quic/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use std::net::IpAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use zenoh_config::Priority;
use zenoh_core::{zasynclock, zread, zwrite};
use zenoh_link_commons::{
LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender,
Expand Down Expand Up @@ -152,6 +153,11 @@ impl LinkUnicastTrait for LinkUnicastQuic {
fn is_streamed(&self) -> bool {
true
}

fn set_priority(&self, _priority: Priority) -> ZResult<()> {
// no-op
Ok(())
}
}

impl Drop for LinkUnicastQuic {
Expand Down
6 changes: 6 additions & 0 deletions io/zenoh-links/zenoh-link-serial/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use zenoh_config::Priority;
use zenoh_core::{zasynclock, zread, zwrite};
use zenoh_link_commons::{
ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait,
Expand Down Expand Up @@ -190,6 +191,11 @@ impl LinkUnicastTrait for LinkUnicastSerial {
fn is_streamed(&self) -> bool {
false
}

fn set_priority(&self, _priority: Priority) -> ZResult<()> {
// no-op
Ok(())
}
}

impl fmt::Display for LinkUnicastSerial {
Expand Down
1 change: 1 addition & 0 deletions io/zenoh-links/zenoh-link-tcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ description = "Internal crate for zenoh."
async-std = { workspace = true }
async-trait = { workspace = true }
log = { workspace = true }
nix.workspace = true
zenoh-core = { workspace = true }
zenoh-link-commons = { workspace = true }
zenoh-protocol = { workspace = true }
Expand Down
11 changes: 10 additions & 1 deletion io/zenoh-links/zenoh-link-tcp/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ use std::collections::HashMap;
use std::convert::TryInto;
use std::fmt;
use std::net::{IpAddr, Shutdown};
use std::os::fd::{AsRawFd, BorrowedFd};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use zenoh_core::{zread, zwrite};
use zenoh_link_commons::{
LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender,
};
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_protocol::core::{EndPoint, Locator, Priority};
use zenoh_result::{bail, zerror, Error as ZError, ZResult};
use zenoh_sync::Signal;

Expand Down Expand Up @@ -153,6 +154,14 @@ impl LinkUnicastTrait for LinkUnicastTcp {
fn is_streamed(&self) -> bool {
true
}

fn set_priority(&self, priority: Priority) -> ZResult<()> {
use nix::sys::socket::sockopt::Priority as O_PRIORITY;
let fd = unsafe { BorrowedFd::borrow_raw(self.socket.as_raw_fd()) };
let priority = priority as u8 as i32;
nix::sys::socket::setsockopt(&fd, O_PRIORITY, &priority)?;
Ok(())
}
}

impl Drop for LinkUnicastTcp {
Expand Down
6 changes: 6 additions & 0 deletions io/zenoh-links/zenoh-link-tls/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use webpki::{
anchor_from_trusted_cert,
types::{CertificateDer, TrustAnchor},
};
use zenoh_config::Priority;
use zenoh_core::{zasynclock, zread, zwrite};
use zenoh_link_commons::{
LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender,
Expand Down Expand Up @@ -204,6 +205,11 @@ impl LinkUnicastTrait for LinkUnicastTls {
fn is_streamed(&self) -> bool {
true
}

fn set_priority(&self, _priority: Priority) -> ZResult<()> {
// no-op
Ok(())
}
}

impl Drop for LinkUnicastTls {
Expand Down
1 change: 1 addition & 0 deletions io/zenoh-links/zenoh-link-udp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ description = "Internal crate for zenoh."
async-std = { workspace = true }
async-trait = { workspace = true }
log = { workspace = true }
nix.workspace = true
socket2 = { workspace = true }
zenoh-buffers = { workspace = true }
zenoh-collections = { workspace = true }
Expand Down
31 changes: 30 additions & 1 deletion io/zenoh-links/zenoh-link-udp/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use async_trait::async_trait;
use std::collections::HashMap;
use std::fmt;
use std::net::IpAddr;
use std::os::fd::{AsRawFd, BorrowedFd};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::time::Duration;
Expand All @@ -32,7 +33,7 @@ use zenoh_link_commons::{
ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait,
NewLinkChannelSender,
};
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_protocol::core::{EndPoint, Locator, Priority};
use zenoh_result::{bail, zerror, Error as ZError, ZResult};
use zenoh_sync::Mvar;
use zenoh_sync::Signal;
Expand Down Expand Up @@ -63,6 +64,14 @@ impl LinkUnicastUdpConnected {
async fn close(&self) -> ZResult<()> {
Ok(())
}

fn set_priority(&self, priority: Priority) -> ZResult<()> {
use nix::sys::socket::sockopt::Priority as O_PRIORITY;
let fd = unsafe { BorrowedFd::borrow_raw(self.socket.as_raw_fd()) };
let priority = priority as u8 as i32;
nix::sys::socket::setsockopt(&fd, O_PRIORITY, &priority)?;
Ok(())
}
}

struct LinkUnicastUdpUnconnected {
Expand Down Expand Up @@ -116,6 +125,19 @@ impl LinkUnicastUdpUnconnected {
zlock!(self.links).remove(&(src_addr, dst_addr));
Ok(())
}

fn set_priority(&self, priority: Priority) -> ZResult<()> {
use nix::sys::socket::sockopt::Priority as O_PRIORITY;

let socket = match self.socket.upgrade() {
Some(socket) => socket,
None => bail!("UDP listener has been dropped"),
};
let fd = unsafe { BorrowedFd::borrow_raw(socket.as_raw_fd()) };
let priority = priority as u8 as i32;
nix::sys::socket::setsockopt(&fd, O_PRIORITY, &priority)?;
Ok(())
}
}

enum LinkUnicastUdpVariant {
Expand Down Expand Up @@ -217,6 +239,13 @@ impl LinkUnicastTrait for LinkUnicastUdp {
fn is_streamed(&self) -> bool {
false
}

fn set_priority(&self, priority: Priority) -> ZResult<()> {
match &self.variant {
LinkUnicastUdpVariant::Connected(link) => link.set_priority(priority),
LinkUnicastUdpVariant::Unconnected(link) => link.set_priority(priority),
}
}
}

impl fmt::Display for LinkUnicastUdp {
Expand Down
6 changes: 6 additions & 0 deletions io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::fs::{File, OpenOptions};
use std::io::{Read, Write};
use std::os::unix::fs::OpenOptionsExt;
use std::sync::Arc;
use zenoh_config::Priority;
use zenoh_core::{zasyncread, zasyncwrite};
use zenoh_protocol::core::{EndPoint, Locator};

Expand Down Expand Up @@ -501,6 +502,11 @@ impl LinkUnicastTrait for UnicastPipe {
fn is_streamed(&self) -> bool {
true
}

fn set_priority(&self, _priority: Priority) -> ZResult<()> {
// TODO: no-op
Ok(())
}
}

impl fmt::Display for UnicastPipe {
Expand Down
11 changes: 10 additions & 1 deletion io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::collections::HashMap;
use std::fmt;
use std::fs::remove_file;
use std::net::Shutdown;
use std::os::fd::{AsRawFd, BorrowedFd};
use std::os::unix::io::RawFd;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
Expand All @@ -33,7 +34,7 @@ use zenoh_core::{zread, zwrite};
use zenoh_link_commons::{
LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender,
};
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_protocol::core::{EndPoint, Locator, Priority};
use zenoh_result::{zerror, ZResult};
use zenoh_sync::Signal;

Expand Down Expand Up @@ -124,6 +125,14 @@ impl LinkUnicastTrait for LinkUnicastUnixSocketStream {
fn is_streamed(&self) -> bool {
true
}

fn set_priority(&self, priority: Priority) -> ZResult<()> {
use nix::sys::socket::sockopt::Priority as O_PRIORITY;
let fd = unsafe { BorrowedFd::borrow_raw(self.socket.as_raw_fd()) };
let priority = priority as u8 as i32;
nix::sys::socket::setsockopt(&fd, O_PRIORITY, &priority)?;
Ok(())
}
}

impl Drop for LinkUnicastUnixSocketStream {
Expand Down
6 changes: 6 additions & 0 deletions io/zenoh-links/zenoh-link-ws/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use zenoh_core::{zasynclock, zread, zwrite};
use zenoh_link_commons::{
LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender,
};
use zenoh_protocol::core::Priority;
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_result::{bail, zerror, ZResult};
use zenoh_sync::Signal;
Expand Down Expand Up @@ -216,6 +217,11 @@ impl LinkUnicastTrait for LinkUnicastWs {
fn is_streamed(&self) -> bool {
false
}

fn set_priority(&self, _priority: Priority) -> ZResult<()> {
// no-op
Ok(())
}
}

impl Drop for LinkUnicastWs {
Expand Down
13 changes: 11 additions & 2 deletions io/zenoh-transport/src/unicast/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::common::batch::{BatchConfig, Decode, Encode, Finalize, RBatch, WBatch
use std::fmt;
use std::sync::Arc;
use zenoh_buffers::{BBuf, ZSlice, ZSliceBuffer};
use zenoh_config::Priority;
use zenoh_core::zcondfeat;
use zenoh_link::{Link, LinkUnicast};
use zenoh_protocol::transport::{BatchSize, Close, OpenAck, TransportMessage};
Expand Down Expand Up @@ -144,7 +145,11 @@ pub(crate) struct TransportLinkUnicastTx {
}

impl TransportLinkUnicastTx {
pub(crate) async fn send_batch(&mut self, batch: &mut WBatch) -> ZResult<()> {
pub(crate) async fn send_batch(
&mut self,
batch: &mut WBatch,
priority: Option<Priority>,
) -> ZResult<()> {
const ERR: &str = "Write error on link: ";

// log::trace!("WBatch: {:?}", batch);
Expand All @@ -165,6 +170,10 @@ impl TransportLinkUnicastTx {
// log::trace!("WBytes: {:02x?}", bytes);

// Send the message on the link

if let Some(priority) = priority {
self.inner.link.set_priority(priority)?;
}
self.inner.link.write_all(bytes).await?;

Ok(())
Expand All @@ -177,7 +186,7 @@ impl TransportLinkUnicastTx {
let mut batch = WBatch::new(self.inner.config.batch);
batch.encode(msg).map_err(|_| zerror!("{ERR}{self}"))?;
let len = batch.len() as usize;
self.send_batch(&mut batch).await?;
self.send_batch(&mut batch, None).await?;
Ok(len)
}
}
Expand Down
9 changes: 7 additions & 2 deletions io/zenoh-transport/src/unicast/universal/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use std::{
time::Duration,
};
use zenoh_buffers::ZSliceBuffer;
use zenoh_config::Priority;
use zenoh_core::zwrite;
use zenoh_protocol::transport::{KeepAlive, TransportMessage};
use zenoh_result::{zerror, ZResult};
Expand Down Expand Up @@ -195,7 +196,11 @@ async fn tx_task(
match pipeline.pull().timeout(keep_alive).await {
Ok(res) => match res {
Some((mut batch, priority)) => {
link.send_batch(&mut batch).await?;
link.send_batch(
&mut batch,
Some(Priority::try_from(priority as u8).unwrap()),
)
.await?;

#[cfg(feature = "stats")]
{
Expand Down Expand Up @@ -225,7 +230,7 @@ async fn tx_task(
// Drain the transmission pipeline and write remaining bytes on the wire
let mut batches = pipeline.drain();
for (mut b, _) in batches.drain(..) {
link.send_batch(&mut b)
link.send_batch(&mut b, None)
.timeout(keep_alive)
.await
.map_err(|_| zerror!("{}: flush failed after {} ms", link, keep_alive.as_millis()))??;
Expand Down

0 comments on commit 7b401ae

Please sign in to comment.