diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2b845360d9..5e24616f6e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -67,7 +67,7 @@ jobs: uses: actions-rs/cargo@v1 with: command: clippy - args: --all-targets --features shared-memory --features transport_shm -- -D warnings + args: --all-targets --features shared-memory --features transport_unixpipe -- -D warnings test: name: Run tests on ${{ matrix.os }} @@ -107,7 +107,7 @@ jobs: uses: actions-rs/cargo@v1 with: command: nextest - args: run -F shared-memory -F transport_shm -p zenoh-transport + args: run -F shared-memory -F transport_unixpipe -p zenoh-transport env: CARGO_REGISTRIES_CRATES_IO_PROTOCOL: sparse ASYNC_STD_THREAD_COUNT: 4 diff --git a/Cargo.lock b/Cargo.lock index da0115d63c..e945a9e637 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4642,10 +4642,10 @@ dependencies = [ "zenoh-link-commons", "zenoh-link-quic", "zenoh-link-serial", - "zenoh-link-shm", "zenoh-link-tcp", "zenoh-link-tls", "zenoh-link-udp", + "zenoh-link-unixpipe", "zenoh-link-unixsock_stream", "zenoh-link-ws", "zenoh-protocol", @@ -4711,27 +4711,6 @@ dependencies = [ "zenoh-util", ] -[[package]] -name = "zenoh-link-shm" -version = "0.10.0-dev" -dependencies = [ - "advisory-lock", - "async-io", - "async-std", - "async-trait", - "filepath", - "log", - "nix 0.27.1", - "rand 0.8.5", - "unix-named-pipe", - "zenoh-buffers", - "zenoh-config", - "zenoh-core", - "zenoh-link-commons", - "zenoh-protocol", - "zenoh-result", -] - [[package]] name = "zenoh-link-tcp" version = "0.10.0-dev" @@ -4787,6 +4766,27 @@ dependencies = [ "zenoh-util", ] +[[package]] +name = "zenoh-link-unixpipe" +version = "0.10.0-dev" +dependencies = [ + "advisory-lock", + "async-io", + "async-std", + "async-trait", + "filepath", + "log", + "nix 0.27.1", + "rand 0.8.5", + "unix-named-pipe", + "zenoh-buffers", + "zenoh-config", + "zenoh-core", + "zenoh-link-commons", + "zenoh-protocol", + "zenoh-result", +] + [[package]] name = "zenoh-link-unixsock_stream" version = "0.10.0-dev" diff --git a/Cargo.toml b/Cargo.toml index 08d544d2dd..bbbfef1703 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ members = [ "io/zenoh-links/zenoh-link-udp/", "io/zenoh-links/zenoh-link-unixsock_stream/", "io/zenoh-links/zenoh-link-ws/", - "io/zenoh-links/zenoh-link-shm/", + "io/zenoh-links/zenoh-link-unixpipe/", "io/zenoh-transport", "plugins/example-plugin", "plugins/zenoh-backend-traits", @@ -180,7 +180,7 @@ zenoh-link-unixsock_stream = { version = "0.10.0-dev", path = "io/zenoh-links/ze zenoh-link-quic = { version = "0.10.0-dev", path = "io/zenoh-links/zenoh-link-quic" } zenoh-link-udp = { version = "0.10.0-dev", path = "io/zenoh-links/zenoh-link-udp" } zenoh-link-ws = { version = "0.10.0-dev", path = "io/zenoh-links/zenoh-link-ws" } -zenoh-link-shm = { version = "0.10.0-dev", path = "io/zenoh-links/zenoh-link-shm" } +zenoh-link-unixpipe = { version = "0.10.0-dev", path = "io/zenoh-links/zenoh-link-unixpipe" } zenoh-link-serial = { version = "0.10.0-dev", path = "io/zenoh-links/zenoh-link-serial" } zenoh-link = { version = "0.10.0-dev", path = "io/zenoh-link" } zenoh-link-commons = { version = "0.10.0-dev", path = "io/zenoh-link-commons" } diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 6728494bb1..dae3ebc9aa 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -125,6 +125,14 @@ max_sessions: 1000, /// Maximum number of incoming links that are admitted per session max_links: 1, + /// Enables the LowLatency transport + /// This option does not make LowLatency transport mandatory, the actual implementation of transport + /// used will depend on Establish procedure and other party's settings + /// + /// NOTE: Currently, the LowLatency transport doesn't preserve QoS prioritization. + /// NOTE: Due to the note above, 'lowlatency' is incompatible with 'qos' option, so in order to + /// enable 'lowlatency' you need to explicitly disable 'qos'. + lowlatency: false, }, qos: { enabled: true, diff --git a/commons/zenoh-codec/src/transport/init.rs b/commons/zenoh-codec/src/transport/init.rs index 2f5dae25c9..db37c8fc03 100644 --- a/commons/zenoh-codec/src/transport/init.rs +++ b/commons/zenoh-codec/src/transport/init.rs @@ -45,7 +45,8 @@ where let mut n_exts = (x.ext_qos.is_some() as u8) + (x.ext_shm.is_some() as u8) + (x.ext_auth.is_some() as u8) - + (x.ext_mlink.is_some() as u8); + + (x.ext_mlink.is_some() as u8) + + (x.ext_lowlatency.is_some() as u8); if n_exts != 0 { header |= flag::Z; } @@ -87,6 +88,10 @@ where n_exts -= 1; self.write(&mut *writer, (mlink, n_exts != 0))?; } + if let Some(lowlatency) = x.ext_lowlatency.as_ref() { + n_exts -= 1; + self.write(&mut *writer, (lowlatency, n_exts != 0))?; + } Ok(()) } @@ -144,6 +149,7 @@ where let mut ext_shm = None; let mut ext_auth = None; let mut ext_mlink = None; + let mut ext_lowlatency = None; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -170,6 +176,11 @@ where ext_mlink = Some(a); has_ext = ext; } + ext::LowLatency::ID => { + let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?; + ext_lowlatency = Some(q); + has_ext = ext; + } _ => { has_ext = extension::skip(reader, "InitSyn", ext)?; } @@ -186,6 +197,7 @@ where ext_shm, ext_auth, ext_mlink, + ext_lowlatency, }) } } @@ -206,7 +218,8 @@ where let mut n_exts = (x.ext_qos.is_some() as u8) + (x.ext_shm.is_some() as u8) + (x.ext_auth.is_some() as u8) - + (x.ext_mlink.is_some() as u8); + + (x.ext_mlink.is_some() as u8) + + (x.ext_lowlatency.is_some() as u8); if n_exts != 0 { header |= flag::Z; } @@ -251,6 +264,10 @@ where n_exts -= 1; self.write(&mut *writer, (mlink, n_exts != 0))?; } + if let Some(lowlatency) = x.ext_lowlatency.as_ref() { + n_exts -= 1; + self.write(&mut *writer, (lowlatency, n_exts != 0))?; + } Ok(()) } @@ -311,6 +328,7 @@ where let mut ext_shm = None; let mut ext_auth = None; let mut ext_mlink = None; + let mut ext_lowlatency = None; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -337,6 +355,11 @@ where ext_mlink = Some(a); has_ext = ext; } + ext::LowLatency::ID => { + let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?; + ext_lowlatency = Some(q); + has_ext = ext; + } _ => { has_ext = extension::skip(reader, "InitAck", ext)?; } @@ -354,6 +377,7 @@ where ext_shm, ext_auth, ext_mlink, + ext_lowlatency, }) } } diff --git a/commons/zenoh-codec/src/transport/mod.rs b/commons/zenoh-codec/src/transport/mod.rs index 8cb74d7d77..3aa6423eb6 100644 --- a/commons/zenoh-codec/src/transport/mod.rs +++ b/commons/zenoh-codec/src/transport/mod.rs @@ -25,50 +25,48 @@ use zenoh_buffers::{ reader::{BacktrackableReader, DidntRead, Reader}, writer::{DidntWrite, Writer}, }; -#[cfg(feature = "shared-memory")] -use zenoh_protocol::network::NetworkMessage; use zenoh_protocol::{ common::{imsg, ZExtZ64}, + network::NetworkMessage, transport::*, }; -// TransportMessageShm -#[cfg(feature = "shared-memory")] -impl WCodec<&TransportMessageShm, &mut W> for Zenoh080 +// TransportMessageLowLatency +impl WCodec<&TransportMessageLowLatency, &mut W> for Zenoh080 where W: Writer, { type Output = Result<(), DidntWrite>; - fn write(self, writer: &mut W, x: &TransportMessageShm) -> Self::Output { + fn write(self, writer: &mut W, x: &TransportMessageLowLatency) -> Self::Output { match &x.body { - TransportBodyShm::Network(b) => self.write(&mut *writer, b.as_ref()), - TransportBodyShm::KeepAlive(b) => self.write(&mut *writer, b), - TransportBodyShm::Close(b) => self.write(&mut *writer, b), + TransportBodyLowLatency::Network(b) => self.write(&mut *writer, b), + TransportBodyLowLatency::KeepAlive(b) => self.write(&mut *writer, b), + TransportBodyLowLatency::Close(b) => self.write(&mut *writer, b), } } } -#[cfg(feature = "shared-memory")] -impl RCodec for Zenoh080 + +impl RCodec for Zenoh080 where R: Reader + BacktrackableReader, { type Error = DidntRead; - fn read(self, reader: &mut R) -> Result { + fn read(self, reader: &mut R) -> Result { let header: u8 = self.read(&mut *reader)?; let codec = Zenoh080Header::new(header); let body = match imsg::mid(codec.header) { - id::KEEP_ALIVE => TransportBodyShm::KeepAlive(codec.read(&mut *reader)?), - id::CLOSE => TransportBodyShm::Close(codec.read(&mut *reader)?), + id::KEEP_ALIVE => TransportBodyLowLatency::KeepAlive(codec.read(&mut *reader)?), + id::CLOSE => TransportBodyLowLatency::Close(codec.read(&mut *reader)?), _ => { let nw: NetworkMessage = codec.read(&mut *reader)?; - TransportBodyShm::Network(Box::new(nw)) + TransportBodyLowLatency::Network(nw) } }; - Ok(TransportMessageShm { body }) + Ok(TransportMessageLowLatency { body }) } } diff --git a/commons/zenoh-codec/src/transport/open.rs b/commons/zenoh-codec/src/transport/open.rs index 62f0c65d1f..bbcb43de98 100644 --- a/commons/zenoh-codec/src/transport/open.rs +++ b/commons/zenoh-codec/src/transport/open.rs @@ -43,7 +43,8 @@ where let mut n_exts = (x.ext_qos.is_some() as u8) + (x.ext_shm.is_some() as u8) + (x.ext_auth.is_some() as u8) - + (x.ext_mlink.is_some() as u8); + + (x.ext_mlink.is_some() as u8) + + (x.ext_lowlatency.is_some() as u8); if n_exts != 0 { header |= flag::Z; } @@ -75,6 +76,10 @@ where n_exts -= 1; self.write(&mut *writer, (mlink, n_exts != 0))?; } + if let Some(lowlatency) = x.ext_lowlatency.as_ref() { + n_exts -= 1; + self.write(&mut *writer, (lowlatency, n_exts != 0))?; + } Ok(()) } @@ -119,6 +124,7 @@ where let mut ext_shm = None; let mut ext_auth = None; let mut ext_mlink = None; + let mut ext_lowlatency = None; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -145,6 +151,11 @@ where ext_mlink = Some(a); has_ext = ext; } + ext::LowLatency::ID => { + let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?; + ext_lowlatency = Some(q); + has_ext = ext; + } _ => { has_ext = extension::skip(reader, "OpenSyn", ext)?; } @@ -159,6 +170,7 @@ where ext_shm, ext_auth, ext_mlink, + ext_lowlatency, }) } } @@ -181,7 +193,8 @@ where let mut n_exts = (x.ext_qos.is_some() as u8) + (x.ext_shm.is_some() as u8) + (x.ext_auth.is_some() as u8) - + (x.ext_mlink.is_some() as u8); + + (x.ext_mlink.is_some() as u8) + + (x.ext_lowlatency.is_some() as u8); if n_exts != 0 { header |= flag::Z; } @@ -212,6 +225,10 @@ where n_exts -= 1; self.write(&mut *writer, (mlink, n_exts != 0))?; } + if let Some(lowlatency) = x.ext_lowlatency.as_ref() { + n_exts -= 1; + self.write(&mut *writer, (lowlatency, n_exts != 0))?; + } Ok(()) } @@ -255,6 +272,7 @@ where let mut ext_shm = None; let mut ext_auth = None; let mut ext_mlink = None; + let mut ext_lowlatency = None; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { @@ -281,6 +299,11 @@ where ext_mlink = Some(a); has_ext = ext; } + ext::LowLatency::ID => { + let (q, ext): (ext::LowLatency, bool) = eodec.read(&mut *reader)?; + ext_lowlatency = Some(q); + has_ext = ext; + } _ => { has_ext = extension::skip(reader, "OpenAck", ext)?; } @@ -294,6 +317,7 @@ where ext_shm, ext_auth, ext_mlink, + ext_lowlatency, }) } } diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index ea6714eb68..5b4d3da835 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -106,6 +106,7 @@ impl Default for TransportUnicastConf { accept_pending: 100, max_sessions: 1_000, max_links: 1, + lowlatency: false, } } } diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index f3943c1778..b0857f2caf 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -219,6 +219,10 @@ validated_struct::validator! { max_sessions: usize, /// Maximum number of unicast incoming links per transport session (default: 1) max_links: usize, + /// Enables the LowLatency transport (default `false`). + /// This option does not make LowLatency transport mandatory, the actual implementation of transport + /// used will depend on Establish procedure and other party's settings + lowlatency: bool, }, pub multicast: TransportMulticastConf { /// Link join interval duration in milliseconds (default: 2500) @@ -291,9 +295,9 @@ validated_struct::validator! { client_certificate: Option, server_name_verification: Option }, - pub shared_memory: #[derive(Default)] - SHMConf { - shm_access_mask: Option + pub unixpipe: #[derive(Default)] + UnixPipeConf { + file_access_mask: Option }, pub compression: #[derive(Default)] /// **Experimental** compression feature. @@ -311,7 +315,8 @@ validated_struct::validator! { pub shared_memory: SharedMemoryConf { /// Whether shared memory is enabled or not. - /// If set to `true`, the shared-memory transport will be enabled. (default `false`). + /// If set to `true`, the SHM buffer optimization support will be announced to other parties. (default `false`). + /// This option doesn't make SHM buffer optimization mandatory, the real support depends on other party setting enabled: bool, }, pub auth: #[derive(Default)] diff --git a/commons/zenoh-protocol/src/transport/init.rs b/commons/zenoh-protocol/src/transport/init.rs index 566b6c91b1..d553799fd1 100644 --- a/commons/zenoh-protocol/src/transport/init.rs +++ b/commons/zenoh-protocol/src/transport/init.rs @@ -117,6 +117,7 @@ pub struct InitSyn { pub ext_shm: Option, pub ext_auth: Option, pub ext_mlink: Option, + pub ext_lowlatency: Option, } // Extensions @@ -141,6 +142,10 @@ pub mod ext { /// # Multilink extension /// Used as challenge for probing multilink capabilities pub type MultiLink = zextzbuf!(0x4, false); + + /// # LowLatency extension + /// Used to negotiate the use of lowlatency transport + pub type LowLatency = zextunit!(0x5, false); } impl InitSyn { @@ -160,6 +165,7 @@ impl InitSyn { let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); + let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); Self { version, @@ -171,6 +177,7 @@ impl InitSyn { ext_shm, ext_auth, ext_mlink, + ext_lowlatency, } } } @@ -187,6 +194,7 @@ pub struct InitAck { pub ext_shm: Option, pub ext_auth: Option, pub ext_mlink: Option, + pub ext_lowlatency: Option, } impl InitAck { @@ -211,6 +219,7 @@ impl InitAck { let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); + let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); Self { version, @@ -223,6 +232,7 @@ impl InitAck { ext_shm, ext_auth, ext_mlink, + ext_lowlatency, } } } diff --git a/commons/zenoh-protocol/src/transport/mod.rs b/commons/zenoh-protocol/src/transport/mod.rs index 5a1025fb36..301fde3343 100644 --- a/commons/zenoh-protocol/src/transport/mod.rs +++ b/commons/zenoh-protocol/src/transport/mod.rs @@ -29,7 +29,6 @@ pub use keepalive::KeepAlive; pub use oam::Oam; pub use open::{OpenAck, OpenSyn}; -#[cfg(feature = "shared-memory")] use crate::network::NetworkMessage; /// NOTE: 16 bits (2 bytes) may be prepended to the serialized message indicating the total length @@ -59,17 +58,17 @@ pub mod id { pub const JOIN: u8 = 0x07; // For multicast communications only } -#[cfg(feature = "shared-memory")] #[derive(Debug)] -pub struct TransportMessageShm { - pub body: TransportBodyShm, +pub struct TransportMessageLowLatency { + pub body: TransportBodyLowLatency, } -#[cfg(feature = "shared-memory")] + +#[allow(clippy::large_enum_variant)] #[derive(Debug)] -pub enum TransportBodyShm { +pub enum TransportBodyLowLatency { Close(Close), KeepAlive(KeepAlive), - Network(Box), + Network(NetworkMessage), } pub type TransportSn = u32; diff --git a/commons/zenoh-protocol/src/transport/open.rs b/commons/zenoh-protocol/src/transport/open.rs index 5b4be0632b..b7ec56da62 100644 --- a/commons/zenoh-protocol/src/transport/open.rs +++ b/commons/zenoh-protocol/src/transport/open.rs @@ -81,6 +81,7 @@ pub struct OpenSyn { pub ext_shm: Option, pub ext_auth: Option, pub ext_mlink: Option, + pub ext_lowlatency: Option, } // Extensions @@ -106,6 +107,10 @@ pub mod ext { /// Used as challenge for probing multilink capabilities pub type MultiLinkSyn = zextzbuf!(0x4, false); pub type MultiLinkAck = zextunit!(0x4, false); + + /// # LowLatency extension + /// Used to negotiate the use of lowlatency transport + pub type LowLatency = zextunit!(0x5, false); } impl OpenSyn { @@ -131,6 +136,7 @@ impl OpenSyn { let ext_shm = rng.gen_bool(0.5).then_some(ZExtZ64::rand()); let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_mlink = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); + let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); Self { lease, @@ -140,6 +146,7 @@ impl OpenSyn { ext_shm, ext_auth, ext_mlink, + ext_lowlatency, } } } @@ -152,6 +159,7 @@ pub struct OpenAck { pub ext_shm: Option, pub ext_auth: Option, pub ext_mlink: Option, + pub ext_lowlatency: Option, } impl OpenAck { @@ -173,6 +181,7 @@ impl OpenAck { let ext_shm = rng.gen_bool(0.5).then_some(ZExtZ64::rand()); let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_mlink = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_lowlatency = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); Self { lease, @@ -181,6 +190,7 @@ impl OpenAck { ext_shm, ext_auth, ext_mlink, + ext_lowlatency, } } } diff --git a/examples/Cargo.toml b/examples/Cargo.toml index a07febd43f..116e3dff8d 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -28,13 +28,13 @@ readme = "README.md" [features] shared-memory = ["zenoh/shared-memory"] unstable = ["zenoh/unstable"] -transport_shm = ["zenoh/transport_shm"] +transport_unixpipe = ["zenoh/transport_unixpipe"] -# Unfortunately, the feature "transport_shm" is always +# Unfortunately, the feature "transport_unixpipe" is always # enabled for the lines below. It looks like a Cargo bug :( # # [target.'cfg(unix)'.dependencies] -# zenoh = { workspace = true, features = ["transport_shm"] } +# zenoh = { workspace = true, features = ["transport_unixpipe"] } # # [target.'cfg(not(unix))'.dependencies] # zenoh = { workspace = true } diff --git a/io/zenoh-link/Cargo.toml b/io/zenoh-link/Cargo.toml index 79129740da..46d4cbb8bc 100644 --- a/io/zenoh-link/Cargo.toml +++ b/io/zenoh-link/Cargo.toml @@ -32,7 +32,7 @@ transport_udp = ["zenoh-link-udp"] transport_unixsock-stream = ["zenoh-link-unixsock_stream"] transport_ws = ["zenoh-link-ws"] transport_serial = ["zenoh-link-serial"] -transport_shm = ["zenoh-link-shm", "zenoh-link-shm/transport_shm"] +transport_unixpipe = ["zenoh-link-unixpipe", "zenoh-link-unixpipe/transport_unixpipe"] [dependencies] async-std = { workspace = true } @@ -47,6 +47,6 @@ zenoh-link-tls = { workspace = true, optional = true } zenoh-link-udp = { workspace = true, optional = true } zenoh-link-unixsock_stream = { workspace = true, optional = true } zenoh-link-ws = { workspace = true, optional = true } -zenoh-link-shm = { workspace = true, optional = true } +zenoh-link-unixpipe = { workspace = true, optional = true } zenoh-protocol = { workspace = true } zenoh-result = { workspace = true } diff --git a/io/zenoh-link/src/lib.rs b/io/zenoh-link/src/lib.rs index 26efdd814c..9d49853501 100644 --- a/io/zenoh-link/src/lib.rs +++ b/io/zenoh-link/src/lib.rs @@ -66,11 +66,11 @@ pub use zenoh_link_serial as serial; #[cfg(feature = "transport_serial")] use zenoh_link_serial::{LinkManagerUnicastSerial, SerialLocatorInspector, SERIAL_LOCATOR_PREFIX}; -#[cfg(feature = "transport_shm")] -pub use zenoh_link_shm as shm; -#[cfg(feature = "transport_shm")] -use zenoh_link_shm::{ - LinkManagerUnicastPipe, ShmConfigurator, ShmLocatorInspector, SHM_LOCATOR_PREFIX, +#[cfg(feature = "transport_unixpipe")] +pub use zenoh_link_unixpipe as unixpipe; +#[cfg(feature = "transport_unixpipe")] +use zenoh_link_unixpipe::{ + LinkManagerUnicastPipe, UnixPipeConfigurator, UnixPipeLocatorInspector, UNIXPIPE_LOCATOR_PREFIX, }; pub use zenoh_link_commons::*; @@ -91,8 +91,8 @@ pub const PROTOCOLS: &[&str] = &[ unixsock_stream::UNIXSOCKSTREAM_LOCATOR_PREFIX, #[cfg(feature = "transport_serial")] serial::SERIAL_LOCATOR_PREFIX, - #[cfg(feature = "transport_shm")] - shm::SHM_LOCATOR_PREFIX, + #[cfg(feature = "transport_unixpipe")] + unixpipe::UNIXPIPE_LOCATOR_PREFIX, ]; #[derive(Default, Clone)] @@ -111,8 +111,8 @@ pub struct LocatorInspector { unixsock_stream_inspector: UnixSockStreamLocatorInspector, #[cfg(feature = "transport_serial")] serial_inspector: SerialLocatorInspector, - #[cfg(feature = "transport_shm")] - shm_inspector: ShmLocatorInspector, + #[cfg(feature = "transport_unixpipe")] + unixpipe_inspector: UnixPipeLocatorInspector, } impl LocatorInspector { pub async fn is_multicast(&self, locator: &Locator) -> ZResult { @@ -136,8 +136,8 @@ impl LocatorInspector { WS_LOCATOR_PREFIX => self.ws_inspector.is_multicast(locator).await, #[cfg(feature = "transport_serial")] SERIAL_LOCATOR_PREFIX => self.serial_inspector.is_multicast(locator).await, - #[cfg(feature = "transport_shm")] - SHM_LOCATOR_PREFIX => self.shm_inspector.is_multicast(locator).await, + #[cfg(feature = "transport_unixpipe")] + UNIXPIPE_LOCATOR_PREFIX => self.unixpipe_inspector.is_multicast(locator).await, _ => bail!("Unsupported protocol: {}.", protocol), } } @@ -148,8 +148,8 @@ pub struct LinkConfigurator { quic_inspector: QuicConfigurator, #[cfg(feature = "transport_tls")] tls_inspector: TlsConfigurator, - #[cfg(feature = "transport_shm")] - shm_inspector: ShmConfigurator, + #[cfg(feature = "transport_unixpipe")] + unixpipe_inspector: UnixPipeConfigurator, } impl LinkConfigurator { @@ -185,11 +185,11 @@ impl LinkConfigurator { self.tls_inspector.inspect_config(config).await, ); } - #[cfg(feature = "transport_shm")] + #[cfg(feature = "transport_unixpipe")] { insert_config( - SHM_LOCATOR_PREFIX.into(), - self.shm_inspector.inspect_config(config).await, + UNIXPIPE_LOCATOR_PREFIX.into(), + self.unixpipe_inspector.inspect_config(config).await, ); } (configs, errors) @@ -221,8 +221,8 @@ impl LinkManagerBuilderUnicast { WS_LOCATOR_PREFIX => Ok(Arc::new(LinkManagerUnicastWs::new(_manager))), #[cfg(feature = "transport_serial")] SERIAL_LOCATOR_PREFIX => Ok(Arc::new(LinkManagerUnicastSerial::new(_manager))), - #[cfg(feature = "transport_shm")] - SHM_LOCATOR_PREFIX => Ok(Arc::new(LinkManagerUnicastPipe::new(_manager))), + #[cfg(feature = "transport_unixpipe")] + UNIXPIPE_LOCATOR_PREFIX => Ok(Arc::new(LinkManagerUnicastPipe::new(_manager))), _ => bail!("Unicast not supported for {} protocol", protocol), } } diff --git a/io/zenoh-links/zenoh-link-shm/Cargo.toml b/io/zenoh-links/zenoh-link-unixpipe/Cargo.toml similarity index 96% rename from io/zenoh-links/zenoh-link-shm/Cargo.toml rename to io/zenoh-links/zenoh-link-unixpipe/Cargo.toml index f29be5ade8..112e9705a6 100644 --- a/io/zenoh-links/zenoh-link-shm/Cargo.toml +++ b/io/zenoh-links/zenoh-link-unixpipe/Cargo.toml @@ -13,7 +13,7 @@ # [package] rust-version = { workspace = true } -name = "zenoh-link-shm" +name = "zenoh-link-unixpipe" version = { workspace = true } repository = { workspace = true } homepage = { workspace = true } @@ -25,7 +25,7 @@ description = "Internal crate for zenoh." # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -transport_shm = [] +transport_unixpipe = [] [dependencies] async-std = { workspace = true } diff --git a/io/zenoh-links/zenoh-link-shm/src/lib.rs b/io/zenoh-links/zenoh-link-unixpipe/src/lib.rs similarity index 100% rename from io/zenoh-links/zenoh-link-shm/src/lib.rs rename to io/zenoh-links/zenoh-link-unixpipe/src/lib.rs diff --git a/io/zenoh-links/zenoh-link-shm/src/unix/mod.rs b/io/zenoh-links/zenoh-link-unixpipe/src/unix/mod.rs similarity index 64% rename from io/zenoh-links/zenoh-link-shm/src/unix/mod.rs rename to io/zenoh-links/zenoh-link-unixpipe/src/unix/mod.rs index 81a02633c1..8793add470 100644 --- a/io/zenoh-links/zenoh-link-shm/src/unix/mod.rs +++ b/io/zenoh-links/zenoh-link-unixpipe/src/unix/mod.rs @@ -27,14 +27,14 @@ use zenoh_link_commons::{ConfigurationInspector, LocatorInspector}; use zenoh_protocol::core::{Locator, Parameters}; use zenoh_result::ZResult; -pub const SHM_LOCATOR_PREFIX: &str = "shm"; +pub const UNIXPIPE_LOCATOR_PREFIX: &str = "unixpipe"; #[derive(Default, Clone, Copy)] -pub struct ShmLocatorInspector; +pub struct UnixPipeLocatorInspector; #[async_trait] -impl LocatorInspector for ShmLocatorInspector { +impl LocatorInspector for UnixPipeLocatorInspector { fn protocol(&self) -> &str { - SHM_LOCATOR_PREFIX + UNIXPIPE_LOCATOR_PREFIX } async fn is_multicast(&self, _locator: &Locator) -> ZResult { @@ -43,17 +43,17 @@ impl LocatorInspector for ShmLocatorInspector { } #[derive(Default, Clone, Copy, Debug)] -pub struct ShmConfigurator; +pub struct UnixPipeConfigurator; #[async_trait] -impl ConfigurationInspector for ShmConfigurator { +impl ConfigurationInspector for UnixPipeConfigurator { async fn inspect_config(&self, config: &Config) -> ZResult { let mut properties: Vec<(&str, &str)> = vec![]; - let c = config.transport().link().shared_memory(); - let shm_access_mask_; - if let Some(shm_access_mask) = c.shm_access_mask() { - shm_access_mask_ = shm_access_mask.to_string(); - properties.push((config::SHM_ACCESS_MASK, &shm_access_mask_)); + let c = config.transport().link().unixpipe(); + let file_access_mask_; + if let Some(file_access_mask) = c.file_access_mask() { + file_access_mask_ = file_access_mask.to_string(); + properties.push((config::FILE_ACCESS_MASK, &file_access_mask_)); } let mut s = String::new(); @@ -64,11 +64,11 @@ impl ConfigurationInspector for ShmConfigurator { } zconfigurable! { - // Default access mask for SHM resources - static ref SHM_ACCESS_MASK: u32 = config::SHM_ACCESS_MASK_DEFAULT; + // Default access mask for pipe files + static ref FILE_ACCESS_MASK: u32 = config::FILE_ACCESS_MASK_DEFAULT; } pub mod config { - pub const SHM_ACCESS_MASK: &str = "shm_mask"; - pub const SHM_ACCESS_MASK_DEFAULT: u32 = 0o777; + pub const FILE_ACCESS_MASK: &str = "file_mask"; + pub const FILE_ACCESS_MASK_DEFAULT: u32 = 0o777; } diff --git a/io/zenoh-links/zenoh-link-shm/src/unix/unicast.rs b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs similarity index 98% rename from io/zenoh-links/zenoh-link-shm/src/unix/unicast.rs rename to io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs index 27a9f0e037..72d7859326 100644 --- a/io/zenoh-links/zenoh-link-shm/src/unix/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs @@ -38,7 +38,7 @@ use zenoh_link_commons::{ }; use zenoh_result::{bail, ZResult}; -use super::SHM_ACCESS_MASK; +use super::FILE_ACCESS_MASK; const LINUX_PIPE_MAX_MTU: u16 = 65_535; const LINUX_PIPE_DEDICATE_TRIES: usize = 100; @@ -450,7 +450,7 @@ impl Drop for UnicastPipe { #[async_trait] impl LinkUnicastTrait for UnicastPipe { async fn close(&self) -> ZResult<()> { - log::trace!("Closing SHM Pipe link: {}", self); + log::trace!("Closing Unix Pipe link: {}", self); Ok(()) } @@ -505,7 +505,7 @@ impl fmt::Display for UnicastPipe { impl fmt::Debug for UnicastPipe { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Shm") + f.debug_struct("UnicastPipe") .field("src", &self.local) .field("dst", &self.remote) .finish() @@ -578,9 +578,9 @@ fn parse_pipe_endpoint(endpoint: &EndPoint) -> (String, String, u32) { let path_downlink = path.to_string() + "_downlink"; let access_mode = endpoint .config() - .get(config::SHM_ACCESS_MASK) - .map_or(*SHM_ACCESS_MASK, |val| { - val.parse().unwrap_or(*SHM_ACCESS_MASK) + .get(config::FILE_ACCESS_MASK) + .map_or(*FILE_ACCESS_MASK, |val| { + val.parse().unwrap_or(*FILE_ACCESS_MASK) }); (path_uplink, path_downlink, access_mode) } diff --git a/io/zenoh-transport/Cargo.toml b/io/zenoh-transport/Cargo.toml index 7476da7196..77f6b18db3 100644 --- a/io/zenoh-transport/Cargo.toml +++ b/io/zenoh-transport/Cargo.toml @@ -42,7 +42,7 @@ transport_unixsock-stream = ["zenoh-link/transport_unixsock-stream"] transport_ws = ["zenoh-link/transport_ws"] transport_serial = ["zenoh-link/transport_serial"] transport_compression = [] -transport_shm = ["zenoh-link/transport_shm"] +transport_unixpipe = ["zenoh-link/transport_unixpipe"] stats = ["zenoh-protocol/stats"] test = [] unstable = [] diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index 01b5f4af61..c25c4bb873 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -55,6 +55,7 @@ struct State { ext_shm: ext::shm::StateAccept, #[cfg(feature = "transport_auth")] ext_auth: ext::auth::StateAccept, + ext_lowlatency: ext::lowlatency::StateAccept, } // InitSyn @@ -115,6 +116,7 @@ struct AcceptLink<'a> { ext_shm: ext::shm::ShmFsm<'a>, #[cfg(feature = "transport_auth")] ext_auth: ext::auth::AuthFsm<'a>, + ext_lowlatency: ext::lowlatency::LowLatencyFsm<'a>, } #[async_trait] @@ -187,6 +189,12 @@ impl<'a> AcceptFsm for AcceptLink<'a> { .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension LowLatency + self.ext_lowlatency + .recv_init_syn((&mut state.ext_lowlatency, init_syn.ext_lowlatency)) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension Shm #[cfg(feature = "shared-memory")] let ext_shm = self @@ -234,6 +242,13 @@ impl<'a> AcceptFsm for AcceptLink<'a> { .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension LowLatency + let ext_lowlatency = self + .ext_lowlatency + .send_init_ack(&state.ext_lowlatency) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension Shm let ext_shm = zcondfeat!( "shared-memory", @@ -279,6 +294,7 @@ impl<'a> AcceptFsm for AcceptLink<'a> { ext_shm: state.ext_shm, #[cfg(feature = "transport_auth")] ext_auth: state.ext_auth, + ext_lowlatency: state.ext_lowlatency, }; let mut encrypted = vec![]; @@ -308,6 +324,7 @@ impl<'a> AcceptFsm for AcceptLink<'a> { ext_shm, ext_auth, ext_mlink, + ext_lowlatency, } .into(); @@ -394,6 +411,7 @@ impl<'a> AcceptFsm for AcceptLink<'a> { ext_shm: cookie.ext_shm, #[cfg(feature = "transport_auth")] ext_auth: cookie.ext_auth, + ext_lowlatency: cookie.ext_lowlatency, }; // Extension QoS @@ -402,6 +420,12 @@ impl<'a> AcceptFsm for AcceptLink<'a> { .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension LowLatency + self.ext_lowlatency + .recv_open_syn((&mut state.ext_lowlatency, open_syn.ext_lowlatency)) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension Shm #[cfg(feature = "shared-memory")] self.ext_shm @@ -447,6 +471,13 @@ impl<'a> AcceptFsm for AcceptLink<'a> { .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension LowLatency + let ext_lowlatency = self + .ext_lowlatency + .send_open_ack(&state.ext_lowlatency) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension Shm let ext_shm = zcondfeat!( "shared-memory", @@ -486,6 +517,7 @@ impl<'a> AcceptFsm for AcceptLink<'a> { ext_shm, ext_auth, ext_mlink, + ext_lowlatency, }; // Do not send the OpenAck right now since we might still incur in MAX_LINKS error @@ -507,6 +539,7 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) ext_mlink: manager.state.unicast.multilink.fsm(&manager.prng), #[cfg(feature = "transport_auth")] ext_auth: manager.state.unicast.authenticator.fsm(&manager.prng), + ext_lowlatency: ext::lowlatency::LowLatencyFsm::new(), }; // Init handshake @@ -530,6 +563,7 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) resolution: manager.config.resolution, }, ext_qos: ext::qos::StateAccept::new(manager.config.unicast.is_qos), + ext_lowlatency: ext::lowlatency::StateAccept::new(manager.config.unicast.is_lowlatency), #[cfg(feature = "transport_multilink")] ext_mlink: manager .state @@ -591,6 +625,7 @@ pub(crate) async fn accept_link(link: &LinkUnicast, manager: &TransportManager) multilink: state.ext_mlink.multilink(), #[cfg(feature = "shared-memory")] is_shm: state.ext_shm.is_shm(), + is_lowlatency: state.ext_lowlatency.is_lowlatency(), }; let transport = step!( diff --git a/io/zenoh-transport/src/unicast/establishment/cookie.rs b/io/zenoh-transport/src/unicast/establishment/cookie.rs index 169fa0f9fb..0c6b5519e8 100644 --- a/io/zenoh-transport/src/unicast/establishment/cookie.rs +++ b/io/zenoh-transport/src/unicast/establishment/cookie.rs @@ -37,6 +37,7 @@ pub(crate) struct Cookie { pub(crate) ext_shm: ext::shm::StateAccept, #[cfg(feature = "transport_auth")] pub(crate) ext_auth: ext::auth::StateAccept, + pub(crate) ext_lowlatency: ext::lowlatency::StateAccept, } impl WCodec<&Cookie, &mut W> for Zenoh080 @@ -60,6 +61,7 @@ where self.write(&mut *writer, &x.ext_shm)?; #[cfg(feature = "transport_auth")] self.write(&mut *writer, &x.ext_auth)?; + self.write(&mut *writer, &x.ext_lowlatency)?; Ok(()) } @@ -87,6 +89,7 @@ where let ext_shm: ext::shm::StateAccept = self.read(&mut *reader)?; #[cfg(feature = "transport_auth")] let ext_auth: ext::auth::StateAccept = self.read(&mut *reader)?; + let ext_lowlatency: ext::lowlatency::StateAccept = self.read(&mut *reader)?; let cookie = Cookie { zid, @@ -101,6 +104,7 @@ where ext_shm, #[cfg(feature = "transport_auth")] ext_auth, + ext_lowlatency, }; Ok(cookie) @@ -169,6 +173,7 @@ impl Cookie { ext_shm: ext::shm::StateAccept::rand(), #[cfg(feature = "transport_auth")] ext_auth: ext::auth::StateAccept::rand(), + ext_lowlatency: ext::lowlatency::StateAccept::rand(), } } } diff --git a/io/zenoh-transport/src/unicast/establishment/ext/lowlatency.rs b/io/zenoh-transport/src/unicast/establishment/ext/lowlatency.rs new file mode 100644 index 0000000000..25edbde2e1 --- /dev/null +++ b/io/zenoh-transport/src/unicast/establishment/ext/lowlatency.rs @@ -0,0 +1,192 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use crate::unicast::establishment::{AcceptFsm, OpenFsm}; +use async_trait::async_trait; +use core::marker::PhantomData; +use zenoh_buffers::{ + reader::{DidntRead, Reader}, + writer::{DidntWrite, Writer}, +}; +use zenoh_codec::{RCodec, WCodec, Zenoh080}; +use zenoh_protocol::transport::{init, open}; +use zenoh_result::Error as ZError; + +// Extension Fsm +pub(crate) struct LowLatencyFsm<'a> { + _a: PhantomData<&'a ()>, +} + +impl<'a> LowLatencyFsm<'a> { + pub(crate) const fn new() -> Self { + Self { _a: PhantomData } + } +} + +/*************************************/ +/* OPEN */ +/*************************************/ +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) struct StateOpen { + is_lowlatency: bool, +} + +impl StateOpen { + pub(crate) const fn new(is_lowlatency: bool) -> Self { + Self { is_lowlatency } + } + + pub(crate) const fn is_lowlatency(&self) -> bool { + self.is_lowlatency + } +} + +#[async_trait] +impl<'a> OpenFsm for LowLatencyFsm<'a> { + type Error = ZError; + + type SendInitSynIn = &'a StateOpen; + type SendInitSynOut = Option; + async fn send_init_syn( + &self, + state: Self::SendInitSynIn, + ) -> Result { + let output = state.is_lowlatency.then_some(init::ext::LowLatency::new()); + Ok(output) + } + + type RecvInitAckIn = (&'a mut StateOpen, Option); + type RecvInitAckOut = (); + async fn recv_init_ack( + &self, + input: Self::RecvInitAckIn, + ) -> Result { + let (state, other_ext) = input; + state.is_lowlatency &= other_ext.is_some(); + Ok(()) + } + + type SendOpenSynIn = &'a StateOpen; + type SendOpenSynOut = Option; + async fn send_open_syn( + &self, + _state: Self::SendOpenSynIn, + ) -> Result { + Ok(None) + } + + type RecvOpenAckIn = (&'a mut StateOpen, Option); + type RecvOpenAckOut = (); + async fn recv_open_ack( + &self, + _state: Self::RecvOpenAckIn, + ) -> Result { + Ok(()) + } +} + +/*************************************/ +/* ACCEPT */ +/*************************************/ +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) struct StateAccept { + is_lowlatency: bool, +} + +impl StateAccept { + pub(crate) const fn new(is_lowlatency: bool) -> Self { + Self { is_lowlatency } + } + + pub(crate) const fn is_lowlatency(&self) -> bool { + self.is_lowlatency + } + + #[cfg(test)] + pub(crate) fn rand() -> Self { + use rand::Rng; + let mut rng = rand::thread_rng(); + Self::new(rng.gen_bool(0.5)) + } +} + +// Codec +impl WCodec<&StateAccept, &mut W> for Zenoh080 +where + W: Writer, +{ + type Output = Result<(), DidntWrite>; + + fn write(self, writer: &mut W, x: &StateAccept) -> Self::Output { + let is_lowlatency = u8::from(x.is_lowlatency); + self.write(&mut *writer, is_lowlatency)?; + Ok(()) + } +} + +impl RCodec for Zenoh080 +where + R: Reader, +{ + type Error = DidntRead; + + fn read(self, reader: &mut R) -> Result { + let is_lowlatency: u8 = self.read(&mut *reader)?; + let is_lowlatency = is_lowlatency == 1; + Ok(StateAccept { is_lowlatency }) + } +} + +#[async_trait] +impl<'a> AcceptFsm for LowLatencyFsm<'a> { + type Error = ZError; + + type RecvInitSynIn = (&'a mut StateAccept, Option); + type RecvInitSynOut = (); + async fn recv_init_syn( + &self, + input: Self::RecvInitSynIn, + ) -> Result { + let (state, other_ext) = input; + state.is_lowlatency &= other_ext.is_some(); + Ok(()) + } + + type SendInitAckIn = &'a StateAccept; + type SendInitAckOut = Option; + async fn send_init_ack( + &self, + state: Self::SendInitAckIn, + ) -> Result { + let output = state.is_lowlatency.then_some(init::ext::LowLatency::new()); + Ok(output) + } + + type RecvOpenSynIn = (&'a mut StateAccept, Option); + type RecvOpenSynOut = (); + async fn recv_open_syn( + &self, + _state: Self::RecvOpenSynIn, + ) -> Result { + Ok(()) + } + + type SendOpenAckIn = &'a StateAccept; + type SendOpenAckOut = Option; + async fn send_open_ack( + &self, + _state: Self::SendOpenAckIn, + ) -> Result { + Ok(None) + } +} diff --git a/io/zenoh-transport/src/unicast/establishment/ext/mod.rs b/io/zenoh-transport/src/unicast/establishment/ext/mod.rs index 30e7a12b53..956a8c5112 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/mod.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/mod.rs @@ -13,6 +13,7 @@ // #[cfg(feature = "transport_auth")] pub mod auth; +pub(crate) mod lowlatency; #[cfg(feature = "transport_multilink")] pub(crate) mod multilink; pub(crate) mod qos; diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index e7d78c6537..19f94cf26e 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -51,6 +51,7 @@ struct State { ext_shm: ext::shm::StateOpen, #[cfg(feature = "transport_auth")] ext_auth: ext::auth::StateOpen, + ext_lowlatency: ext::lowlatency::StateOpen, } // InitSyn @@ -99,6 +100,7 @@ struct OpenLink<'a> { ext_shm: ext::shm::ShmFsm<'a>, #[cfg(feature = "transport_auth")] ext_auth: ext::auth::AuthFsm<'a>, + ext_lowlatency: ext::lowlatency::LowLatencyFsm<'a>, } #[async_trait] @@ -120,6 +122,13 @@ impl<'a> OpenFsm for OpenLink<'a> { .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension LowLatency + let ext_lowlatency = self + .ext_lowlatency + .send_init_syn(&state.ext_lowlatency) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension Shm let ext_shm = zcondfeat!( "shared-memory", @@ -160,6 +169,7 @@ impl<'a> OpenFsm for OpenLink<'a> { ext_shm, ext_auth, ext_mlink, + ext_lowlatency, } .into(); @@ -257,6 +267,12 @@ impl<'a> OpenFsm for OpenLink<'a> { .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension LowLatency + self.ext_lowlatency + .recv_init_ack((&mut state.ext_lowlatency, init_ack.ext_lowlatency)) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension Shm #[cfg(feature = "shared-memory")] let shm_challenge = self @@ -304,6 +320,13 @@ impl<'a> OpenFsm for OpenLink<'a> { .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension LowLatency + let ext_lowlatency = self + .ext_lowlatency + .send_open_syn(&state.ext_lowlatency) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension Shm let ext_shm = zcondfeat!( "shared-memory", @@ -344,6 +367,7 @@ impl<'a> OpenFsm for OpenLink<'a> { ext_shm, ext_auth, ext_mlink, + ext_lowlatency, } .into(); @@ -400,6 +424,12 @@ impl<'a> OpenFsm for OpenLink<'a> { .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension LowLatency + self.ext_lowlatency + .recv_open_ack((&mut state.ext_lowlatency, open_ack.ext_lowlatency)) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; + // Extension Shm #[cfg(feature = "shared-memory")] self.ext_shm @@ -442,6 +472,7 @@ pub(crate) async fn open_link( ext_shm: ext::shm::ShmFsm::new(&manager.state.unicast.shm), #[cfg(feature = "transport_auth")] ext_auth: manager.state.unicast.authenticator.fsm(&manager.prng), + ext_lowlatency: ext::lowlatency::LowLatencyFsm::new(), }; let mut state = State { @@ -464,6 +495,7 @@ pub(crate) async fn open_link( .unicast .authenticator .open(&mut *zasynclock!(manager.prng)), + ext_lowlatency: ext::lowlatency::StateOpen::new(manager.config.unicast.is_lowlatency), }; // Init handshake @@ -512,6 +544,7 @@ pub(crate) async fn open_link( multilink: state.ext_mlink.multilink(), #[cfg(feature = "shared-memory")] is_shm: state.ext_shm.is_shm(), + is_lowlatency: state.ext_lowlatency.is_lowlatency(), }; let transport = step!( diff --git a/io/zenoh-transport/src/unicast/shm/link.rs b/io/zenoh-transport/src/unicast/lowlatency/link.rs similarity index 83% rename from io/zenoh-transport/src/unicast/shm/link.rs rename to io/zenoh-transport/src/unicast/lowlatency/link.rs index 5b5c9b764b..f9f949bf99 100644 --- a/io/zenoh-transport/src/unicast/shm/link.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/link.rs @@ -11,7 +11,9 @@ // Contributors: // ZettaScale Zenoh Team, // -use super::transport::TransportUnicastShm; +use super::transport::TransportUnicastLowlatency; +#[cfg(feature = "stats")] +use crate::stats::TransportStats; use crate::TransportExecutor; use async_std::task; use async_std::{prelude::FutureExt, sync::RwLock}; @@ -22,11 +24,18 @@ use std::sync::Arc; use std::time::Duration; use zenoh_buffers::{writer::HasWriter, ZSlice}; use zenoh_link::LinkUnicast; -use zenoh_protocol::transport::{BatchSize, KeepAlive, TransportBodyShm, TransportMessageShm}; +use zenoh_protocol::transport::{ + BatchSize, KeepAlive, TransportBodyLowLatency, TransportMessageLowLatency, +}; use zenoh_result::{zerror, ZResult}; use zenoh_sync::RecyclingObjectPool; -pub(crate) async fn send_with_link(link: &LinkUnicast, msg: TransportMessageShm) -> ZResult<()> { +pub(crate) async fn send_with_link( + link: &LinkUnicast, + msg: TransportMessageLowLatency, + #[cfg(feature = "stats")] stats: &Arc, +) -> ZResult<()> { + let len; if link.is_streamed() { let mut buffer = vec![0, 0, 0, 0]; let codec = Zenoh080::new(); @@ -35,7 +44,7 @@ pub(crate) async fn send_with_link(link: &LinkUnicast, msg: TransportMessageShm) .write(&mut writer, &msg) .map_err(|_| zerror!("Error serializing message {:?}", msg))?; - let len = (buffer.len() - 4) as u32; + len = (buffer.len() - 4) as u32; let le = len.to_le_bytes(); buffer[0..4].copy_from_slice(&le); @@ -49,27 +58,36 @@ pub(crate) async fn send_with_link(link: &LinkUnicast, msg: TransportMessageShm) .write(&mut writer, &msg) .map_err(|_| zerror!("Error serializing message {:?}", msg))?; + #[cfg(feature = "stats")] + { + len = buffer.len() as u32; + } link.write_all(&buffer).await?; } log::trace!("Sent: {:?}", msg); - // #[cfg(feature = "stats")] - // { - // stats.inc_tx_t_msgs(1); - // stats.inc_tx_bytes(buff.len() + 2); - // } - + #[cfg(feature = "stats")] + { + stats.inc_tx_t_msgs(1); + stats.inc_tx_bytes(len as usize); + } Ok(()) } -impl TransportUnicastShm { - pub(super) fn send(&self, msg: TransportMessageShm) -> ZResult<()> { +impl TransportUnicastLowlatency { + pub(super) fn send(&self, msg: TransportMessageLowLatency) -> ZResult<()> { async_std::task::block_on(self.send_async(msg)) } - pub(super) async fn send_async(&self, msg: TransportMessageShm) -> ZResult<()> { + pub(super) async fn send_async(&self, msg: TransportMessageLowLatency) -> ZResult<()> { let guard = zasyncwrite!(self.link); - send_with_link(&guard, msg).await + send_with_link( + &guard, + msg, + #[cfg(feature = "stats")] + &self.stats, + ) + .await } pub(super) fn start_keepalive(&self, executor: &TransportExecutor, keep_alive: Duration) { @@ -79,8 +97,8 @@ impl TransportUnicastShm { let res = keepalive_task( c_transport.link.clone(), keep_alive, - // #[cfg(feature = "stats")] - // c_transport.stats, + #[cfg(feature = "stats")] + c_transport.stats.clone(), ) .await; log::debug!( @@ -161,20 +179,21 @@ impl TransportUnicastShm { async fn keepalive_task( link: Arc>, keep_alive: Duration, - // #[cfg(feature = "stats")] stats: Arc, + #[cfg(feature = "stats")] stats: Arc, ) -> ZResult<()> { loop { async_std::task::sleep(keep_alive).await; - let keepailve = TransportMessageShm { - body: TransportBodyShm::KeepAlive(KeepAlive), + let keepailve = TransportMessageLowLatency { + body: TransportBodyLowLatency::KeepAlive(KeepAlive), }; let guard = zasyncwrite!(link); let _ = send_with_link( - &guard, keepailve, - // #[cfg(feature = "stats")] - // &stats, + &guard, + keepailve, + #[cfg(feature = "stats")] + &stats, ) .await; drop(guard); @@ -183,7 +202,7 @@ async fn keepalive_task( async fn rx_task_stream( link: LinkUnicast, - transport: TransportUnicastShm, + transport: TransportUnicastLowlatency, lease: Duration, rx_batch_size: BatchSize, rx_buffer_size: usize, @@ -226,7 +245,7 @@ async fn rx_task_stream( async fn rx_task_dgram( link: LinkUnicast, - transport: TransportUnicastShm, + transport: TransportUnicastLowlatency, lease: Duration, rx_batch_size: BatchSize, rx_buffer_size: usize, @@ -260,7 +279,7 @@ async fn rx_task_dgram( async fn rx_task( link: LinkUnicast, - transport: TransportUnicastShm, + transport: TransportUnicastLowlatency, lease: Duration, rx_batch_size: u16, rx_buffer_size: usize, diff --git a/io/zenoh-transport/src/unicast/net/mod.rs b/io/zenoh-transport/src/unicast/lowlatency/mod.rs similarity index 100% rename from io/zenoh-transport/src/unicast/net/mod.rs rename to io/zenoh-transport/src/unicast/lowlatency/mod.rs diff --git a/io/zenoh-transport/src/unicast/shm/rx.rs b/io/zenoh-transport/src/unicast/lowlatency/rx.rs similarity index 78% rename from io/zenoh-transport/src/unicast/shm/rx.rs rename to io/zenoh-transport/src/unicast/lowlatency/rx.rs index 75ddd31586..87c03dde56 100644 --- a/io/zenoh-transport/src/unicast/shm/rx.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/rx.rs @@ -11,7 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, // -use super::transport::TransportUnicastShm; +use super::transport::TransportUnicastLowlatency; use zenoh_buffers::{ reader::{HasReader, Reader}, ZSlice, @@ -19,13 +19,13 @@ use zenoh_buffers::{ use zenoh_codec::{RCodec, Zenoh080}; use zenoh_core::zread; use zenoh_link::LinkUnicast; -use zenoh_protocol::{network::NetworkMessage, transport::TransportMessageShm}; +use zenoh_protocol::{network::NetworkMessage, transport::TransportMessageLowLatency}; use zenoh_result::{zerror, ZResult}; /*************************************/ /* TRANSPORT RX */ /*************************************/ -impl TransportUnicastShm { +impl TransportUnicastLowlatency { fn trigger_callback( &self, #[allow(unused_mut)] // shared-memory feature requires mut @@ -58,24 +58,24 @@ impl TransportUnicastShm { let codec = Zenoh080::new(); let mut reader = zslice.reader(); while reader.can_read() { - let msg: TransportMessageShm = codec + let msg: TransportMessageLowLatency = codec .read(&mut reader) .map_err(|_| zerror!("{}: decoding error", link))?; log::trace!("Received: {:?}", msg); - // #[cfg(feature = "stats")] - // { - // transport.stats.inc_rx_t_msgs(1); - // } + #[cfg(feature = "stats")] + { + self.stats.inc_rx_t_msgs(1); + } match msg.body { - zenoh_protocol::transport::TransportBodyShm::Close(_) => { + zenoh_protocol::transport::TransportBodyLowLatency::Close(_) => { let _ = self.delete().await; } - zenoh_protocol::transport::TransportBodyShm::KeepAlive(_) => {} - zenoh_protocol::transport::TransportBodyShm::Network(msg) => { - let _ = self.trigger_callback(*msg); + zenoh_protocol::transport::TransportBodyLowLatency::KeepAlive(_) => {} + zenoh_protocol::transport::TransportBodyLowLatency::Network(msg) => { + let _ = self.trigger_callback(msg); } } } diff --git a/io/zenoh-transport/src/unicast/shm/transport.rs b/io/zenoh-transport/src/unicast/lowlatency/transport.rs similarity index 83% rename from io/zenoh-transport/src/unicast/shm/transport.rs rename to io/zenoh-transport/src/unicast/lowlatency/transport.rs index f5fe683041..f00876a2e8 100644 --- a/io/zenoh-transport/src/unicast/shm/transport.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/transport.rs @@ -11,7 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, // -#[cfg(feature = "transport_shm")] +#[cfg(feature = "transport_unixpipe")] use super::link::send_with_link; #[cfg(feature = "stats")] use crate::stats::TransportStats; @@ -20,33 +20,35 @@ use crate::TransportConfigUnicast; use crate::TransportManager; use crate::{TransportExecutor, TransportPeerEventHandler}; use async_executor::Task; -#[cfg(feature = "transport_shm")] +#[cfg(feature = "transport_unixpipe")] use async_std::sync::RwLockUpgradableReadGuard; use async_std::sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock}; use async_std::task::JoinHandle; use async_trait::async_trait; use std::sync::{Arc, RwLock as SyncRwLock}; use std::time::Duration; -#[cfg(feature = "transport_shm")] +#[cfg(feature = "transport_unixpipe")] use zenoh_core::zasyncread_upgradable; use zenoh_core::{zasynclock, zasyncread, zread, zwrite}; -#[cfg(feature = "transport_shm")] +#[cfg(feature = "transport_unixpipe")] +use zenoh_link::unixpipe::UNIXPIPE_LOCATOR_PREFIX; +#[cfg(feature = "transport_unixpipe")] use zenoh_link::Link; use zenoh_link::{LinkUnicast, LinkUnicastDirection}; use zenoh_protocol::core::{WhatAmI, ZenohId}; use zenoh_protocol::network::NetworkMessage; -use zenoh_protocol::transport::TransportBodyShm; -use zenoh_protocol::transport::TransportMessageShm; +use zenoh_protocol::transport::TransportBodyLowLatency; +use zenoh_protocol::transport::TransportMessageLowLatency; use zenoh_protocol::transport::{Close, TransportSn}; -#[cfg(not(feature = "transport_shm"))] +#[cfg(not(feature = "transport_unixpipe"))] use zenoh_result::bail; use zenoh_result::{zerror, ZResult}; /*************************************/ -/* TRANSPORT */ +/* LOW-LATENCY TRANSPORT */ /*************************************/ #[derive(Clone)] -pub(crate) struct TransportUnicastShm { +pub(crate) struct TransportUnicastLowlatency { // Transport Manager pub(super) manager: TransportManager, // Transport config @@ -66,13 +68,13 @@ pub(crate) struct TransportUnicastShm { pub(crate) handle_rx: Arc>>>, } -impl TransportUnicastShm { +impl TransportUnicastLowlatency { pub fn make( manager: TransportManager, config: TransportConfigUnicast, link: LinkUnicast, - ) -> ZResult { - let t = TransportUnicastShm { + ) -> ZResult { + let t = TransportUnicastLowlatency { manager, config, link: Arc::new(RwLock::new(link)), @@ -98,8 +100,8 @@ impl TransportUnicastShm { ); // Send close message on the link - let close = TransportMessageShm { - body: TransportBodyShm::Close(Close { + let close = TransportMessageLowLatency { + body: TransportBodyLowLatency::Close(Close { reason, session: false, }), @@ -145,7 +147,7 @@ impl TransportUnicastShm { } #[async_trait] -impl TransportUnicastTrait for TransportUnicastShm { +impl TransportUnicastTrait for TransportUnicastLowlatency { /*************************************/ /* ACCESSORS */ /*************************************/ @@ -222,25 +224,24 @@ impl TransportUnicastTrait for TransportUnicastShm { async fn add_link(&self, link: LinkUnicast, _direction: LinkUnicastDirection) -> ZResult<()> { log::trace!("Adding link: {}", link); - #[cfg(not(feature = "transport_shm"))] + #[cfg(not(feature = "transport_unixpipe"))] bail!( "Can not add Link {} with peer {}: link already exists and only unique link is supported!", link, self.config.zid, ); - #[cfg(feature = "transport_shm")] + #[cfg(feature = "transport_unixpipe")] { let guard = zasyncread_upgradable!(self.link); - let shm_protocol = "shm"; - let existing_shm = guard.get_dst().protocol().as_str() == shm_protocol; - let new_shm = link.get_dst().protocol().as_str() == shm_protocol; - match (existing_shm, new_shm) { + let existing_unixpipe = guard.get_dst().protocol().as_str() == UNIXPIPE_LOCATOR_PREFIX; + let new_unixpipe = link.get_dst().protocol().as_str() == UNIXPIPE_LOCATOR_PREFIX; + match (existing_unixpipe, new_unixpipe) { (false, true) => { - // SHM transport suports only a single link, but code here also handles upgrade from non-shm link to shm link! + // LowLatency transport suports only a single link, but code here also handles upgrade from non-unixpipe link to unixpipe link! log::trace!( - "Upgrading {} SHM transport's link from {} to {}", + "Upgrading {} LowLatency transport's link from {} to {}", self.config.zid, guard, link @@ -248,13 +249,19 @@ impl TransportUnicastTrait for TransportUnicastShm { // Prepare and send close message on old link { - let close = TransportMessageShm { - body: TransportBodyShm::Close(Close { + let close = TransportMessageLowLatency { + body: TransportBodyLowLatency::Close(Close { reason: 0, session: false, }), }; - let _ = send_with_link(&guard, close).await; + let _ = send_with_link( + &guard, + close, + #[cfg(feature = "stats")] + &self.stats, + ) + .await; }; // Notify the callback if let Some(callback) = zread!(self.callback).as_ref() { diff --git a/io/zenoh-transport/src/unicast/shm/tx.rs b/io/zenoh-transport/src/unicast/lowlatency/tx.rs similarity index 70% rename from io/zenoh-transport/src/unicast/shm/tx.rs rename to io/zenoh-transport/src/unicast/lowlatency/tx.rs index b93acc65fd..38751eb61d 100644 --- a/io/zenoh-transport/src/unicast/shm/tx.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/tx.rs @@ -11,14 +11,16 @@ // Contributors: // ZettaScale Zenoh Team, // -use super::transport::TransportUnicastShm; +use super::transport::TransportUnicastLowlatency; use zenoh_protocol::{ network::NetworkMessage, - transport::{TransportBodyShm, TransportMessageShm}, + transport::{TransportBodyLowLatency, TransportMessageLowLatency}, }; -use zenoh_result::{bail, ZResult}; +#[cfg(feature = "shared-memory")] +use zenoh_result::bail; +use zenoh_result::ZResult; -impl TransportUnicastShm { +impl TransportUnicastLowlatency { #[allow(unused_mut)] // When feature "shared-memory" is not enabled #[allow(clippy::let_and_return)] // When feature "stats" is not enabled #[inline(always)] @@ -35,17 +37,17 @@ impl TransportUnicastShm { } } - let msg = TransportMessageShm { - body: TransportBodyShm::Network(Box::new(msg)), + let msg = TransportMessageLowLatency { + body: TransportBodyLowLatency::Network(msg), }; let res = self.send(msg); - // #[cfg(feature = "stats")] - // if res { - // self.stats.inc_tx_z_msgs(1); - // } else { - // self.stats.inc_tx_z_dropped(1); - // } + #[cfg(feature = "stats")] + if res.is_ok() { + self.stats.inc_tx_n_msgs(1); + } else { + self.stats.inc_tx_n_dropped(1); + } res } diff --git a/io/zenoh-transport/src/unicast/manager.rs b/io/zenoh-transport/src/unicast/manager.rs index 8ba61150c0..384b992401 100644 --- a/io/zenoh-transport/src/unicast/manager.rs +++ b/io/zenoh-transport/src/unicast/manager.rs @@ -13,16 +13,15 @@ // #[cfg(feature = "shared-memory")] use super::shared_memory_unicast::SharedMemoryUnicast; -#[cfg(feature = "shared-memory")] -use super::shm::transport::TransportUnicastShm; #[cfg(feature = "transport_auth")] use crate::unicast::establishment::ext::auth::Auth; #[cfg(feature = "transport_multilink")] use crate::unicast::establishment::ext::multilink::MultiLink; use crate::{ - net::transport::TransportUnicastNet, + lowlatency::transport::TransportUnicastLowlatency, transport_unicast_inner::TransportUnicastTrait, unicast::{TransportConfigUnicast, TransportUnicast}, + universal::transport::TransportUnicastUniversal, TransportManager, }; use async_std::{prelude::FutureExt, sync::Mutex, task}; @@ -49,6 +48,7 @@ pub struct TransportManagerConfigUnicast { pub accept_pending: usize, pub max_sessions: usize, pub is_qos: bool, + pub is_lowlatency: bool, #[cfg(feature = "transport_multilink")] pub max_links: usize, #[cfg(feature = "shared-memory")] @@ -98,6 +98,7 @@ pub struct TransportManagerBuilderUnicast { pub(super) is_shm: bool, #[cfg(feature = "transport_auth")] pub(super) authenticator: Auth, + pub(super) is_lowlatency: bool, } impl TransportManagerBuilderUnicast { @@ -131,6 +132,11 @@ impl TransportManagerBuilderUnicast { self } + pub fn lowlatency(mut self, is_lowlatency: bool) -> Self { + self.is_lowlatency = is_lowlatency; + self + } + #[cfg(feature = "transport_multilink")] pub fn max_links(mut self, max_links: usize) -> Self { self.max_links = max_links; @@ -166,6 +172,7 @@ impl TransportManagerBuilderUnicast { self = self.accept_pending(*config.transport().unicast().accept_pending()); self = self.max_sessions(*config.transport().unicast().max_sessions()); self = self.qos(*config.transport().qos().enabled()); + self = self.lowlatency(*config.transport().unicast().lowlatency()); #[cfg(feature = "transport_multilink")] { @@ -187,6 +194,10 @@ impl TransportManagerBuilderUnicast { self, #[allow(unused)] prng: &mut PseudoRng, // Required for #[cfg(feature = "transport_multilink")] ) -> ZResult { + if self.is_qos && self.is_lowlatency { + bail!("'qos' and 'lowlatency' options are incompatible"); + } + let config = TransportManagerConfigUnicast { lease: self.lease, keep_alive: self.keep_alive, @@ -200,6 +211,7 @@ impl TransportManagerBuilderUnicast { is_shm: self.is_shm, #[cfg(all(feature = "unstable", feature = "transport_compression"))] is_compressed: self.is_compressed, + is_lowlatency: self.is_lowlatency, }; let state = TransportManagerStateUnicast { @@ -241,6 +253,7 @@ impl Default for TransportManagerBuilderUnicast { is_shm: *shm.enabled(), #[cfg(feature = "transport_auth")] authenticator: Auth::default(), + is_lowlatency: *transport.lowlatency(), } } } @@ -431,51 +444,26 @@ impl TransportManager { let is_multilink = zcondfeat!("transport_multilink", config.multilink.is_some(), false); - let stc = TransportConfigUnicast { - zid: config.zid, - whatami: config.whatami, - sn_resolution: config.sn_resolution, - tx_initial_sn: config.tx_initial_sn, - is_qos: config.is_qos, - #[cfg(feature = "transport_multilink")] - multilink: config.multilink, - #[cfg(feature = "shared-memory")] - is_shm: config.is_shm, - }; - - async fn make_net_transport( - manager: &TransportManager, - stc: TransportConfigUnicast, - link: LinkUnicast, - direction: LinkUnicastDirection, - ) -> Result, (Error, Option)> { - log::debug!("Will use NET transport!"); - let t: Arc = - TransportUnicastNet::make(manager.clone(), stc) - .map_err(|e| (e, Some(close::reason::INVALID))) - .map(|v| Arc::new(v) as Arc)?; - // Add the link to the transport - t.add_link(link, direction) - .await - .map_err(|e| (e, Some(close::reason::MAX_LINKS)))?; - Ok(t) - } - // select and create transport implementation depending on the cfg and enabled features - let a_t: Arc = zcondfeat!( - "shared-memory", - { - if stc.is_shm { - log::debug!("Will use SHM transport!"); - TransportUnicastShm::make(self.clone(), stc, link) + let a_t = { + if config.is_lowlatency { + log::debug!("Will use LowLatency transport!"); + TransportUnicastLowlatency::make(self.clone(), config.clone(), link) + .map_err(|e| (e, Some(close::reason::INVALID))) + .map(|v| Arc::new(v) as Arc)? + } else { + log::debug!("Will use Universal transport!"); + let t: Arc = + TransportUnicastUniversal::make(self.clone(), config.clone()) .map_err(|e| (e, Some(close::reason::INVALID))) - .map(|v| Arc::new(v) as Arc)? - } else { - make_net_transport(self, stc, link, direction).await? - } - }, - make_net_transport(self, stc, link, direction).await? - ); + .map(|v| Arc::new(v) as Arc)?; + // Add the link to the transport + t.add_link(link, direction) + .await + .map_err(|e| (e, Some(close::reason::MAX_LINKS)))?; + t + } + }; // Add the transport transport to the list of active transports let transport = TransportUnicast(Arc::downgrade(&a_t)); @@ -485,7 +473,7 @@ impl TransportManager { "shared-memory", { log::debug!( - "New transport opened between {} and {} - whatami: {}, sn resolution: {:?}, initial sn: {:?}, qos: {}, shm: {}, multilink: {}", + "New transport opened between {} and {} - whatami: {}, sn resolution: {:?}, initial sn: {:?}, qos: {}, shm: {}, multilink: {}, lowlatency: {}", self.config.zid, config.zid, config.whatami, @@ -493,19 +481,21 @@ impl TransportManager { config.tx_initial_sn, config.is_qos, config.is_shm, - is_multilink + is_multilink, + config.is_lowlatency ); }, { log::debug!( - "New transport opened between {} and {} - whatami: {}, sn resolution: {:?}, initial sn: {:?}, qos: {}, multilink: {}", + "New transport opened between {} and {} - whatami: {}, sn resolution: {:?}, initial sn: {:?}, qos: {}, multilink: {}, lowlatency: {}", self.config.zid, config.zid, config.whatami, config.sn_resolution, config.tx_initial_sn, config.is_qos, - is_multilink + is_multilink, + config.is_lowlatency ); } ); diff --git a/io/zenoh-transport/src/unicast/mod.rs b/io/zenoh-transport/src/unicast/mod.rs index c7f6038bbe..d2a14a0276 100644 --- a/io/zenoh-transport/src/unicast/mod.rs +++ b/io/zenoh-transport/src/unicast/mod.rs @@ -12,17 +12,16 @@ // ZettaScale Zenoh Team, // pub mod establishment; +pub(crate) mod lowlatency; pub(crate) mod manager; -pub(crate) mod net; pub(crate) mod transport_unicast_inner; +pub(crate) mod universal; #[cfg(feature = "test")] pub mod test_helpers; #[cfg(feature = "shared-memory")] pub(crate) mod shared_memory_unicast; -#[cfg(feature = "shared-memory")] -pub(crate) mod shm; use self::transport_unicast_inner::TransportUnicastTrait; @@ -55,6 +54,7 @@ pub(crate) struct TransportConfigUnicast { pub(crate) multilink: Option, #[cfg(feature = "shared-memory")] pub(crate) is_shm: bool, + pub(crate) is_lowlatency: bool, } /// [`TransportUnicast`] is the transport handler returned diff --git a/io/zenoh-transport/src/unicast/test_helpers.rs b/io/zenoh-transport/src/unicast/test_helpers.rs index 8e054e977a..403384c851 100644 --- a/io/zenoh-transport/src/unicast/test_helpers.rs +++ b/io/zenoh-transport/src/unicast/test_helpers.rs @@ -18,11 +18,13 @@ use crate::{TransportManager, TransportManagerBuilderUnicast}; pub fn make_transport_manager_builder( #[cfg(feature = "transport_multilink")] max_links: usize, - #[cfg(feature = "shared-memory")] shm_transport: bool, + #[cfg(feature = "shared-memory")] with_shm: bool, + lowlatency_transport: bool, ) -> TransportManagerBuilderUnicast { let transport = make_basic_transport_manager_builder( #[cfg(feature = "shared-memory")] - shm_transport, + with_shm, + lowlatency_transport, ); zcondfeat!( @@ -36,15 +38,23 @@ pub fn make_transport_manager_builder( } pub fn make_basic_transport_manager_builder( - #[cfg(feature = "shared-memory")] shm_transport: bool, + #[cfg(feature = "shared-memory")] with_shm: bool, + lowlatency_transport: bool, ) -> TransportManagerBuilderUnicast { println!("Create transport manager builder..."); - zcondfeat!( + let config = zcondfeat!( "shared-memory", { println!("...with SHM..."); - TransportManager::config_unicast().shm(shm_transport) + TransportManager::config_unicast().shm(with_shm) }, TransportManager::config_unicast() - ) + ); + if lowlatency_transport { + println!("...with LowLatency transport..."); + } + match lowlatency_transport { + true => config.lowlatency(true).qos(false), + false => config, + } } diff --git a/io/zenoh-transport/src/unicast/net/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs similarity index 98% rename from io/zenoh-transport/src/unicast/net/link.rs rename to io/zenoh-transport/src/unicast/universal/link.rs index 144f07db1d..1128e8c2f9 100644 --- a/io/zenoh-transport/src/unicast/net/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -11,7 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, // -use super::transport::TransportUnicastNet; +use super::transport::TransportUnicastUniversal; use crate::common::pipeline::{ TransmissionPipeline, TransmissionPipelineConf, TransmissionPipelineConsumer, TransmissionPipelineProducer, @@ -64,7 +64,7 @@ pub(super) struct TransportLinkUnicast { // The transmission pipeline pub(super) pipeline: Option, // The transport this link is associated to - transport: TransportUnicastNet, + transport: TransportUnicastUniversal, // The signals to stop TX/RX tasks handle_tx: Option>>, signal_rx: Signal, @@ -73,7 +73,7 @@ pub(super) struct TransportLinkUnicast { impl TransportLinkUnicast { pub(super) fn new( - transport: TransportUnicastNet, + transport: TransportUnicastUniversal, link: LinkUnicast, direction: LinkUnicastDirection, ) -> TransportLinkUnicast { @@ -278,7 +278,7 @@ async fn tx_task( async fn rx_task_stream( link: LinkUnicast, - transport: TransportUnicastNet, + transport: TransportUnicastUniversal, lease: Duration, signal: Signal, rx_batch_size: BatchSize, @@ -349,7 +349,7 @@ async fn rx_task_stream( async fn rx_task_dgram( link: LinkUnicast, - transport: TransportUnicastNet, + transport: TransportUnicastUniversal, lease: Duration, signal: Signal, rx_batch_size: BatchSize, @@ -421,7 +421,7 @@ async fn rx_task_dgram( async fn rx_task( link: LinkUnicast, - transport: TransportUnicastNet, + transport: TransportUnicastUniversal, lease: Duration, signal: Signal, rx_batch_size: u16, diff --git a/io/zenoh-transport/src/unicast/shm/mod.rs b/io/zenoh-transport/src/unicast/universal/mod.rs similarity index 100% rename from io/zenoh-transport/src/unicast/shm/mod.rs rename to io/zenoh-transport/src/unicast/universal/mod.rs diff --git a/io/zenoh-transport/src/unicast/net/reliability.rs b/io/zenoh-transport/src/unicast/universal/reliability.rs similarity index 100% rename from io/zenoh-transport/src/unicast/net/reliability.rs rename to io/zenoh-transport/src/unicast/universal/reliability.rs diff --git a/io/zenoh-transport/src/unicast/net/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs similarity index 98% rename from io/zenoh-transport/src/unicast/net/rx.rs rename to io/zenoh-transport/src/unicast/universal/rx.rs index 2ca7406a89..5822b09931 100644 --- a/io/zenoh-transport/src/unicast/net/rx.rs +++ b/io/zenoh-transport/src/unicast/universal/rx.rs @@ -13,7 +13,7 @@ use crate::transport_unicast_inner::TransportUnicastTrait; // Contributors: // ZettaScale Zenoh Team, // -use super::transport::TransportUnicastNet; +use super::transport::TransportUnicastUniversal; use crate::common::priority::TransportChannelRx; use async_std::task; use std::sync::MutexGuard; @@ -34,7 +34,7 @@ use zenoh_result::{bail, zerror, ZResult}; /*************************************/ /* TRANSPORT RX */ /*************************************/ -impl TransportUnicastNet { +impl TransportUnicastUniversal { fn trigger_callback( &self, #[allow(unused_mut)] // shared-memory feature requires mut diff --git a/io/zenoh-transport/src/unicast/net/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs similarity index 97% rename from io/zenoh-transport/src/unicast/net/transport.rs rename to io/zenoh-transport/src/unicast/universal/transport.rs index 95919a5f3a..5a9d115494 100644 --- a/io/zenoh-transport/src/unicast/net/transport.rs +++ b/io/zenoh-transport/src/unicast/universal/transport.rs @@ -15,7 +15,7 @@ use crate::common::priority::{TransportPriorityRx, TransportPriorityTx}; #[cfg(feature = "stats")] use crate::stats::TransportStats; use crate::transport_unicast_inner::TransportUnicastTrait; -use crate::unicast::net::link::TransportLinkUnicast; +use crate::unicast::universal::link::TransportLinkUnicast; use crate::TransportConfigUnicast; use crate::{TransportExecutor, TransportManager, TransportPeerEventHandler}; use async_std::sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard}; @@ -51,10 +51,10 @@ macro_rules! zlinkindex { } /*************************************/ -/* TRANSPORT */ +/* UNIVERSAL TRANSPORT */ /*************************************/ #[derive(Clone)] -pub(crate) struct TransportUnicastNet { +pub(crate) struct TransportUnicastUniversal { // Transport Manager pub(crate) manager: TransportManager, // Transport config @@ -74,11 +74,11 @@ pub(crate) struct TransportUnicastNet { pub(super) stats: Arc, } -impl TransportUnicastNet { +impl TransportUnicastUniversal { pub fn make( manager: TransportManager, config: TransportConfigUnicast, - ) -> ZResult { + ) -> ZResult { let mut priority_tx = vec![]; let mut priority_rx = vec![]; @@ -102,7 +102,7 @@ impl TransportUnicastNet { c.sync(initial_sn)?; } - let t = TransportUnicastNet { + let t = TransportUnicastUniversal { manager, config, priority_tx: priority_tx.into_boxed_slice().into(), @@ -239,7 +239,7 @@ impl TransportUnicastNet { } #[async_trait] -impl TransportUnicastTrait for TransportUnicastNet { +impl TransportUnicastTrait for TransportUnicastUniversal { /*************************************/ /* LINK */ /*************************************/ diff --git a/io/zenoh-transport/src/unicast/net/tx.rs b/io/zenoh-transport/src/unicast/universal/tx.rs similarity index 97% rename from io/zenoh-transport/src/unicast/net/tx.rs rename to io/zenoh-transport/src/unicast/universal/tx.rs index 36e67f1f18..7dbc5329e6 100644 --- a/io/zenoh-transport/src/unicast/net/tx.rs +++ b/io/zenoh-transport/src/unicast/universal/tx.rs @@ -11,11 +11,11 @@ // Contributors: // ZettaScale Zenoh Team, // -use super::transport::TransportUnicastNet; +use super::transport::TransportUnicastUniversal; use zenoh_core::zread; use zenoh_protocol::network::NetworkMessage; -impl TransportUnicastNet { +impl TransportUnicastUniversal { fn schedule_on_link(&self, msg: NetworkMessage) -> bool { macro_rules! zpush { ($guard:expr, $pipeline:expr, $msg:expr) => { diff --git a/io/zenoh-transport/tests/endpoints.rs b/io/zenoh-transport/tests/endpoints.rs index 089f8cc7f9..e372e9e013 100644 --- a/io/zenoh-transport/tests/endpoints.rs +++ b/io/zenoh-transport/tests/endpoints.rs @@ -179,9 +179,9 @@ fn endpoint_ws() { task::block_on(run(&endpoints)); } -#[cfg(feature = "transport_shm")] +#[cfg(feature = "transport_unixpipe")] #[test] -fn endpoint_shm() { +fn endpoint_unixpipe() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); @@ -189,10 +189,10 @@ fn endpoint_shm() { // Define the locators let endpoints: Vec = vec![ - "shm/endpoint_shm".parse().unwrap(), - "shm/endpoint_shm2".parse().unwrap(), - "shm/endpoint_shm3".parse().unwrap(), - "shm/endpoint_shm4".parse().unwrap(), + "unixpipe/endpoint_unixpipe".parse().unwrap(), + "unixpipe/endpoint_unixpipe2".parse().unwrap(), + "unixpipe/endpoint_unixpipe3".parse().unwrap(), + "unixpipe/endpoint_unixpipe4".parse().unwrap(), ]; task::block_on(run(&endpoints)); } diff --git a/io/zenoh-transport/tests/transport_whitelist.rs b/io/zenoh-transport/tests/transport_whitelist.rs index 338cf12429..5279dcff21 100644 --- a/io/zenoh-transport/tests/transport_whitelist.rs +++ b/io/zenoh-transport/tests/transport_whitelist.rs @@ -138,10 +138,10 @@ fn transport_whitelist_tcp() { task::block_on(run(&endpoints)); } -#[cfg(feature = "transport_shm")] +#[cfg(feature = "transport_unixpipe")] #[test] #[ignore] -fn transport_whitelist_shm() { +fn transport_whitelist_unixpipe() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); @@ -149,8 +149,8 @@ fn transport_whitelist_shm() { // Define the locators let endpoints: Vec = vec![ - "shm/transport_whitelist_shm".parse().unwrap(), - "shm/transport_whitelist_shm2".parse().unwrap(), + "unixpipe/transport_whitelist_unixpipe".parse().unwrap(), + "unixpipe/transport_whitelist_unixpipe2".parse().unwrap(), ]; // Run task::block_on(run(&endpoints)); diff --git a/io/zenoh-transport/tests/unicast_authenticator.rs b/io/zenoh-transport/tests/unicast_authenticator.rs index 33b35d52d2..b22d7875fd 100644 --- a/io/zenoh-transport/tests/unicast_authenticator.rs +++ b/io/zenoh-transport/tests/unicast_authenticator.rs @@ -107,7 +107,7 @@ impl TransportEventHandler for SHClientAuthenticator { } #[cfg(feature = "auth_pubkey")] -async fn auth_pubkey(endpoint: &EndPoint, #[cfg(feature = "shared-memory")] shm_transport: bool) { +async fn auth_pubkey(endpoint: &EndPoint, lowlatency_transport: bool) { use rsa::{BigUint, RsaPrivateKey, RsaPublicKey}; use zenoh_transport::test_helpers::make_basic_transport_manager_builder; use zenoh_transport::unicast::establishment::ext::auth::AuthPubKey; @@ -161,7 +161,8 @@ async fn auth_pubkey(endpoint: &EndPoint, #[cfg(feature = "shared-memory")] shm_ ))); let unicast = make_basic_transport_manager_builder( #[cfg(feature = "shared-memory")] - shm_transport, + false, + lowlatency_transport, ) .authenticator(auth); let client01_manager = TransportManager::builder() @@ -219,7 +220,8 @@ async fn auth_pubkey(endpoint: &EndPoint, #[cfg(feature = "shared-memory")] shm_ ))); let unicast = make_basic_transport_manager_builder( #[cfg(feature = "shared-memory")] - shm_transport, + false, + lowlatency_transport, ) .authenticator(auth); let client02_manager = TransportManager::builder() @@ -238,7 +240,8 @@ async fn auth_pubkey(endpoint: &EndPoint, #[cfg(feature = "shared-memory")] shm_ ))); let unicast = make_basic_transport_manager_builder( #[cfg(feature = "shared-memory")] - shm_transport, + false, + lowlatency_transport, ) .authenticator(auth); let client03_manager = TransportManager::builder() @@ -298,7 +301,8 @@ async fn auth_pubkey(endpoint: &EndPoint, #[cfg(feature = "shared-memory")] shm_ auth.set_pubkey(Some(auth_pubkey)); let unicast = make_basic_transport_manager_builder( #[cfg(feature = "shared-memory")] - shm_transport, + false, + lowlatency_transport, ) .authenticator(auth); let router_manager = TransportManager::builder() @@ -406,7 +410,7 @@ async fn auth_pubkey(endpoint: &EndPoint, #[cfg(feature = "shared-memory")] shm_ } #[cfg(feature = "auth_usrpwd")] -async fn auth_usrpwd(endpoint: &EndPoint, #[cfg(feature = "shared-memory")] shm_transport: bool) { +async fn auth_usrpwd(endpoint: &EndPoint, lowlatency_transport: bool) { use zenoh_transport::test_helpers::make_basic_transport_manager_builder; use zenoh_transport::unicast::establishment::ext::auth::AuthUsrPwd; use zenoh_transport::TransportManager; @@ -442,7 +446,8 @@ async fn auth_usrpwd(endpoint: &EndPoint, #[cfg(feature = "shared-memory")] shm_ let unicast = make_basic_transport_manager_builder( #[cfg(feature = "shared-memory")] - shm_transport, + false, + lowlatency_transport, ) .authenticator(auth_router); let router_manager = TransportManager::builder() @@ -459,7 +464,8 @@ async fn auth_usrpwd(endpoint: &EndPoint, #[cfg(feature = "shared-memory")] shm_ auth_client01.set_usrpwd(Some(auth_usrpwdr_client01)); let unicast = make_basic_transport_manager_builder( #[cfg(feature = "shared-memory")] - shm_transport, + false, + lowlatency_transport, ) .authenticator(auth_client01); let client01_manager = TransportManager::builder() @@ -476,7 +482,8 @@ async fn auth_usrpwd(endpoint: &EndPoint, #[cfg(feature = "shared-memory")] shm_ auth_client02.set_usrpwd(Some(auth_usrpwdr_client02)); let unicast = make_basic_transport_manager_builder( #[cfg(feature = "shared-memory")] - shm_transport, + false, + lowlatency_transport, ) .authenticator(auth_client02); let client02_manager = TransportManager::builder() @@ -493,7 +500,8 @@ async fn auth_usrpwd(endpoint: &EndPoint, #[cfg(feature = "shared-memory")] shm_ auth_client03.set_usrpwd(Some(auth_usrpwdr_client03)); let unicast = make_basic_transport_manager_builder( #[cfg(feature = "shared-memory")] - shm_transport, + false, + lowlatency_transport, ) .authenticator(auth_client03); let client03_manager = TransportManager::builder() @@ -609,34 +617,18 @@ async fn auth_usrpwd(endpoint: &EndPoint, #[cfg(feature = "shared-memory")] shm_ task::sleep(SLEEP).await; } -async fn run(endpoint: &EndPoint, #[cfg(feature = "shared-memory")] shm_transport: bool) { +async fn run(endpoint: &EndPoint, lowlatency_transport: bool) { #[cfg(feature = "auth_pubkey")] - auth_pubkey( - endpoint, - #[cfg(feature = "shared-memory")] - shm_transport, - ) - .await; + auth_pubkey(endpoint, lowlatency_transport).await; #[cfg(feature = "auth_usrpwd")] - auth_usrpwd( - endpoint, - #[cfg(feature = "shared-memory")] - shm_transport, - ) - .await; + auth_usrpwd(endpoint, lowlatency_transport).await; } -async fn run_with_net(endpoint: &EndPoint) { - run( - endpoint, - #[cfg(feature = "shared-memory")] - false, - ) - .await +async fn run_with_universal_transport(endpoint: &EndPoint) { + run(endpoint, false).await } -#[cfg(feature = "shared-memory")] -async fn run_with_shm(endpoint: &EndPoint) { +async fn run_with_lowlatency_transport(endpoint: &EndPoint) { run(endpoint, true).await } @@ -649,19 +641,19 @@ fn authenticator_tcp() { }); let endpoint: EndPoint = format!("tcp/127.0.0.1:{}", 8000).parse().unwrap(); - task::block_on(run_with_net(&endpoint)); + task::block_on(run_with_universal_transport(&endpoint)); } -#[cfg(all(feature = "transport_tcp", feature = "shared-memory"))] +#[cfg(feature = "transport_tcp")] #[test] -fn authenticator_tcp_with_shm_transport() { +fn authenticator_tcp_with_lowlatency_transport() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); let endpoint: EndPoint = format!("tcp/127.0.0.1:{}", 8100).parse().unwrap(); - task::block_on(run_with_shm(&endpoint)); + task::block_on(run_with_lowlatency_transport(&endpoint)); } #[cfg(feature = "transport_udp")] @@ -673,45 +665,47 @@ fn authenticator_udp() { }); let endpoint: EndPoint = format!("udp/127.0.0.1:{}", 8010).parse().unwrap(); - task::block_on(run_with_net(&endpoint)); + task::block_on(run_with_universal_transport(&endpoint)); } -#[cfg(all(feature = "transport_udp", feature = "shared-memory"))] +#[cfg(feature = "transport_udp")] #[test] -fn authenticator_udp_with_shm_transport() { +fn authenticator_udp_with_lowlatency_transport() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); let endpoint: EndPoint = format!("udp/127.0.0.1:{}", 8110).parse().unwrap(); - task::block_on(run_with_shm(&endpoint)); + task::block_on(run_with_lowlatency_transport(&endpoint)); } -#[cfg(feature = "transport_shm")] +#[cfg(feature = "transport_unixpipe")] #[test] #[ignore] -fn authenticator_shm() { +fn authenticator_unixpipe() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); - let endpoint: EndPoint = "shm/authenticator_shm_test".parse().unwrap(); - task::block_on(run_with_net(&endpoint)); + let endpoint: EndPoint = "unixpipe/authenticator_unixpipe_test".parse().unwrap(); + task::block_on(run_with_universal_transport(&endpoint)); } -#[cfg(all(feature = "transport_shm", feature = "shared-memory"))] +#[cfg(feature = "transport_unixpipe")] #[test] #[ignore] -fn authenticator_shm_with_shm_transport() { +fn authenticator_unixpipe_with_lowlatency_transport() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); - let endpoint: EndPoint = "shm/authenticator_shm_with_shm_transport".parse().unwrap(); - task::block_on(run_with_shm(&endpoint)); + let endpoint: EndPoint = "unixpipe/authenticator_unixpipe_with_lowlatency_transport" + .parse() + .unwrap(); + task::block_on(run_with_lowlatency_transport(&endpoint)); } #[cfg(feature = "transport_ws")] @@ -724,20 +718,20 @@ fn authenticator_ws() { }); let endpoint: EndPoint = format!("ws/127.0.0.1:{}", 8020).parse().unwrap(); - task::block_on(run_with_net(&endpoint)); + task::block_on(run_with_universal_transport(&endpoint)); } -#[cfg(all(feature = "transport_ws", feature = "shared-memory"))] +#[cfg(feature = "transport_ws")] #[test] #[ignore] -fn authenticator_ws_with_shm_transport() { +fn authenticator_ws_with_lowlatency_transport() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); let endpoint: EndPoint = format!("ws/127.0.0.1:{}", 8120).parse().unwrap(); - task::block_on(run_with_shm(&endpoint)); + task::block_on(run_with_lowlatency_transport(&endpoint)); } #[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))] @@ -751,7 +745,7 @@ fn authenticator_unix() { let f1 = "zenoh-test-unix-socket-10.sock"; let _ = std::fs::remove_file(f1); let endpoint: EndPoint = format!("unixsock-stream/{f1}").parse().unwrap(); - task::block_on(run_with_net(&endpoint)); + task::block_on(run_with_universal_transport(&endpoint)); let _ = std::fs::remove_file(f1); let _ = std::fs::remove_file(format!("{f1}.lock")); } @@ -856,7 +850,7 @@ R+IdLiXcyIkg0m9N8I17p0ljCSkbrgGMD3bbePRTfg== ) .unwrap(); - task::block_on(run_with_net(&endpoint)); + task::block_on(run_with_universal_transport(&endpoint)); } #[cfg(feature = "transport_quic")] @@ -959,5 +953,5 @@ R+IdLiXcyIkg0m9N8I17p0ljCSkbrgGMD3bbePRTfg== ) .unwrap(); - task::block_on(run_with_net(&endpoint)); + task::block_on(run_with_universal_transport(&endpoint)); } diff --git a/io/zenoh-transport/tests/unicast_concurrent.rs b/io/zenoh-transport/tests/unicast_concurrent.rs index 77cc7eadf2..11f5e46ca7 100644 --- a/io/zenoh-transport/tests/unicast_concurrent.rs +++ b/io/zenoh-transport/tests/unicast_concurrent.rs @@ -420,34 +420,34 @@ fn transport_ws_concurrent() { }); } -#[cfg(feature = "transport_shm")] +#[cfg(feature = "transport_unixpipe")] #[test] #[ignore] -fn transport_shm_concurrent() { +fn transport_unixpipe_concurrent() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); let endpoint01: Vec = vec![ - "shm/transport_shm_concurrent".parse().unwrap(), - "shm/transport_shm_concurrent2".parse().unwrap(), - "shm/transport_shm_concurrent3".parse().unwrap(), - "shm/transport_shm_concurrent4".parse().unwrap(), - "shm/transport_shm_concurrent5".parse().unwrap(), - "shm/transport_shm_concurrent6".parse().unwrap(), - "shm/transport_shm_concurrent7".parse().unwrap(), - "shm/transport_shm_concurrent8".parse().unwrap(), + "unixpipe/transport_unixpipe_concurrent".parse().unwrap(), + "unixpipe/transport_unixpipe_concurrent2".parse().unwrap(), + "unixpipe/transport_unixpipe_concurrent3".parse().unwrap(), + "unixpipe/transport_unixpipe_concurrent4".parse().unwrap(), + "unixpipe/transport_unixpipe_concurrent5".parse().unwrap(), + "unixpipe/transport_unixpipe_concurrent6".parse().unwrap(), + "unixpipe/transport_unixpipe_concurrent7".parse().unwrap(), + "unixpipe/transport_unixpipe_concurrent8".parse().unwrap(), ]; let endpoint02: Vec = vec![ - "shm/transport_shm_concurrent9".parse().unwrap(), - "shm/transport_shm_concurrent10".parse().unwrap(), - "shm/transport_shm_concurrent11".parse().unwrap(), - "shm/transport_shm_concurrent12".parse().unwrap(), - "shm/transport_shm_concurrent13".parse().unwrap(), - "shm/transport_shm_concurrent14".parse().unwrap(), - "shm/transport_shm_concurrent15".parse().unwrap(), - "shm/transport_shm_concurrent16".parse().unwrap(), + "unixpipe/transport_unixpipe_concurrent9".parse().unwrap(), + "unixpipe/transport_unixpipe_concurrent10".parse().unwrap(), + "unixpipe/transport_unixpipe_concurrent11".parse().unwrap(), + "unixpipe/transport_unixpipe_concurrent12".parse().unwrap(), + "unixpipe/transport_unixpipe_concurrent13".parse().unwrap(), + "unixpipe/transport_unixpipe_concurrent14".parse().unwrap(), + "unixpipe/transport_unixpipe_concurrent15".parse().unwrap(), + "unixpipe/transport_unixpipe_concurrent16".parse().unwrap(), ]; task::block_on(async { diff --git a/io/zenoh-transport/tests/unicast_defragmentation.rs b/io/zenoh-transport/tests/unicast_defragmentation.rs index d85a0c021e..43229921b0 100644 --- a/io/zenoh-transport/tests/unicast_defragmentation.rs +++ b/io/zenoh-transport/tests/unicast_defragmentation.rs @@ -209,17 +209,17 @@ fn transport_unicast_defragmentation_ws_only() { }); } -#[cfg(feature = "transport_shm")] +#[cfg(feature = "transport_unixpipe")] #[test] #[ignore] -fn transport_unicast_defragmentation_shm_only() { +fn transport_unicast_defragmentation_unixpipe_only() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); // Define the locators - let endpoint: EndPoint = "shm/transport_unicast_defragmentation_shm_only" + let endpoint: EndPoint = "unixpipe/transport_unicast_defragmentation_unixpipe_only" .parse() .unwrap(); // Define the reliability and congestion control diff --git a/io/zenoh-transport/tests/unicast_intermittent.rs b/io/zenoh-transport/tests/unicast_intermittent.rs index 81f7020d29..01ee0e3751 100644 --- a/io/zenoh-transport/tests/unicast_intermittent.rs +++ b/io/zenoh-transport/tests/unicast_intermittent.rs @@ -147,10 +147,7 @@ impl TransportPeerEventHandler for SCClient { } } -async fn transport_intermittent( - endpoint: &EndPoint, - #[cfg(feature = "shared-memory")] shm_transport: bool, -) { +async fn transport_intermittent(endpoint: &EndPoint, lowlatency_transport: bool) { /* [ROUTER] */ let router_id = ZenohId::try_from([1]).unwrap(); @@ -160,7 +157,8 @@ async fn transport_intermittent( #[cfg(feature = "transport_multilink")] 1, #[cfg(feature = "shared-memory")] - shm_transport, + false, + lowlatency_transport, ) .max_sessions(3); let router_manager = TransportManager::builder() @@ -181,7 +179,8 @@ async fn transport_intermittent( #[cfg(feature = "transport_multilink")] 1, #[cfg(feature = "shared-memory")] - shm_transport, + false, + lowlatency_transport, ) .max_sessions(3); let client01_manager = TransportManager::builder() @@ -196,7 +195,8 @@ async fn transport_intermittent( #[cfg(feature = "transport_multilink")] 1, #[cfg(feature = "shared-memory")] - shm_transport, + false, + lowlatency_transport, ) .max_sessions(1); let client02_manager = TransportManager::builder() @@ -211,7 +211,8 @@ async fn transport_intermittent( #[cfg(feature = "transport_multilink")] 1, #[cfg(feature = "shared-memory")] - shm_transport, + false, + lowlatency_transport, ) .max_sessions(1); let client03_manager = TransportManager::builder() @@ -406,17 +407,11 @@ async fn transport_intermittent( task::sleep(SLEEP).await; } -async fn net_transport_intermittent(endpoint: &EndPoint) { - transport_intermittent( - endpoint, - #[cfg(feature = "shared-memory")] - false, - ) - .await +async fn universal_transport_intermittent(endpoint: &EndPoint) { + transport_intermittent(endpoint, false).await } -#[cfg(feature = "shared-memory")] -async fn shm_transport_intermittent(endpoint: &EndPoint) { +async fn lowlatency_transport_intermittent(endpoint: &EndPoint) { transport_intermittent(endpoint, true).await } @@ -429,19 +424,19 @@ fn transport_tcp_intermittent() { }); let endpoint: EndPoint = format!("tcp/127.0.0.1:{}", 12000).parse().unwrap(); - task::block_on(net_transport_intermittent(&endpoint)); + task::block_on(universal_transport_intermittent(&endpoint)); } -#[cfg(all(feature = "transport_tcp", feature = "shared-memory"))] +#[cfg(feature = "transport_tcp")] #[test] -fn transport_tcp_intermittent_for_shm_transport() { +fn transport_tcp_intermittent_for_lowlatency_transport() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); let endpoint: EndPoint = format!("tcp/127.0.0.1:{}", 12100).parse().unwrap(); - task::block_on(shm_transport_intermittent(&endpoint)); + task::block_on(lowlatency_transport_intermittent(&endpoint)); } #[cfg(feature = "transport_ws")] @@ -454,46 +449,46 @@ fn transport_ws_intermittent() { }); let endpoint: EndPoint = format!("ws/127.0.0.1:{}", 12010).parse().unwrap(); - task::block_on(net_transport_intermittent(&endpoint)); + task::block_on(universal_transport_intermittent(&endpoint)); } -#[cfg(all(feature = "transport_ws", feature = "shared-memory"))] +#[cfg(feature = "transport_ws")] #[test] #[ignore] -fn transport_ws_intermittent_for_shm_transport() { +fn transport_ws_intermittent_for_lowlatency_transport() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); let endpoint: EndPoint = format!("ws/127.0.0.1:{}", 12110).parse().unwrap(); - task::block_on(shm_transport_intermittent(&endpoint)); + task::block_on(lowlatency_transport_intermittent(&endpoint)); } -#[cfg(feature = "transport_shm")] +#[cfg(feature = "transport_unixpipe")] #[test] #[ignore] -fn transport_shm_intermittent() { +fn transport_unixpipe_intermittent() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); - let endpoint: EndPoint = "shm/transport_shm_intermittent".parse().unwrap(); - task::block_on(net_transport_intermittent(&endpoint)); + let endpoint: EndPoint = "unixpipe/transport_unixpipe_intermittent".parse().unwrap(); + task::block_on(universal_transport_intermittent(&endpoint)); } -#[cfg(all(feature = "transport_shm", feature = "shared-memory"))] +#[cfg(feature = "transport_unixpipe")] #[test] #[ignore] -fn transport_shm_intermittent_for_shm_transport() { +fn transport_unixpipe_intermittent_for_lowlatency_transport() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); - let endpoint: EndPoint = "shm/transport_shm_intermittent_for_shm_transport" + let endpoint: EndPoint = "unixpipe/transport_unixpipe_intermittent_for_lowlatency_transport" .parse() .unwrap(); - task::block_on(shm_transport_intermittent(&endpoint)); + task::block_on(lowlatency_transport_intermittent(&endpoint)); } diff --git a/io/zenoh-transport/tests/unicast_multilink.rs b/io/zenoh-transport/tests/unicast_multilink.rs index fcfb6cd492..182408f75b 100644 --- a/io/zenoh-transport/tests/unicast_multilink.rs +++ b/io/zenoh-transport/tests/unicast_multilink.rs @@ -518,16 +518,16 @@ mod tests { task::block_on(multilink_transport(&endpoint)); } - #[cfg(feature = "transport_shm")] + #[cfg(feature = "transport_unixpipe")] #[test] #[ignore] - fn multilink_shm_only() { + fn multilink_unixpipe_only() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); - let endpoint: EndPoint = "shm/multilink_shm_only".parse().unwrap(); + let endpoint: EndPoint = "unixpipe/multilink_unixpipe_only".parse().unwrap(); task::block_on(multilink_transport(&endpoint)); } diff --git a/io/zenoh-transport/tests/unicast_openclose.rs b/io/zenoh-transport/tests/unicast_openclose.rs index b992a747a3..f361f6f684 100644 --- a/io/zenoh-transport/tests/unicast_openclose.rs +++ b/io/zenoh-transport/tests/unicast_openclose.rs @@ -13,7 +13,7 @@ // use async_std::{prelude::FutureExt, task}; use std::{convert::TryFrom, sync::Arc, time::Duration}; -use zenoh_core::{zasync_executor_init, zcondfeat}; +use zenoh_core::zasync_executor_init; use zenoh_link::EndPoint; use zenoh_protocol::core::{WhatAmI, ZenohId}; use zenoh_result::ZResult; @@ -79,10 +79,7 @@ impl TransportEventHandler for SHClientOpenClose { } } -async fn openclose_transport( - endpoint: &EndPoint, - #[cfg(feature = "shared-memory")] shm_transport: bool, -) { +async fn openclose_transport(endpoint: &EndPoint, lowlatency_transport: bool) { /* [ROUTER] */ let router_id = ZenohId::try_from([1]).unwrap(); @@ -92,7 +89,8 @@ async fn openclose_transport( #[cfg(feature = "transport_multilink")] 2, #[cfg(feature = "shared-memory")] - shm_transport, + false, + lowlatency_transport, ) .max_sessions(1); let router_manager = TransportManager::builder() @@ -111,7 +109,8 @@ async fn openclose_transport( #[cfg(feature = "transport_multilink")] 2, #[cfg(feature = "shared-memory")] - shm_transport, + false, + lowlatency_transport, ) .max_sessions(1); let client01_manager = TransportManager::builder() @@ -126,7 +125,8 @@ async fn openclose_transport( #[cfg(feature = "transport_multilink")] 1, #[cfg(feature = "shared-memory")] - shm_transport, + false, + lowlatency_transport, ) .max_sessions(1); let client02_manager = TransportManager::builder() @@ -189,8 +189,8 @@ async fn openclose_transport( /* [2] */ // Open a second transport from the client to the router // -> This should be accepted - // (this stage is ignored for SHM transport, because it supports only one link) - if zcondfeat!("shared-memory", !shm_transport, true) { + // (this stage is ignored for LowLatency transport, because it supports only one link) + if !lowlatency_transport { links_num = 2; println!("\nTransport Open Close [2a1]"); @@ -227,7 +227,7 @@ async fn openclose_transport( } }); } else { - println!("\nTransport Open Close [2a*]: step ignored for SHM transport!"); + println!("\nTransport Open Close [2a*]: step ignored for LowLatency transport!"); } /* [3] */ @@ -455,17 +455,11 @@ async fn openclose_transport( task::sleep(SLEEP).await; } -async fn openclose_net_transport(endpoint: &EndPoint) { - openclose_transport( - endpoint, - #[cfg(feature = "shared-memory")] - false, - ) - .await +async fn openclose_universal_transport(endpoint: &EndPoint) { + openclose_transport(endpoint, false).await } -#[cfg(feature = "shared-memory")] -async fn openclose_shm_transport(endpoint: &EndPoint) { +async fn openclose_lowlatency_transport(endpoint: &EndPoint) { openclose_transport(endpoint, true).await } @@ -478,19 +472,19 @@ fn openclose_tcp_only() { }); let endpoint: EndPoint = format!("tcp/127.0.0.1:{}", 13000).parse().unwrap(); - task::block_on(openclose_net_transport(&endpoint)); + task::block_on(openclose_universal_transport(&endpoint)); } -#[cfg(all(feature = "transport_tcp", feature = "shared-memory"))] +#[cfg(feature = "transport_tcp")] #[test] -fn openclose_tcp_only_with_shm_transport() { +fn openclose_tcp_only_with_lowlatency_transport() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); let endpoint: EndPoint = format!("tcp/127.0.0.1:{}", 13100).parse().unwrap(); - task::block_on(openclose_shm_transport(&endpoint)); + task::block_on(openclose_lowlatency_transport(&endpoint)); } #[cfg(feature = "transport_udp")] @@ -502,19 +496,19 @@ fn openclose_udp_only() { }); let endpoint: EndPoint = format!("udp/127.0.0.1:{}", 13010).parse().unwrap(); - task::block_on(openclose_net_transport(&endpoint)); + task::block_on(openclose_universal_transport(&endpoint)); } -#[cfg(all(feature = "transport_udp", feature = "shared-memory"))] +#[cfg(feature = "transport_udp")] #[test] -fn openclose_udp_only_with_shm_transport() { +fn openclose_udp_only_with_lowlatency_transport() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); let endpoint: EndPoint = format!("udp/127.0.0.1:{}", 13110).parse().unwrap(); - task::block_on(openclose_shm_transport(&endpoint)); + task::block_on(openclose_lowlatency_transport(&endpoint)); } #[cfg(feature = "transport_ws")] @@ -527,46 +521,48 @@ fn openclose_ws_only() { }); let endpoint: EndPoint = format!("ws/127.0.0.1:{}", 13020).parse().unwrap(); - task::block_on(openclose_net_transport(&endpoint)); + task::block_on(openclose_universal_transport(&endpoint)); } -#[cfg(all(feature = "transport_ws", feature = "shared-memory"))] +#[cfg(feature = "transport_ws")] #[test] #[ignore] -fn openclose_ws_only_with_shm_transport() { +fn openclose_ws_only_with_lowlatency_transport() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); let endpoint: EndPoint = format!("ws/127.0.0.1:{}", 13120).parse().unwrap(); - task::block_on(openclose_shm_transport(&endpoint)); + task::block_on(openclose_lowlatency_transport(&endpoint)); } -#[cfg(feature = "transport_shm")] +#[cfg(feature = "transport_unixpipe")] #[test] #[ignore] -fn openclose_shm_only() { +fn openclose_unixpipe_only() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); - let endpoint: EndPoint = "shm/openclose_shm_only".parse().unwrap(); - task::block_on(openclose_net_transport(&endpoint)); + let endpoint: EndPoint = "unixpipe/openclose_unixpipe_only".parse().unwrap(); + task::block_on(openclose_universal_transport(&endpoint)); } -#[cfg(all(feature = "transport_shm", feature = "shared-memory"))] +#[cfg(feature = "transport_unixpipe")] #[test] #[ignore] -fn openclose_shm_only_with_shm_transport() { +fn openclose_unixpipe_only_with_lowlatency_transport() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); - let endpoint: EndPoint = "shm/openclose_shm_only_with_shm_transport".parse().unwrap(); - task::block_on(openclose_shm_transport(&endpoint)); + let endpoint: EndPoint = "unixpipe/openclose_unixpipe_only_with_lowlatency_transport" + .parse() + .unwrap(); + task::block_on(openclose_lowlatency_transport(&endpoint)); } #[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))] @@ -581,7 +577,7 @@ fn openclose_unix_only() { let f1 = "zenoh-test-unix-socket-9.sock"; let _ = std::fs::remove_file(f1); let endpoint: EndPoint = format!("unixsock-stream/{f1}").parse().unwrap(); - task::block_on(openclose_net_transport(&endpoint)); + task::block_on(openclose_universal_transport(&endpoint)); let _ = std::fs::remove_file(f1); let _ = std::fs::remove_file(format!("{f1}.lock")); } @@ -685,7 +681,7 @@ R+IdLiXcyIkg0m9N8I17p0ljCSkbrgGMD3bbePRTfg== ) .unwrap(); - task::block_on(openclose_net_transport(&endpoint)); + task::block_on(openclose_universal_transport(&endpoint)); } #[cfg(feature = "transport_quic")] @@ -787,5 +783,5 @@ R+IdLiXcyIkg0m9N8I17p0ljCSkbrgGMD3bbePRTfg== ) .unwrap(); - task::block_on(openclose_net_transport(&endpoint)); + task::block_on(openclose_universal_transport(&endpoint)); } diff --git a/io/zenoh-transport/tests/unicast_priorities.rs b/io/zenoh-transport/tests/unicast_priorities.rs index 589cfe8e04..7d8b70b4d3 100644 --- a/io/zenoh-transport/tests/unicast_priorities.rs +++ b/io/zenoh-transport/tests/unicast_priorities.rs @@ -349,16 +349,19 @@ fn priorities_tcp_only() { task::block_on(run(&endpoints)); } -#[cfg(feature = "transport_shm")] +#[cfg(feature = "transport_unixpipe")] #[test] #[ignore] -fn conduits_shm_only() { +fn conduits_unixpipe_only() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); // Define the locators - let endpoints: Vec = vec!["shm/conduits_shm_only".to_string().parse().unwrap()]; + let endpoints: Vec = vec!["unixpipe/conduits_unixpipe_only" + .to_string() + .parse() + .unwrap()]; // Run task::block_on(run(&endpoints)); } diff --git a/io/zenoh-transport/tests/unicast_shm.rs b/io/zenoh-transport/tests/unicast_shm.rs index f0448d5a4d..59fc1467cf 100644 --- a/io/zenoh-transport/tests/unicast_shm.rs +++ b/io/zenoh-transport/tests/unicast_shm.rs @@ -149,7 +149,7 @@ mod tests { } } - async fn run(endpoint: &EndPoint) { + async fn run(endpoint: &EndPoint, lowlatency_transport: bool) { println!("Transport SHM [0a]: {endpoint:?}"); // Define client and router IDs @@ -167,7 +167,12 @@ mod tests { let peer_shm01_manager = TransportManager::builder() .whatami(WhatAmI::Peer) .zid(peer_shm01) - .unicast(TransportManager::config_unicast().shm(true)) + .unicast( + TransportManager::config_unicast() + .shm(true) + .lowlatency(lowlatency_transport) + .qos(!lowlatency_transport), + ) .build(peer_shm01_handler.clone()) .unwrap(); @@ -176,7 +181,12 @@ mod tests { let peer_shm02_manager = TransportManager::builder() .whatami(WhatAmI::Peer) .zid(peer_shm02) - .unicast(TransportManager::config_unicast().shm(true)) + .unicast( + TransportManager::config_unicast() + .shm(true) + .lowlatency(lowlatency_transport) + .qos(!lowlatency_transport), + ) .build(peer_shm02_handler.clone()) .unwrap(); @@ -185,7 +195,12 @@ mod tests { let peer_net01_manager = TransportManager::builder() .whatami(WhatAmI::Peer) .zid(peer_net01) - .unicast(TransportManager::config_unicast().shm(false)) + .unicast( + TransportManager::config_unicast() + .shm(false) + .lowlatency(lowlatency_transport) + .qos(!lowlatency_transport), + ) .build(peer_net01_handler.clone()) .unwrap(); @@ -354,7 +369,7 @@ mod tests { task::sleep(SLEEP).await; } - #[cfg(all(feature = "transport_tcp", feature = "shared-memory"))] + #[cfg(feature = "transport_tcp")] #[test] fn transport_tcp_shm() { let _ = env_logger::try_init(); @@ -363,10 +378,22 @@ mod tests { }); let endpoint: EndPoint = format!("tcp/127.0.0.1:{}", 14000).parse().unwrap(); - task::block_on(run(&endpoint)); + task::block_on(run(&endpoint, false)); } - #[cfg(all(feature = "transport_ws", feature = "shared-memory"))] + #[cfg(feature = "transport_tcp")] + #[test] + fn transport_tcp_shm_with_lowlatency_transport() { + let _ = env_logger::try_init(); + task::block_on(async { + zasync_executor_init!(); + }); + + let endpoint: EndPoint = format!("tcp/127.0.0.1:{}", 14001).parse().unwrap(); + task::block_on(run(&endpoint, true)); + } + + #[cfg(feature = "transport_ws")] #[test] fn transport_ws_shm() { let _ = env_logger::try_init(); @@ -375,18 +402,44 @@ mod tests { }); let endpoint: EndPoint = format!("ws/127.0.0.1:{}", 14010).parse().unwrap(); - task::block_on(run(&endpoint)); + task::block_on(run(&endpoint, false)); } - #[cfg(all(feature = "transport_shm", feature = "shared-memory"))] + #[cfg(feature = "transport_ws")] #[test] - fn transport_shm_shm() { + fn transport_ws_shm_with_lowlatency_transport() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); - let endpoint: EndPoint = "shm/transport_shm_shm".parse().unwrap(); - task::block_on(run(&endpoint)); + let endpoint: EndPoint = format!("ws/127.0.0.1:{}", 14011).parse().unwrap(); + task::block_on(run(&endpoint, true)); + } + + #[cfg(feature = "transport_unixpipe")] + #[test] + fn transport_unixpipe_shm() { + let _ = env_logger::try_init(); + task::block_on(async { + zasync_executor_init!(); + }); + + let endpoint: EndPoint = "unixpipe/transport_unixpipe_shm".parse().unwrap(); + task::block_on(run(&endpoint, false)); + } + + #[cfg(feature = "transport_unixpipe")] + #[test] + fn transport_unixpipe_shm_with_lowlatency_transport() { + let _ = env_logger::try_init(); + task::block_on(async { + zasync_executor_init!(); + }); + + let endpoint: EndPoint = "unixpipe/transport_unixpipe_shm_with_lowlatency_transport" + .parse() + .unwrap(); + task::block_on(run(&endpoint, true)); } } diff --git a/io/zenoh-transport/tests/unicast_simultaneous.rs b/io/zenoh-transport/tests/unicast_simultaneous.rs index f3304606e3..3de47aba03 100644 --- a/io/zenoh-transport/tests/unicast_simultaneous.rs +++ b/io/zenoh-transport/tests/unicast_simultaneous.rs @@ -329,26 +329,26 @@ mod tests { }); } - #[cfg(feature = "transport_shm")] + #[cfg(feature = "transport_unixpipe")] #[test] #[ignore] - fn transport_shm_simultaneous() { + fn transport_unixpipe_simultaneous() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); let endpoint01: Vec = vec![ - "shm/transport_shm_simultaneous".parse().unwrap(), - "shm/transport_shm_simultaneous2".parse().unwrap(), - "shm/transport_shm_simultaneous3".parse().unwrap(), - "shm/transport_shm_simultaneous4".parse().unwrap(), + "unixpipe/transport_unixpipe_simultaneous".parse().unwrap(), + "unixpipe/transport_unixpipe_simultaneous2".parse().unwrap(), + "unixpipe/transport_unixpipe_simultaneous3".parse().unwrap(), + "unixpipe/transport_unixpipe_simultaneous4".parse().unwrap(), ]; let endpoint02: Vec = vec![ - "shm/transport_shm_simultaneous5".parse().unwrap(), - "shm/transport_shm_simultaneous6".parse().unwrap(), - "shm/transport_shm_simultaneous7".parse().unwrap(), - "shm/transport_shm_simultaneous8".parse().unwrap(), + "unixpipe/transport_unixpipe_simultaneous5".parse().unwrap(), + "unixpipe/transport_unixpipe_simultaneous6".parse().unwrap(), + "unixpipe/transport_unixpipe_simultaneous7".parse().unwrap(), + "unixpipe/transport_unixpipe_simultaneous8".parse().unwrap(), ]; task::block_on(async { diff --git a/io/zenoh-transport/tests/unicast_transport.rs b/io/zenoh-transport/tests/unicast_transport.rs index 25f9e8c530..e01d9d0130 100644 --- a/io/zenoh-transport/tests/unicast_transport.rs +++ b/io/zenoh-transport/tests/unicast_transport.rs @@ -221,8 +221,7 @@ const SLEEP_COUNT: Duration = Duration::from_millis(10); const MSG_COUNT: usize = 1_000; const MSG_SIZE_ALL: [usize; 2] = [1_024, 131_072]; -#[cfg(feature = "shared-memory")] -const MSG_SIZE_SHM: [usize; 2] = [1_024, 65000]; +const MSG_SIZE_LOWLATENCY: [usize; 2] = [1_024, 65000]; const MSG_SIZE_NOFRAG: [usize; 1] = [1_024]; macro_rules! ztimeout { @@ -338,7 +337,7 @@ impl TransportPeerEventHandler for SCClient { async fn open_transport_unicast( client_endpoints: &[EndPoint], server_endpoints: &[EndPoint], - #[cfg(feature = "shared-memory")] shm_transport: bool, + lowlatency_transport: bool, ) -> ( TransportManager, Arc, @@ -355,7 +354,8 @@ async fn open_transport_unicast( #[cfg(feature = "transport_multilink")] server_endpoints.len(), #[cfg(feature = "shared-memory")] - shm_transport, + false, + lowlatency_transport, ); let router_manager = TransportManager::builder() .zid(router_id) @@ -375,7 +375,8 @@ async fn open_transport_unicast( #[cfg(feature = "transport_multilink")] client_endpoints.len(), #[cfg(feature = "shared-memory")] - shm_transport, + false, + lowlatency_transport, ); let client_manager = TransportManager::builder() .whatami(WhatAmI::Client) @@ -512,7 +513,7 @@ async fn run_single( server_endpoints: &[EndPoint], channel: Channel, msg_size: usize, - #[cfg(feature = "shared-memory")] shm_transport: bool, + lowlatency_transport: bool, ) { println!( "\n>>> Running test for: {:?}, {:?}, {:?}, {}", @@ -521,13 +522,7 @@ async fn run_single( #[allow(unused_variables)] // Used when stats feature is enabled let (router_manager, router_handler, client_manager, client_transport) = - open_transport_unicast( - client_endpoints, - server_endpoints, - #[cfg(feature = "shared-memory")] - shm_transport, - ) - .await; + open_transport_unicast(client_endpoints, server_endpoints, lowlatency_transport).await; test_transport( router_handler.clone(), @@ -565,7 +560,7 @@ async fn run_internal( server_endpoints: &[EndPoint], channel: &[Channel], msg_size: &[usize], - #[cfg(feature = "shared-memory")] shm_transport: bool, + lowlatency_transport: bool, ) { for ch in channel.iter() { for ms in msg_size.iter() { @@ -574,40 +569,30 @@ async fn run_internal( server_endpoints, *ch, *ms, - #[cfg(feature = "shared-memory")] - shm_transport, + lowlatency_transport, ) .await; } } } -async fn run_with_net( +async fn run_with_universal_transport( client_endpoints: &[EndPoint], server_endpoints: &[EndPoint], channel: &[Channel], msg_size: &[usize], ) { - run_internal( - client_endpoints, - server_endpoints, - channel, - msg_size, - #[cfg(feature = "shared-memory")] - false, - ) - .await; + run_internal(client_endpoints, server_endpoints, channel, msg_size, false).await; } -#[cfg(feature = "shared-memory")] -async fn run_with_shm( +async fn run_with_lowlatency_transport( client_endpoints: &[EndPoint], server_endpoints: &[EndPoint], channel: &[Channel], msg_size: &[usize], ) { if client_endpoints.len() > 1 || server_endpoints.len() > 1 { - println!("SHM transport doesn't support more than one link, so this test would produce MAX_LINKS error!"); + println!("LowLatency transport doesn't support more than one link, so this test would produce MAX_LINKS error!"); panic!(); } run_internal(client_endpoints, server_endpoints, channel, msg_size, true).await; @@ -638,7 +623,7 @@ fn transport_unicast_tcp_only() { }, ]; // Run - task::block_on(run_with_net( + task::block_on(run_with_universal_transport( &endpoints, &endpoints, &channel, @@ -646,9 +631,9 @@ fn transport_unicast_tcp_only() { )); } -#[cfg(all(feature = "transport_tcp", feature = "shared-memory",))] +#[cfg(feature = "transport_tcp")] #[test] -fn transport_unicast_tcp_only_with_shm() { +fn transport_unicast_tcp_only_with_lowlatency_transport() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); @@ -668,11 +653,11 @@ fn transport_unicast_tcp_only_with_shm() { }, ]; // Run - task::block_on(run_with_shm( + task::block_on(run_with_lowlatency_transport( &endpoints, &endpoints, &channel, - &MSG_SIZE_SHM, + &MSG_SIZE_LOWLATENCY, )); } @@ -701,7 +686,7 @@ fn transport_unicast_udp_only() { }, ]; // Run - task::block_on(run_with_net( + task::block_on(run_with_universal_transport( &endpoints, &endpoints, &channel, @@ -709,9 +694,9 @@ fn transport_unicast_udp_only() { )); } -#[cfg(all(feature = "transport_udp", feature = "shared-memory",))] +#[cfg(feature = "transport_udp")] #[test] -fn transport_unicast_udp_only_with_shm() { +fn transport_unicast_udp_only_with_lowlatency_transport() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); @@ -731,7 +716,7 @@ fn transport_unicast_udp_only_with_shm() { }, ]; // Run - task::block_on(run_with_shm( + task::block_on(run_with_lowlatency_transport( &endpoints, &endpoints, &channel, @@ -763,7 +748,7 @@ fn transport_unicast_unix_only() { }, ]; // Run - task::block_on(run_with_net( + task::block_on(run_with_universal_transport( &endpoints, &endpoints, &channel, @@ -773,19 +758,15 @@ fn transport_unicast_unix_only() { let _ = std::fs::remove_file(format!("{f1}.lock")); } -#[cfg(all( - feature = "transport_unixsock-stream", - feature = "shared-memory", - target_family = "unix" -))] +#[cfg(all(feature = "transport_unixsock-stream", target_family = "unix"))] #[test] -fn transport_unicast_unix_only_with_shm() { +fn transport_unicast_unix_only_with_lowlatency_transport() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); - let f1 = "zenoh-test-unix-socket-5-shm.sock"; + let f1 = "zenoh-test-unix-socket-5-lowlatency.sock"; let _ = std::fs::remove_file(f1); // Define the locator let endpoints: Vec = vec![format!("unixsock-stream/{f1}").parse().unwrap()]; @@ -801,11 +782,11 @@ fn transport_unicast_unix_only_with_shm() { }, ]; // Run - task::block_on(run_with_shm( + task::block_on(run_with_lowlatency_transport( &endpoints, &endpoints, &channel, - &MSG_SIZE_SHM, + &MSG_SIZE_LOWLATENCY, )); let _ = std::fs::remove_file(f1); let _ = std::fs::remove_file(format!("{f1}.lock")); @@ -844,7 +825,7 @@ fn transport_unicast_ws_only() { }, ]; // Run - task::block_on(run_with_net( + task::block_on(run_with_universal_transport( &endpoints, &endpoints, &channel, @@ -852,9 +833,9 @@ fn transport_unicast_ws_only() { )); } -#[cfg(all(feature = "transport_ws", feature = "shared-memory",))] +#[cfg(feature = "transport_ws")] #[test] -fn transport_unicast_ws_only_with_shm() { +fn transport_unicast_ws_only_with_lowlatency_transport() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); @@ -882,17 +863,17 @@ fn transport_unicast_ws_only_with_shm() { }, ]; // Run - task::block_on(run_with_shm( + task::block_on(run_with_lowlatency_transport( &endpoints, &endpoints, &channel, - &MSG_SIZE_SHM, + &MSG_SIZE_LOWLATENCY, )); } -#[cfg(feature = "transport_shm")] +#[cfg(feature = "transport_unixpipe")] #[test] -fn transport_unicast_shm_only() { +fn transport_unicast_unixpipe_only() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); @@ -900,8 +881,8 @@ fn transport_unicast_shm_only() { // Define the locator let endpoints: Vec = vec![ - "shm/transport_unicast_shm_only".parse().unwrap(), - "shm/transport_unicast_shm_only2".parse().unwrap(), + "unixpipe/transport_unicast_unixpipe_only".parse().unwrap(), + "unixpipe/transport_unicast_unixpipe_only2".parse().unwrap(), ]; // Define the reliability and congestion control let channel = [ @@ -915,7 +896,7 @@ fn transport_unicast_shm_only() { }, ]; // Run - task::block_on(run_with_net( + task::block_on(run_with_universal_transport( &endpoints, &endpoints, &channel, @@ -923,16 +904,20 @@ fn transport_unicast_shm_only() { )); } -#[cfg(all(feature = "transport_shm", feature = "shared-memory",))] +#[cfg(feature = "transport_unixpipe")] #[test] -fn transport_unicast_shm_only_with_shm() { +fn transport_unicast_unixpipe_only_with_lowlatency_transport() { let _ = env_logger::try_init(); task::block_on(async { zasync_executor_init!(); }); // Define the locator - let endpoints: Vec = vec!["shm/transport_unicast_shm_only_with_shm".parse().unwrap()]; + let endpoints: Vec = vec![ + "unixpipe/transport_unicast_unixpipe_only_with_lowlatency_transport" + .parse() + .unwrap(), + ]; // Define the reliability and congestion control let channel = [ Channel { @@ -945,11 +930,11 @@ fn transport_unicast_shm_only_with_shm() { }, ]; // Run - task::block_on(run_with_shm( + task::block_on(run_with_lowlatency_transport( &endpoints, &endpoints, &channel, - &MSG_SIZE_SHM, + &MSG_SIZE_LOWLATENCY, )); } @@ -980,7 +965,7 @@ fn transport_unicast_tcp_udp() { }, ]; // Run - task::block_on(run_with_net( + task::block_on(run_with_universal_transport( &endpoints, &endpoints, &channel, @@ -1020,7 +1005,7 @@ fn transport_unicast_tcp_unix() { }, ]; // Run - task::block_on(run_with_net( + task::block_on(run_with_universal_transport( &endpoints, &endpoints, &channel, @@ -1062,7 +1047,7 @@ fn transport_unicast_udp_unix() { }, ]; // Run - task::block_on(run_with_net( + task::block_on(run_with_universal_transport( &endpoints, &endpoints, &channel, @@ -1107,7 +1092,7 @@ fn transport_unicast_tcp_udp_unix() { }, ]; // Run - task::block_on(run_with_net( + task::block_on(run_with_universal_transport( &endpoints, &endpoints, &channel, @@ -1163,7 +1148,7 @@ fn transport_unicast_tls_only_server() { ]; // Run let endpoints = vec![endpoint]; - task::block_on(run_with_net( + task::block_on(run_with_universal_transport( &endpoints, &endpoints, &channel, @@ -1217,7 +1202,7 @@ fn transport_unicast_quic_only_server() { ]; // Run let endpoints = vec![endpoint]; - task::block_on(run_with_net( + task::block_on(run_with_universal_transport( &endpoints, &endpoints, &channel, @@ -1289,7 +1274,7 @@ fn transport_unicast_tls_only_mutual_success() { // Run let client_endpoints = vec![client_endpoint]; let server_endpoints = vec![server_endpoint]; - task::block_on(run_with_net( + task::block_on(run_with_universal_transport( &client_endpoints, &server_endpoints, &channel, @@ -1368,7 +1353,7 @@ fn transport_unicast_tls_only_mutual_no_client_certs_failure() { let client_endpoints = vec![client_endpoint]; let server_endpoints = vec![server_endpoint]; let result = std::panic::catch_unwind(|| { - task::block_on(run_with_net( + task::block_on(run_with_universal_transport( &client_endpoints, &server_endpoints, &channel, @@ -1450,7 +1435,7 @@ fn transport_unicast_tls_only_mutual_wrong_client_certs_failure() { let client_endpoints = vec![client_endpoint]; let server_endpoints = vec![server_endpoint]; let result = std::panic::catch_unwind(|| { - task::block_on(run_with_net( + task::block_on(run_with_universal_transport( &client_endpoints, &server_endpoints, &channel, @@ -1462,3 +1447,56 @@ fn transport_unicast_tls_only_mutual_wrong_client_certs_failure() { let error_msg = panic_message::panic_message(&err); assert!(error_msg.contains(RUSTLS_UNKNOWN_CA_ALERT_DESCRIPTION)); } + +#[test] +fn transport_unicast_qos_and_lowlatency_failure() { + struct TestPeer; + impl TransportEventHandler for TestPeer { + fn new_unicast( + &self, + _: TransportPeer, + _: TransportUnicast, + ) -> ZResult> { + panic!(); + } + + fn new_multicast( + &self, + _: TransportMulticast, + ) -> ZResult> { + panic!(); + } + } + + let peer_shm02_handler = Arc::new(TestPeer); + + let failing_manager = TransportManager::builder() + .whatami(WhatAmI::Peer) + .unicast( + TransportManager::config_unicast() + .lowlatency(true) + .qos(true), + ) + .build(peer_shm02_handler.clone()); + assert!(failing_manager.is_err()); + + let good_manager1 = TransportManager::builder() + .whatami(WhatAmI::Peer) + .unicast( + TransportManager::config_unicast() + .lowlatency(false) + .qos(true), + ) + .build(peer_shm02_handler.clone()); + assert!(good_manager1.is_ok()); + + let good_manager2 = TransportManager::builder() + .whatami(WhatAmI::Peer) + .unicast( + TransportManager::config_unicast() + .lowlatency(true) + .qos(false), + ) + .build(peer_shm02_handler.clone()); + assert!(good_manager2.is_ok()); +} diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index e6d5c04f60..1fa34eab32 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -41,7 +41,7 @@ stats = ["zenoh-transport/stats", "zenoh-protocol/stats"] transport_multilink = ["zenoh-transport/transport_multilink"] transport_quic = ["zenoh-transport/transport_quic"] transport_serial = ["zenoh-transport/transport_serial"] -transport_shm = ["zenoh-transport/transport_shm"] +transport_unixpipe = ["zenoh-transport/transport_unixpipe"] transport_tcp = ["zenoh-transport/transport_tcp"] transport_tls = ["zenoh-transport/transport_tls"] transport_udp = ["zenoh-transport/transport_udp"]