diff --git a/.gitignore b/.gitignore index bf5a1656d3..105dae1aa7 100644 --- a/.gitignore +++ b/.gitignore @@ -21,6 +21,4 @@ cargo-timing*.html -#ignore test data -testfiles ci/valgrind-check/*.log diff --git a/Cargo.lock b/Cargo.lock index 91ad98ce8c..aff6c4950a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -235,45 +235,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" -[[package]] -name = "asn1-rs" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22ad1373757efa0f70ec53939aabc7152e1591cb485208052993070ac8d2429d" -dependencies = [ - "asn1-rs-derive", - "asn1-rs-impl", - "displaydoc", - "nom", - "num-traits", - "rusticata-macros", - "thiserror", - "time 0.3.28", -] - -[[package]] -name = "asn1-rs-derive" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7378575ff571966e99a744addeff0bff98b8ada0dedf1956d59e634db95eaac1" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.52", - "synstructure", -] - -[[package]] -name = "asn1-rs-impl" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.52", -] - [[package]] name = "async-attributes" version = "1.1.2" @@ -1043,20 +1004,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "der-parser" -version = "9.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cd0a5c643689626bec213c4d8bd4d96acc8ffdb4ad4bb6bc16abf27d5f4b553" -dependencies = [ - "asn1-rs", - "displaydoc", - "nom", - "num-bigint", - "num-traits", - "rusticata-macros", -] - [[package]] name = "deranged" version = "0.3.8" @@ -1135,17 +1082,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" -[[package]] -name = "displaydoc" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.52", -] - [[package]] name = "dyn-clone" version = "1.0.13" @@ -2369,15 +2305,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "oid-registry" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c958dd45046245b9c3c2547369bb634eb461670b2e7e0de552905801a648d1d" -dependencies = [ - "asn1-rs", -] - [[package]] name = "once_cell" version = "1.19.0" @@ -3222,15 +3149,6 @@ dependencies = [ "semver 1.0.18", ] -[[package]] -name = "rusticata-macros" -version = "4.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632" -dependencies = [ - "nom", -] - [[package]] name = "rustix" version = "0.37.25" @@ -4033,17 +3951,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" -[[package]] -name = "synstructure" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.52", -] - [[package]] name = "system-configuration" version = "0.5.1" @@ -4166,7 +4073,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17f6bb557fd245c28e6411aa56b6403c689ad95061f50e4be16c274e70a17e48" dependencies = [ "deranged", - "itoa", "serde", "time-core", "time-macros 0.2.14", @@ -5142,23 +5048,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "x509-parser" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcbc162f30700d6f3f82a24bf7cc62ffe7caea42c0b2cba8bf7f3ae50cf51f69" -dependencies = [ - "asn1-rs", - "data-encoding", - "der-parser", - "lazy_static", - "nom", - "oid-registry", - "rusticata-macros", - "thiserror", - "time 0.3.28", -] - [[package]] name = "yasna" version = "0.5.2" @@ -5461,7 +5350,6 @@ dependencies = [ "tokio-util", "tracing", "webpki-roots", - "x509-parser", "zenoh-collections", "zenoh-config", "zenoh-core", @@ -5528,7 +5416,6 @@ dependencies = [ "tokio-util", "tracing", "webpki-roots", - "x509-parser", "zenoh-collections", "zenoh-config", "zenoh-core", diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 97c72ce579..e57660800f 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -104,8 +104,6 @@ pub struct DownsamplingItemConf { #[derive(Serialize, Debug, Deserialize, Clone)] pub struct AclConfigRules { pub interfaces: Option>, - pub cert_common_names: Option>, - pub usernames: Option>, pub key_exprs: Vec, pub actions: Vec, pub flows: Option>, @@ -126,8 +124,6 @@ pub struct PolicyRule { #[serde(rename_all = "snake_case")] pub enum Subject { Interface(String), - CertCommonName(String), - Username(String), } #[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)] diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs index 6b2ec14c69..5a41050e94 100644 --- a/io/zenoh-link-commons/src/lib.rs +++ b/io/zenoh-link-commons/src/lib.rs @@ -50,7 +50,6 @@ pub struct Link { pub is_reliable: bool, pub is_streamed: bool, pub interfaces: Vec, - pub auth_identifier: LinkAuthId, } #[async_trait] @@ -79,7 +78,6 @@ impl From<&LinkUnicast> for Link { is_reliable: link.is_reliable(), is_streamed: link.is_streamed(), interfaces: link.get_interface_names(), - auth_identifier: link.get_auth_identifier(), } } } @@ -100,7 +98,6 @@ impl From<&LinkMulticast> for Link { is_reliable: link.is_reliable(), is_streamed: false, interfaces: vec![], - auth_identifier: LinkAuthId::default(), } } } diff --git a/io/zenoh-link-commons/src/unicast.rs b/io/zenoh-link-commons/src/unicast.rs index cd8c550503..add4c3a27b 100644 --- a/io/zenoh-link-commons/src/unicast.rs +++ b/io/zenoh-link-commons/src/unicast.rs @@ -20,7 +20,6 @@ use core::{ use std::net::SocketAddr; use async_trait::async_trait; -use serde::Serialize; use zenoh_protocol::{ core::{EndPoint, Locator}, transport::BatchSize, @@ -52,7 +51,6 @@ pub trait LinkUnicastTrait: Send + Sync { fn is_reliable(&self) -> bool; fn is_streamed(&self) -> bool; fn get_interface_names(&self) -> Vec; - fn get_auth_identifier(&self) -> LinkAuthId; async fn write(&self, buffer: &[u8]) -> ZResult; async fn write_all(&self, buffer: &[u8]) -> ZResult<()>; async fn read(&self, buffer: &mut [u8]) -> ZResult; @@ -120,69 +118,3 @@ pub fn get_ip_interface_names(addr: &SocketAddr) -> Vec { } } } -#[derive(Clone, Debug, Serialize, Hash, PartialEq, Eq)] - -pub enum LinkAuthType { - Tls, - Quic, - None, -} -#[derive(Clone, Debug, Serialize, Hash, PartialEq, Eq)] - -pub struct LinkAuthId { - auth_type: LinkAuthType, - auth_value: Option, -} - -impl LinkAuthId { - pub fn get_type(&self) -> &LinkAuthType { - &self.auth_type - } - pub fn get_value(&self) -> &Option { - &self.auth_value - } -} -impl Default for LinkAuthId { - fn default() -> Self { - LinkAuthId { - auth_type: LinkAuthType::None, - auth_value: None, - } - } -} - -#[derive(Debug)] -pub struct LinkAuthIdBuilder { - pub auth_type: LinkAuthType, //HAS to be provided when building - pub auth_value: Option, //actual value added to the above type; is None for None type -} -impl Default for LinkAuthIdBuilder { - fn default() -> Self { - Self::new() - } -} - -impl LinkAuthIdBuilder { - pub fn new() -> LinkAuthIdBuilder { - LinkAuthIdBuilder { - auth_type: LinkAuthType::None, - auth_value: None, - } - } - - pub fn auth_type(&mut self, auth_type: LinkAuthType) -> &mut Self { - self.auth_type = auth_type; - self - } - pub fn auth_value(&mut self, auth_value: Option) -> &mut Self { - self.auth_value = auth_value; - self - } - - pub fn build(&self) -> LinkAuthId { - LinkAuthId { - auth_type: self.auth_type.clone(), - auth_value: self.auth_value.clone(), - } - } -} diff --git a/io/zenoh-links/zenoh-link-quic/Cargo.toml b/io/zenoh-links/zenoh-link-quic/Cargo.toml index e10eed71a1..63bfc1f839 100644 --- a/io/zenoh-links/zenoh-link-quic/Cargo.toml +++ b/io/zenoh-links/zenoh-link-quic/Cargo.toml @@ -30,14 +30,13 @@ base64 = { workspace = true } futures = { workspace = true } quinn = { workspace = true } rustls-native-certs = { workspace = true } -rustls-pki-types = { workspace = true } +rustls-pki-types = { workspace = true } rustls-webpki = { workspace = true } - secrecy = { workspace = true } tokio = { workspace = true, features = [ + "fs", "io-util", "net", - "fs", "sync", "time", ] } @@ -57,5 +56,3 @@ zenoh-util = { workspace = true } rustls = { version = "0.21", features = ["dangerous_configuration", "quic"] } tokio-rustls = "0.24.1" rustls-pemfile = { version = "1" } - -x509-parser = "0.16.0" diff --git a/io/zenoh-links/zenoh-link-quic/src/unicast.rs b/io/zenoh-links/zenoh-link-quic/src/unicast.rs index cd9cad071f..a3b2687b6f 100644 --- a/io/zenoh-links/zenoh-link-quic/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-quic/src/unicast.rs @@ -22,11 +22,10 @@ use std::{ use async_trait::async_trait; use tokio::sync::Mutex as AsyncMutex; use tokio_util::sync::CancellationToken; -use x509_parser::prelude::*; use zenoh_core::zasynclock; use zenoh_link_commons::{ - get_ip_interface_names, LinkAuthId, LinkAuthIdBuilder, LinkAuthType, LinkManagerUnicastTrait, - LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, + get_ip_interface_names, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, + ListenersUnicastIP, NewLinkChannelSender, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -47,7 +46,6 @@ pub struct LinkUnicastQuic { dst_locator: Locator, send: AsyncMutex, recv: AsyncMutex, - auth_identifier: LinkAuthId, } impl LinkUnicastQuic { @@ -57,7 +55,6 @@ impl LinkUnicastQuic { dst_locator: Locator, send: quinn::SendStream, recv: quinn::RecvStream, - auth_identifier: LinkAuthId, ) -> LinkUnicastQuic { // Build the Quic object LinkUnicastQuic { @@ -67,7 +64,6 @@ impl LinkUnicastQuic { dst_locator, send: AsyncMutex::new(send), recv: AsyncMutex::new(recv), - auth_identifier, } } } @@ -160,10 +156,6 @@ impl LinkUnicastTrait for LinkUnicastQuic { fn is_streamed(&self) -> bool { true } - #[inline(always)] - fn get_auth_identifier(&self) -> LinkAuthId { - self.auth_identifier.clone() - } } impl Drop for LinkUnicastQuic { @@ -262,7 +254,6 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic { .open_bi() .await .map_err(|e| zerror!("Can not create a new QUIC link bound to {}: {}", host, e))?; - let auth_id = get_cert_common_name(quic_conn.clone())?; let link = Arc::new(LinkUnicastQuic::new( quic_conn, @@ -270,7 +261,6 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic { endpoint.into(), send, recv, - auth_id.into(), )); Ok(LinkUnicast(link)) @@ -398,15 +388,12 @@ async fn accept_task( let dst_addr = quic_conn.remote_address(); tracing::debug!("Accepted QUIC connection on {:?}: {:?}", src_addr, dst_addr); // Create the new link object - let auth_id = get_cert_common_name(quic_conn.clone())?; - let link = Arc::new(LinkUnicastQuic::new( quic_conn, src_addr, Locator::new(QUIC_LOCATOR_PREFIX, dst_addr.to_string(), "")?, send, recv, - auth_id.into() )); // Communicate the new link to the initial transport manager @@ -431,36 +418,3 @@ async fn accept_task( } Ok(()) } - -fn get_cert_common_name(conn: quinn::Connection) -> ZResult { - let mut auth_id = QuicAuthId { auth_value: None }; - if let Some(pi) = conn.peer_identity() { - let serv_certs = pi.downcast::>().unwrap(); - if let Some(item) = serv_certs.iter().next() { - let (_, cert) = X509Certificate::from_der(item.as_ref()).unwrap(); - let subject_name = cert - .subject - .iter_common_name() - .next() - .and_then(|cn| cn.as_str().ok()) - .unwrap(); - auth_id = QuicAuthId { - auth_value: Some(subject_name.to_string()), - }; - } - } - Ok(auth_id) -} - -#[derive(Debug, Clone)] -struct QuicAuthId { - auth_value: Option, -} -impl From for LinkAuthId { - fn from(value: QuicAuthId) -> Self { - LinkAuthIdBuilder::new() - .auth_type(LinkAuthType::Quic) - .auth_value(value.auth_value.clone()) - .build() - } -} diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index 31213f5c43..ca4efacdc6 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -32,8 +32,8 @@ use tokio_util::sync::CancellationToken; use z_serial::ZSerial; use zenoh_core::{zasynclock, zasyncread, zasyncwrite}; use zenoh_link_commons::{ - ConstructibleLinkManagerUnicast, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, - LinkUnicastTrait, NewLinkChannelSender, + ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, + NewLinkChannelSender, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -212,10 +212,6 @@ impl LinkUnicastTrait for LinkUnicastSerial { fn is_streamed(&self) -> bool { false } - #[inline(always)] - fn get_auth_identifier(&self) -> LinkAuthId { - LinkAuthId::default() - } } impl fmt::Display for LinkUnicastSerial { diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index bf2e66c863..79812c526e 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -20,7 +20,7 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use zenoh_link_commons::{ - get_ip_interface_names, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, + get_ip_interface_names, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, }; use zenoh_protocol::{ @@ -164,9 +164,6 @@ impl LinkUnicastTrait for LinkUnicastTcp { fn is_streamed(&self) -> bool { true } - fn get_auth_identifier(&self) -> LinkAuthId { - LinkAuthId::default() - } } // // WARN: This sometimes causes timeout in routing test diff --git a/io/zenoh-links/zenoh-link-tls/Cargo.toml b/io/zenoh-links/zenoh-link-tls/Cargo.toml index 00f7207bb0..3025e3d7d7 100644 --- a/io/zenoh-links/zenoh-link-tls/Cargo.toml +++ b/io/zenoh-links/zenoh-link-tls/Cargo.toml @@ -47,5 +47,3 @@ zenoh-result = { workspace = true } zenoh-runtime = { workspace = true } zenoh-sync = { workspace = true } zenoh-util = { workspace = true } - -x509-parser = "0.16.0" diff --git a/io/zenoh-links/zenoh-link-tls/src/unicast.rs b/io/zenoh-links/zenoh-link-tls/src/unicast.rs index 2e40f23dae..1ced1a26b1 100644 --- a/io/zenoh-links/zenoh-link-tls/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tls/src/unicast.rs @@ -21,12 +21,10 @@ use tokio::{ }; use tokio_rustls::{TlsAcceptor, TlsConnector, TlsStream}; use tokio_util::sync::CancellationToken; -//use webpki::anchor_from_trusted_cert; -use x509_parser::prelude::*; use zenoh_core::zasynclock; use zenoh_link_commons::{ - get_ip_interface_names, LinkAuthId, LinkAuthIdBuilder, LinkAuthType, LinkManagerUnicastTrait, - LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, + get_ip_interface_names, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, + ListenersUnicastIP, NewLinkChannelSender, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -39,10 +37,6 @@ use crate::{ TLS_ACCEPT_THROTTLE_TIME, TLS_DEFAULT_MTU, TLS_LINGER_TIMEOUT, TLS_LOCATOR_PREFIX, }; -#[derive(Default, Debug, PartialEq, Eq, Hash)] -pub struct TlsCommonName(String); - -//impl pub struct LinkUnicastTls { // The underlying socket as returned from the async-rustls library // NOTE: TlsStream requires &mut for read and write operations. This means @@ -62,7 +56,6 @@ pub struct LinkUnicastTls { // Make sure there are no concurrent read or writes write_mtx: AsyncMutex<()>, read_mtx: AsyncMutex<()>, - auth_identifier: LinkAuthId, } unsafe impl Send for LinkUnicastTls {} @@ -73,7 +66,6 @@ impl LinkUnicastTls { socket: TlsStream, src_addr: SocketAddr, dst_addr: SocketAddr, - auth_identifier: LinkAuthId, ) -> LinkUnicastTls { let (tcp_stream, _) = socket.get_ref(); // Set the TLS nodelay option @@ -107,7 +99,6 @@ impl LinkUnicastTls { dst_locator: Locator::new(TLS_LOCATOR_PREFIX, dst_addr.to_string(), "").unwrap(), write_mtx: AsyncMutex::new(()), read_mtx: AsyncMutex::new(()), - auth_identifier, } } @@ -198,10 +189,6 @@ impl LinkUnicastTrait for LinkUnicastTls { fn is_streamed(&self) -> bool { true } - #[inline(always)] - fn get_auth_identifier(&self) -> LinkAuthId { - self.auth_identifier.clone() - } } impl Drop for LinkUnicastTls { @@ -295,19 +282,9 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTls { e ) })?; - - let (_, tls_conn) = tls_stream.get_ref(); - - let auth_identifier = get_server_cert_common_name(tls_conn)?; - let tls_stream = TlsStream::Client(tls_stream); - let link = Arc::new(LinkUnicastTls::new( - tls_stream, - src_addr, - dst_addr, - auth_identifier.into(), - )); + let link = Arc::new(LinkUnicastTls::new(tls_stream, src_addr, dst_addr)); Ok(LinkUnicast(link)) } @@ -407,16 +384,8 @@ async fn accept_task( }; tracing::debug!("Accepted TLS connection on {:?}: {:?}", src_addr, dst_addr); - let (_, tls_conn) = tls_stream.get_ref(); - let auth_identifier = get_client_cert_common_name(tls_conn)?; - tracing::debug!("Accepted TLS connection on {:?}: {:?}", src_addr, dst_addr); - // Create the new link object - let link = Arc::new(LinkUnicastTls::new( - tls_stream, - src_addr, - dst_addr, - auth_identifier.into(), - )); + // Create the new link object + let link = Arc::new(LinkUnicastTls::new(tls_stream, src_addr, dst_addr)); // Communicate the new link to the initial transport manager if let Err(e) = manager.send_async(LinkUnicast(link)).await { @@ -440,55 +409,3 @@ async fn accept_task( Ok(()) } - -fn get_client_cert_common_name(tls_conn: &rustls::CommonState) -> ZResult { - if let Some(serv_certs) = tls_conn.peer_certificates() { - let (_, cert) = X509Certificate::from_der(serv_certs[0].as_ref())?; - let subject_name = &cert - .subject - .iter_common_name() - .next() - .and_then(|cn| cn.as_str().ok()) - .unwrap(); - - Ok(TlsAuthId { - auth_value: Some(subject_name.to_string()), - }) - } else { - Ok(TlsAuthId { auth_value: None }) - } -} - -fn get_server_cert_common_name(tls_conn: &rustls::ClientConnection) -> ZResult { - let serv_certs = tls_conn.peer_certificates().unwrap(); - let mut auth_id = TlsAuthId { auth_value: None }; - - //need the first certificate in the chain os no need for looping - if let Some(item) = serv_certs.iter().next() { - let (_, cert) = X509Certificate::from_der(item.as_ref())?; - let subject_name = &cert - .subject - .iter_common_name() - .next() - .and_then(|cn| cn.as_str().ok()) - .unwrap(); - - auth_id = TlsAuthId { - auth_value: Some(subject_name.to_string()), - }; - return Ok(auth_id); - } - Ok(auth_id) -} - -struct TlsAuthId { - auth_value: Option, -} -impl From for LinkAuthId { - fn from(value: TlsAuthId) -> Self { - LinkAuthIdBuilder::new() - .auth_type(LinkAuthType::Tls) - .auth_value(value.auth_value.clone()) - .build() - } -} diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index 760ed2209c..79f980ca96 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -24,8 +24,8 @@ use tokio::{net::UdpSocket, sync::Mutex as AsyncMutex}; use tokio_util::sync::CancellationToken; use zenoh_core::{zasynclock, zlock}; use zenoh_link_commons::{ - get_ip_interface_names, ConstructibleLinkManagerUnicast, LinkAuthId, LinkManagerUnicastTrait, - LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, + get_ip_interface_names, ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, + LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -224,10 +224,6 @@ impl LinkUnicastTrait for LinkUnicastUdp { fn is_streamed(&self) -> bool { false } - #[inline(always)] - fn get_auth_identifier(&self) -> LinkAuthId { - LinkAuthId::default() - } } impl fmt::Display for LinkUnicastUdp { diff --git a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs index 7dea524ca1..1b30ceb553 100644 --- a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs @@ -36,8 +36,8 @@ use tokio_util::sync::CancellationToken; use unix_named_pipe::{create, open_write}; use zenoh_core::{zasyncread, zasyncwrite, ResolveFuture, Wait}; use zenoh_link_commons::{ - ConstructibleLinkManagerUnicast, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, - LinkUnicastTrait, NewLinkChannelSender, + ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, + NewLinkChannelSender, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -525,10 +525,6 @@ impl LinkUnicastTrait for UnicastPipe { fn is_streamed(&self) -> bool { true } - #[inline(always)] - fn get_auth_identifier(&self) -> LinkAuthId { - LinkAuthId::default() - } } impl fmt::Display for UnicastPipe { diff --git a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs index 7adbb3ab30..cc7147c9e0 100644 --- a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs @@ -27,7 +27,7 @@ use tokio_util::sync::CancellationToken; use uuid::Uuid; use zenoh_core::{zasyncread, zasyncwrite}; use zenoh_link_commons::{ - LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, + LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -143,10 +143,6 @@ impl LinkUnicastTrait for LinkUnicastUnixSocketStream { fn is_streamed(&self) -> bool { true } - #[inline(always)] - fn get_auth_identifier(&self) -> LinkAuthId { - LinkAuthId::default() - } } impl Drop for LinkUnicastUnixSocketStream { diff --git a/io/zenoh-links/zenoh-link-vsock/src/unicast.rs b/io/zenoh-links/zenoh-link-vsock/src/unicast.rs index 32b292ca7e..605f114173 100644 --- a/io/zenoh-links/zenoh-link-vsock/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-vsock/src/unicast.rs @@ -28,7 +28,7 @@ use tokio_vsock::{ }; use zenoh_core::{zasyncread, zasyncwrite}; use zenoh_link_commons::{ - LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, + LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, }; use zenoh_protocol::{ core::{endpoint::Address, EndPoint, Locator}, @@ -189,10 +189,6 @@ impl LinkUnicastTrait for LinkUnicastVsock { fn is_streamed(&self) -> bool { true } - #[inline(always)] - fn get_auth_identifier(&self) -> LinkAuthId { - LinkAuthId::default() - } } impl fmt::Display for LinkUnicastVsock { diff --git a/io/zenoh-links/zenoh-link-ws/src/unicast.rs b/io/zenoh-links/zenoh-link-ws/src/unicast.rs index 336e8af975..b671bf67f2 100644 --- a/io/zenoh-links/zenoh-link-ws/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-ws/src/unicast.rs @@ -34,7 +34,7 @@ use tokio_tungstenite::{accept_async, tungstenite::Message, MaybeTlsStream, WebS use tokio_util::sync::CancellationToken; use zenoh_core::{zasynclock, zasyncread, zasyncwrite}; use zenoh_link_commons::{ - LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, + LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -226,10 +226,6 @@ impl LinkUnicastTrait for LinkUnicastWs { fn is_streamed(&self) -> bool { false } - #[inline(always)] - fn get_auth_identifier(&self) -> LinkAuthId { - LinkAuthId::default() - } } impl Drop for LinkUnicastWs { diff --git a/io/zenoh-transport/src/unicast/authentication.rs b/io/zenoh-transport/src/unicast/authentication.rs deleted file mode 100644 index b66289983e..0000000000 --- a/io/zenoh-transport/src/unicast/authentication.rs +++ /dev/null @@ -1,43 +0,0 @@ -use zenoh_link::{LinkAuthId, LinkAuthType}; - -#[cfg(feature = "auth_usrpwd")] -use super::establishment::ext::auth::UsrPwdId; - -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum AuthId { - CertCommonName(String), - Username(String), - None, -} - -impl From for AuthId { - fn from(lid: LinkAuthId) -> Self { - match (lid.get_type(), lid.get_value()) { - (LinkAuthType::Tls | LinkAuthType::Quic, Some(auth_value)) => { - AuthId::CertCommonName(auth_value.clone()) - } - _ => AuthId::None, - } - } -} - -#[cfg(feature = "auth_usrpwd")] -impl From for AuthId { - fn from(user_password_id: UsrPwdId) -> Self { - // pub(crate) struct UsrPwdId(pub Option>); - match user_password_id.0 { - Some(username) => { - //do something - //convert username from vecu8 to string - match std::str::from_utf8(&username) { - Ok(name) => AuthId::Username(name.to_owned()), - Err(e) => { - tracing::error!("Error in extracting username {}", e); - AuthId::None - } - } - } - None => AuthId::None, - } - } -} diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index 9a7151252d..d074ea9642 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -31,8 +31,6 @@ use zenoh_protocol::{ }; use zenoh_result::ZResult; -#[cfg(feature = "auth_usrpwd")] -use super::ext::auth::UsrPwdId; #[cfg(feature = "shared-memory")] use super::ext::shm::AuthSegment; #[cfg(feature = "shared-memory")] @@ -113,8 +111,6 @@ struct RecvOpenSynOut { other_whatami: WhatAmI, other_lease: Duration, other_initial_sn: TransportSn, - #[cfg(feature = "auth_usrpwd")] - other_auth_id: UsrPwdId, } // OpenAck @@ -490,18 +486,11 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { } // Extension Auth - #[allow(unused_mut, unused_assignments)] - #[cfg(feature = "auth_usrpwd")] - let mut user_password_id = UsrPwdId(None); - - #[cfg(feature = "auth_usrpwd")] - { - user_password_id = self - .ext_auth - .recv_open_syn((&mut state.link.ext_auth, open_syn.ext_auth)) - .await - .map_err(|e| (e, Some(close::reason::GENERIC)))?; - } + #[cfg(feature = "transport_auth")] + self.ext_auth + .recv_open_syn((&mut state.link.ext_auth, open_syn.ext_auth)) + .await + .map_err(|e| (e, Some(close::reason::GENERIC)))?; // Extension MultiLink #[cfg(feature = "transport_multilink")] @@ -528,8 +517,6 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { other_whatami: cookie.whatami, other_lease: open_syn.lease, other_initial_sn: open_syn.initial_sn, - #[cfg(feature = "auth_usrpwd")] - other_auth_id: user_password_id, }; Ok((state, output)) } @@ -724,6 +711,7 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - cookie_nonce: iack_out.cookie_nonce, }; let (mut state, osyn_out) = step!(fsm.recv_open_syn(osyn_in).await); + // Create the OpenAck but not send it yet let oack_in = SendOpenAckIn { mine_zid: manager.config.zid, @@ -747,8 +735,6 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - false => None, }, is_lowlatency: state.transport.ext_lowlatency.is_lowlatency(), - #[cfg(feature = "auth_usrpwd")] - auth_id: osyn_out.other_auth_id, }; let a_config = TransportLinkUnicastConfig { diff --git a/io/zenoh-transport/src/unicast/establishment/ext/auth/mod.rs b/io/zenoh-transport/src/unicast/establishment/ext/auth/mod.rs index 0bc46c6edc..8d57434bc3 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/auth/mod.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/auth/mod.rs @@ -571,12 +571,7 @@ impl<'a> AcceptFsm for &'a AuthFsm<'a> { } type RecvOpenSynIn = (&'a mut StateAccept, Option); - - #[cfg(not(feature = "auth_usrpwd"))] type RecvOpenSynOut = (); - #[cfg(feature = "auth_usrpwd")] - type RecvOpenSynOut = UsrPwdId; - async fn recv_open_syn( self, input: Self::RecvOpenSynIn, @@ -609,17 +604,13 @@ impl<'a> AcceptFsm for &'a AuthFsm<'a> { match (self.usrpwd.as_ref(), state.usrpwd.as_mut()) { (Some(e), Some(s)) => { let x = ztake!(exts, id::USRPWD); - let username = e.recv_open_syn((s, ztryinto!(x, S))).await?; - let user_passwd_id = UsrPwdId(Some(username)); - return Ok(user_passwd_id); - } - (None, None) => { - return Ok(UsrPwdId(None)); + e.recv_open_syn((s, ztryinto!(x, S))).await?; } + (None, None) => {} _ => bail!("{S} Invalid UsrPwd configuration."), } } - #[cfg(not(feature = "auth_usrpwd"))] + Ok(()) } diff --git a/io/zenoh-transport/src/unicast/establishment/ext/auth/usrpwd.rs b/io/zenoh-transport/src/unicast/establishment/ext/auth/usrpwd.rs index 22d7a86817..be24337fad 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/auth/usrpwd.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/auth/usrpwd.rs @@ -162,8 +162,6 @@ impl StateOpen { pub(crate) struct StateAccept { nonce: u64, } -#[derive(Clone, Debug, PartialEq, Eq)] -pub(crate) struct UsrPwdId(pub Option>); impl StateAccept { pub(crate) fn new(prng: &mut R) -> Self @@ -408,7 +406,7 @@ impl<'a> AcceptFsm for &'a AuthUsrPwdFsm<'a> { } type RecvOpenSynIn = (&'a mut StateAccept, Option); - type RecvOpenSynOut = Vec; //value of userid is returned if recvopensynout is processed as valid + type RecvOpenSynOut = (); async fn recv_open_syn( self, input: Self::RecvOpenSynIn, @@ -438,8 +436,8 @@ impl<'a> AcceptFsm for &'a AuthUsrPwdFsm<'a> { if hmac != open_syn.hmac { bail!("{S} Invalid password."); } - let username = open_syn.user.to_owned(); - Ok(username) + + Ok(()) } type SendOpenAckIn = &'a StateAccept; diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index 2d50d465bf..49c57d9e9a 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -32,8 +32,6 @@ use zenoh_result::ZResult; use super::ext::shm::AuthSegment; #[cfg(feature = "shared-memory")] use crate::shm::TransportShmConfig; -#[cfg(feature = "auth_usrpwd")] -use crate::unicast::establishment::ext::auth::UsrPwdId; use crate::{ common::batch::BatchConfig, unicast::{ @@ -646,8 +644,6 @@ pub(crate) async fn open_link( false => None, }, is_lowlatency: state.transport.ext_lowlatency.is_lowlatency(), - #[cfg(feature = "auth_usrpwd")] - auth_id: UsrPwdId(None), }; let o_config = TransportLinkUnicastConfig { diff --git a/io/zenoh-transport/src/unicast/lowlatency/transport.rs b/io/zenoh-transport/src/unicast/lowlatency/transport.rs index abffb665b7..9c46b55174 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/transport.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/transport.rs @@ -32,7 +32,6 @@ use zenoh_result::{zerror, ZResult}; use crate::stats::TransportStats; use crate::{ unicast::{ - authentication::AuthId, link::{LinkUnicastWithOpenAck, TransportLinkUnicast}, transport_unicast_inner::{AddLinkResult, TransportUnicastTrait}, TransportConfigUnicast, @@ -188,10 +187,6 @@ impl TransportUnicastTrait for TransportUnicastLowlatency { self.config.zid } - fn get_auth_ids(&self) -> Vec { - vec![] - } - fn get_whatami(&self) -> WhatAmI { self.config.whatami } diff --git a/io/zenoh-transport/src/unicast/mod.rs b/io/zenoh-transport/src/unicast/mod.rs index 973d0bf09a..1726ba2559 100644 --- a/io/zenoh-transport/src/unicast/mod.rs +++ b/io/zenoh-transport/src/unicast/mod.rs @@ -11,16 +11,16 @@ // Contributors: // ZettaScale Zenoh Team, // -pub mod authentication; pub mod establishment; pub(crate) mod link; pub(crate) mod lowlatency; pub(crate) mod manager; -#[cfg(feature = "test")] -pub mod test_helpers; pub(crate) mod transport_unicast_inner; pub(crate) mod universal; +#[cfg(feature = "test")] +pub mod test_helpers; + use std::{ fmt, sync::{Arc, Weak}, @@ -42,9 +42,6 @@ use self::transport_unicast_inner::TransportUnicastTrait; use super::{TransportPeer, TransportPeerEventHandler}; #[cfg(feature = "shared-memory")] use crate::shm::TransportShmConfig; -use crate::unicast::authentication::AuthId; -#[cfg(feature = "auth_usrpwd")] -use crate::unicast::establishment::ext::auth::UsrPwdId; /*************************************/ /* TRANSPORT UNICAST */ @@ -61,8 +58,6 @@ pub(crate) struct TransportConfigUnicast { #[cfg(feature = "shared-memory")] pub(crate) shm: Option, pub(crate) is_lowlatency: bool, - #[cfg(feature = "auth_usrpwd")] - pub(crate) auth_id: UsrPwdId, } /// [`TransportUnicast`] is the transport handler returned @@ -122,11 +117,6 @@ impl TransportUnicast { Ok(transport.get_links()) } - pub fn get_auth_ids(&self) -> ZResult> { - let transport = self.get_inner()?; - Ok(transport.get_auth_ids()) - } - #[inline(always)] pub fn schedule(&self, message: NetworkMessage) -> ZResult<()> { let transport = self.get_inner()?; diff --git a/io/zenoh-transport/src/unicast/transport_unicast_inner.rs b/io/zenoh-transport/src/unicast/transport_unicast_inner.rs index bc0c34b7e8..c687a6aa16 100644 --- a/io/zenoh-transport/src/unicast/transport_unicast_inner.rs +++ b/io/zenoh-transport/src/unicast/transport_unicast_inner.rs @@ -56,7 +56,6 @@ pub(crate) trait TransportUnicastTrait: Send + Sync { fn get_whatami(&self) -> WhatAmI; fn get_callback(&self) -> Option>; fn get_links(&self) -> Vec; - fn get_auth_ids(&self) -> Vec; #[cfg(feature = "shared-memory")] fn is_shm(&self) -> bool; fn is_qos(&self) -> bool; diff --git a/io/zenoh-transport/src/unicast/universal/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs index e7b0d52458..538756f6ee 100644 --- a/io/zenoh-transport/src/unicast/universal/transport.rs +++ b/io/zenoh-transport/src/unicast/universal/transport.rs @@ -28,7 +28,6 @@ use zenoh_protocol::{ }; use zenoh_result::{bail, zerror, ZResult}; -use super::super::authentication::AuthId; #[cfg(feature = "stats")] use crate::stats::TransportStats; use crate::{ @@ -382,18 +381,6 @@ impl TransportUnicastTrait for TransportUnicastUniversal { zread!(self.links).iter().map(|l| l.link.link()).collect() } - fn get_auth_ids(&self) -> Vec { - //convert link level auth ids to AuthId - #[allow(unused_mut)] - let mut auth_ids: Vec = zread!(self.links) - .iter() - .map(|l| l.link.link().auth_identifier.into()) - .collect(); - // convert usrpwd auth id to AuthId - #[cfg(feature = "auth_usrpwd")] - auth_ids.push(self.config.auth_id.clone().into()); - auth_ids - } /*************************************/ /* TX */ /*************************************/ diff --git a/zenoh/src/net/routing/interceptor/access_control.rs b/zenoh/src/net/routing/interceptor/access_control.rs index 885752e2c6..fe78ce8aed 100644 --- a/zenoh/src/net/routing/interceptor/access_control.rs +++ b/zenoh/src/net/routing/interceptor/access_control.rs @@ -26,10 +26,7 @@ use zenoh_protocol::{ zenoh::{PushBody, RequestBody}, }; use zenoh_result::ZResult; -use zenoh_transport::{ - multicast::TransportMulticast, - unicast::{authentication::AuthId, TransportUnicast}, -}; +use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast}; use super::{ authorization::PolicyEnforcer, EgressInterceptor, IngressInterceptor, InterceptorFactory, @@ -40,19 +37,18 @@ pub struct AclEnforcer { enforcer: Arc, } #[derive(Clone, Debug)] -pub struct AuthSubject { +pub struct Interface { id: usize, - name: String, //make Subject + name: String, } - struct EgressAclEnforcer { policy_enforcer: Arc, - subject: Vec, + interface_list: Vec, zid: ZenohId, } struct IngressAclEnforcer { policy_enforcer: Arc, - subject: Vec, + interface_list: Vec, zid: ZenohId, } @@ -84,29 +80,9 @@ impl InterceptorFactoryTrait for AclEnforcer { &self, transport: &TransportUnicast, ) -> (Option, Option) { - let mut authn_ids = vec![]; - if let Ok(ids) = transport.get_auth_ids() { - let enforcer = self.enforcer.clone(); - for auth_id in ids { - match auth_id { - AuthId::CertCommonName(name) => { - let subject = &Subject::CertCommonName(name.clone()); - if let Some(val) = enforcer.subject_map.get(subject) { - authn_ids.push(AuthSubject { id: *val, name }); - } - } - AuthId::Username(name) => { - let subject = &Subject::Username(name.clone()); - if let Some(val) = enforcer.subject_map.get(subject) { - authn_ids.push(AuthSubject { id: *val, name }); - } - } - AuthId::None => {} - } - } - } match transport.get_zid() { Ok(zid) => { + let mut interface_list: Vec = Vec::new(); match transport.get_links() { Ok(links) => { for link in links { @@ -114,7 +90,7 @@ impl InterceptorFactoryTrait for AclEnforcer { for face in link.interfaces { let subject = &Subject::Interface(face.clone()); if let Some(val) = enforcer.subject_map.get(subject) { - authn_ids.push(AuthSubject { + interface_list.push(Interface { id: *val, name: face, }); @@ -129,13 +105,13 @@ impl InterceptorFactoryTrait for AclEnforcer { } let ingress_interceptor = Box::new(IngressAclEnforcer { policy_enforcer: self.enforcer.clone(), + interface_list: interface_list.clone(), zid, - subject: authn_ids.clone(), }); let egress_interceptor = Box::new(EgressAclEnforcer { policy_enforcer: self.enforcer.clone(), + interface_list: interface_list.clone(), zid, - subject: authn_ids, }); match ( self.enforcer.interface_enabled.ingress, @@ -306,15 +282,15 @@ impl InterceptorTrait for EgressAclEnforcer { } pub trait AclActionMethods { fn policy_enforcer(&self) -> Arc; + fn interface_list(&self) -> Vec; fn zid(&self) -> ZenohId; fn flow(&self) -> InterceptorFlow; - fn authn_ids(&self) -> Vec; fn action(&self, action: Action, log_msg: &str, key_expr: &str) -> Permission { let policy_enforcer = self.policy_enforcer(); - let authn_ids: Vec = self.authn_ids(); + let interface_list = self.interface_list(); let zid = self.zid(); let mut decision = policy_enforcer.default_permission; - for subject in &authn_ids { + for subject in &interface_list { match policy_enforcer.policy_decision_point(subject.id, self.flow(), action, key_expr) { Ok(Permission::Allow) => { tracing::trace!( @@ -360,28 +336,32 @@ impl AclActionMethods for EgressAclEnforcer { fn policy_enforcer(&self) -> Arc { self.policy_enforcer.clone() } + + fn interface_list(&self) -> Vec { + self.interface_list.clone() + } + fn zid(&self) -> ZenohId { self.zid } fn flow(&self) -> InterceptorFlow { InterceptorFlow::Egress } - fn authn_ids(&self) -> Vec { - self.subject.clone() - } } impl AclActionMethods for IngressAclEnforcer { fn policy_enforcer(&self) -> Arc { self.policy_enforcer.clone() } + + fn interface_list(&self) -> Vec { + self.interface_list.clone() + } + fn zid(&self) -> ZenohId { self.zid } fn flow(&self) -> InterceptorFlow { InterceptorFlow::Ingress } - fn authn_ids(&self) -> Vec { - self.subject.clone() - } } diff --git a/zenoh/src/net/routing/interceptor/authorization.rs b/zenoh/src/net/routing/interceptor/authorization.rs index 78185c9405..4ff36b1ce3 100644 --- a/zenoh/src/net/routing/interceptor/authorization.rs +++ b/zenoh/src/net/routing/interceptor/authorization.rs @@ -177,20 +177,6 @@ impl PolicyEnforcer { ); } } - match rule.usernames { - Some(_) => (), - None => { - tracing::warn!("ACL config usernames list is empty. Applying rule #{} to all usernames", rule_offset); - rule.usernames = Some(Vec::new()); - } - } - match rule.cert_common_names { - Some(_) => (), - None => { - tracing::warn!("ACL config cert_common_names list is empty. Applying rule #{} to all certificate common names", rule_offset); - rule.cert_common_names = Some(Vec::new()); - } - } } let policy_information = self.policy_information_point(&rules)?; let subject_map = policy_information.subject_map; @@ -243,7 +229,9 @@ impl PolicyEnforcer { for config_rule in config_rule_set { // config validation let mut validation_err = String::new(); - + if config_rule.interfaces.as_ref().unwrap().is_empty() { + validation_err.push_str("ACL config interfaces list is empty. "); + } if config_rule.actions.is_empty() { validation_err.push_str("ACL config actions list is empty. "); } @@ -256,28 +244,6 @@ impl PolicyEnforcer { if !validation_err.is_empty() { bail!("{}", validation_err); } - - //for when at least one is not empty - let mut subject_validation_err: usize = 0; - validation_err = String::new(); - - if config_rule.interfaces.as_ref().unwrap().is_empty() { - subject_validation_err += 1; - validation_err.push_str("ACL config interfaces list is empty. "); - } - if config_rule.cert_common_names.as_ref().unwrap().is_empty() { - subject_validation_err += 1; - validation_err.push_str("ACL config certificate common names list is empty. "); - } - if config_rule.usernames.as_ref().unwrap().is_empty() { - subject_validation_err += 1; - validation_err.push_str("ACL config usernames list is empty. "); - } - - if subject_validation_err == 3 { - bail!("{}", validation_err); - } - for subject in config_rule.interfaces.as_ref().unwrap() { if subject.trim().is_empty() { bail!("found an empty interface value in interfaces list"); @@ -299,48 +265,6 @@ impl PolicyEnforcer { } } } - for subject in config_rule.cert_common_names.as_ref().unwrap() { - if subject.trim().is_empty() { - bail!("found an empty value in certificate common names list"); - } - for flow in config_rule.flows.as_ref().unwrap() { - for action in &config_rule.actions { - for key_expr in &config_rule.key_exprs { - if key_expr.trim().is_empty() { - bail!("found an empty key-expression value in key_exprs list"); - } - policy_rules.push(PolicyRule { - subject: Subject::CertCommonName(subject.clone()), - key_expr: key_expr.clone(), - action: *action, - permission: config_rule.permission, - flow: *flow, - }) - } - } - } - } - for subject in config_rule.usernames.as_ref().unwrap() { - if subject.trim().is_empty() { - bail!("found an empty value in usernames list"); - } - for flow in config_rule.flows.as_ref().unwrap() { - for action in &config_rule.actions { - for key_expr in &config_rule.key_exprs { - if key_expr.trim().is_empty() { - bail!("found an empty key-expression value in key_exprs list"); - } - policy_rules.push(PolicyRule { - subject: Subject::Username(subject.clone()), - key_expr: key_expr.clone(), - action: *action, - permission: config_rule.permission, - flow: *flow, - }) - } - } - } - } } let mut subject_map = SubjectMap::default(); let mut counter = 1; @@ -369,9 +293,6 @@ impl PolicyEnforcer { key_expr: &str, ) -> ZResult { let policy_map = &self.policy_map; - if policy_map.is_empty() { - return Ok(self.default_permission); - } match policy_map.get(&subject) { Some(single_policy) => { let deny_result = single_policy diff --git a/zenoh/tests/acl.rs b/zenoh/tests/acl.rs index 3aed0e6541..b78a9ac888 100644 --- a/zenoh/tests/acl.rs +++ b/zenoh/tests/acl.rs @@ -47,7 +47,7 @@ mod test { async fn get_basic_router_config() -> Config { let mut config = config::default(); config.set_mode(Some(WhatAmI::Router)).unwrap(); - config.listen.endpoints = vec!["tcp/127.0.0.1:27447".parse().unwrap()]; + config.listen.endpoints = vec!["tcp/127.0.0.1:7447".parse().unwrap()]; config.scouting.multicast.set_enabled(Some(false)).unwrap(); config } @@ -59,9 +59,9 @@ mod test { async fn get_client_sessions() -> (Session, Session) { println!("Opening client sessions"); - let config = config::client(["tcp/127.0.0.1:27447".parse::().unwrap()]); + let config = config::client(["tcp/127.0.0.1:7447".parse::().unwrap()]); let s01 = ztimeout!(zenoh::open(config)).unwrap(); - let config = config::client(["tcp/127.0.0.1:27447".parse::().unwrap()]); + let config = config::client(["tcp/127.0.0.1:7447".parse::().unwrap()]); let s02 = ztimeout!(zenoh::open(config)).unwrap(); (s01, s02) } diff --git a/zenoh/tests/authentication.rs b/zenoh/tests/authentication.rs deleted file mode 100644 index e4b15d5771..0000000000 --- a/zenoh/tests/authentication.rs +++ /dev/null @@ -1,1245 +0,0 @@ -// -// Copyright (c) 2024 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -mod test { - use std::{ - fs, - path::Path, - sync::{Arc, Mutex}, - time::Duration, - }; - - use tokio::runtime::Handle; - use zenoh::{ - config, - config::{EndPoint, WhatAmI}, - prelude::*, - Config, Session, - }; - use zenoh_core::{zlock, ztimeout}; - - const TIMEOUT: Duration = Duration::from_secs(60); - const SLEEP: Duration = Duration::from_secs(1); - const KEY_EXPR: &str = "test/demo"; - const VALUE: &str = "zenoh"; - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn test_authentication() { - zenoh_util::try_init_log_from_env(); - let path = "./tests/testfiles"; - create_new_files(path).await.unwrap(); - println!("testfiles created successfully."); - - test_pub_sub_deny_then_allow_usrpswd().await; - test_pub_sub_allow_then_deny_usrpswd().await; - test_get_qbl_allow_then_deny_usrpswd().await; - test_get_qbl_deny_then_allow_usrpswd().await; - - test_pub_sub_deny_then_allow_tls(3774).await; - test_pub_sub_allow_then_deny_tls(3775).await; - test_get_qbl_allow_then_deny_tls(3776).await; - test_get_qbl_deny_then_allow_tls(3777).await; - - test_pub_sub_deny_then_allow_quic(3774).await; - test_pub_sub_allow_then_deny_quic(3775).await; - test_get_qbl_deny_then_allow_quic(3776).await; - test_get_qbl_allow_then_deny_quic(3777).await; - - std::fs::remove_dir_all(path).unwrap(); - println!("testfiles removed successfully."); - } - - #[allow(clippy::all)] - async fn create_new_files(file_path: &str) -> std::io::Result<()> { - use std::io::prelude::*; - let ca_pem = b"-----BEGIN CERTIFICATE----- -MIIDiTCCAnGgAwIBAgIUO1x6LAlICgKs5+pYUTo4CughfKEwDQYJKoZIhvcNAQEL -BQAwVDELMAkGA1UEBhMCRlIxCzAJBgNVBAgMAklGMQswCQYDVQQHDAJQUjERMA8G -A1UECgwIenMsIEluYy4xGDAWBgNVBAMMD3pzX3Rlc3Rfcm9vdF9jYTAeFw0yNDAz -MTExNDM0MjNaFw0yNTAzMTExNDM0MjNaMFQxCzAJBgNVBAYTAkZSMQswCQYDVQQI -DAJJRjELMAkGA1UEBwwCUFIxETAPBgNVBAoMCHpzLCBJbmMuMRgwFgYDVQQDDA96 -c190ZXN0X3Jvb3RfY2EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC3 -pFWM+IJNsRCYHt1v/TliecppwVZV+ZHfFw9JKN9ev4K/fWHUiAOwp91MOLxbaYKd -C6dxW28YVGltoGz3kUZJZcJRQVso1jXv24Op4muOsiYXukLc4TU2F6dG1XqkLt5t -svsYAQFf1uK3//QZFVRBosJEn+jjiJ4XCvt49mnPRolp1pNKX0z31mZO6bSly6c9 -OVlJMjWpDCYSOuf6qZZ36fa9eSut2bRJIPY0QCsgnqYBTnIEhksS+3jy6Qt+QpLz -95pFdLbW/MW4XKpaDltyYkO6QrBekF6uWRlvyAHU+NqvXZ4F/3Z5l26qLuBcsLPJ -kyawkO+yNIDxORmQgMczAgMBAAGjUzBRMB0GA1UdDgQWBBThgotd9ws2ryEEaKp2 -+RMOWV8D7jAfBgNVHSMEGDAWgBThgotd9ws2ryEEaKp2+RMOWV8D7jAPBgNVHRMB -Af8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQA9QoPv78hGmvmqF4GZeqrOBKQB -N/H5wL7f8H6BXU/wpNo2nnWOJn3u37lT+zivAdGEv+x+GeKekcugKBCSluhBLpVb -VNXe4WwMm5FBuO2NRBN2nblTMm1kEO00nVk1/yNo4hI8mj7d4YLU62d7324osNpF -wHqu6B0/c99JeKRvODGswyff1i8rJ1jpcgk/JmHg7UQBHEIkn0cRR0f9W3Mxv6b5 -ZeowRe81neWNkC6IMiMmzA0iHGkhoUMA15qG1ZKOr1XR364LH5BfNNpzAWYwkvJs -0JFrrdw+rm+cRJWs55yiyCCs7pyg1IJkY/o8bifdCOUgIyonzffwREk3+kZR ------END CERTIFICATE-----"; - - let client_side_pem = b"-----BEGIN CERTIFICATE----- -MIIDjDCCAnSgAwIBAgIUOi9jKILrOzfRNGIkQ48S90NehpkwDQYJKoZIhvcNAQEL -BQAwVDELMAkGA1UEBhMCRlIxCzAJBgNVBAgMAklGMQswCQYDVQQHDAJQUjERMA8G -A1UECgwIenMsIEluYy4xGDAWBgNVBAMMD3pzX3Rlc3Rfcm9vdF9jYTAeFw0yNDAz -MTkxMTMxNDhaFw0yNTAzMTkxMTMxNDhaMFAxCzAJBgNVBAYTAkZSMQswCQYDVQQI -DAJJRjELMAkGA1UEBwwCUFIxETAPBgNVBAoMCHpzLCBJbmMuMRQwEgYDVQQDDAtj -bGllbnRfc2lkZTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMzU2p1a -ly/1bi2TDZ8+Qlvk9/3KyHqrg2BGZUxB3Pj/lufDuYNwOHkss99wp8gzMsT28mD4 -y6X7nCgEN8WeHl+/xfLuGsWIBa1OOr6dz0qewoWFsor01cQ8+nwAKlgnz6IvHfkQ -OJZD/QYSdyn6c1AcIyS60vo4qMjyI4OVb1Dl4WpC4vCmWvDT0WjBZ5GckCnuQ8wS -wZ5MtPuMQf8kYX95ll7eBtDfEXF9Oja0l1/5SmlHuKyqDy4sIKovxtFHTqgb8PUc -yT33pUHOsBXruNBxl1MKq1outdMqcQknT6FAC+aVZ7bTlwhnH8p5Apn57g+dJYTI -9dCr1e2oK5NohhkCAwEAAaNaMFgwFgYDVR0RBA8wDYILY2xpZW50X3NpZGUwHQYD -VR0OBBYEFHDUYYfQacLj1tp49OG9NbPuL0N/MB8GA1UdIwQYMBaAFOGCi133Czav -IQRoqnb5Ew5ZXwPuMA0GCSqGSIb3DQEBCwUAA4IBAQB+nFAe6QyD2AaFdgrFOyEE -MeYb97sy9p5ylhMYyU62AYsIzzpTY74wBG78qYPIw3lAYzNcN0L6T6kBQ4lu6gFm -XB0SqCZ2AkwvV8tTlbLkZeoO6rONeke6c8cJsxYN7NiknDvTMrkTTgiyvbCWfEVX -Htnc4j/KzSBX3UjVcbPM3L/6KwMRw050/6RCiOIPFjTOCfTGoDx5fIyBk3ch/Plw -TkH2juHxX0/aCxr8hRE1v9+pXXlGnGoKbsDMLN9Aziu6xzdT/kD7BvyoM8rh7CE5 -ae7/R4sd13cZ2WGDPimqO0z1kItMOIdiYvk4DgOg+J8hZSkKT56erafdDa2LPBE6 ------END CERTIFICATE-----"; - - let client_side_key = b"-----BEGIN PRIVATE KEY----- -MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDM1NqdWpcv9W4t -kw2fPkJb5Pf9ysh6q4NgRmVMQdz4/5bnw7mDcDh5LLPfcKfIMzLE9vJg+Mul+5wo -BDfFnh5fv8Xy7hrFiAWtTjq+nc9KnsKFhbKK9NXEPPp8ACpYJ8+iLx35EDiWQ/0G -Encp+nNQHCMkutL6OKjI8iODlW9Q5eFqQuLwplrw09FowWeRnJAp7kPMEsGeTLT7 -jEH/JGF/eZZe3gbQ3xFxfTo2tJdf+UppR7isqg8uLCCqL8bRR06oG/D1HMk996VB -zrAV67jQcZdTCqtaLrXTKnEJJ0+hQAvmlWe205cIZx/KeQKZ+e4PnSWEyPXQq9Xt -qCuTaIYZAgMBAAECggEAAlqVVw7UEzLjtN4eX1S6tD3jvCzFBETdjgENF7TfjlR4 -lln9UyV6Xqkc+Y28vdwZwqHwW90sEPCc5ShUQD7+jBzi8FVcZSX4o7rVCbz8RXgg -1eI5EKf632YQflWNpwTxGcTnGCY/sjleil/yst6sDdD+9eR4OXQme2Wt8wyH8pLm -bf1OensGrFu3kJaPMOfP6jXnqEqkUPqmaCNW7+Ans8E+4J9oksRVPQJEuxwSjdJu -BlG50KKpl0XwZ/u/hkkj8/BlRDa62YMGJkFOwaaGUu2/0UU139XaJiMSPoL6t/BU -1H15dtW9liEtnHIssXMRzc9cg+xPgCs79ABXSZaFUQKBgQD4mH/DcEFwkZQcr08i -GUk0RE5arAqHui4eiujcPZVV6j/L7PHHmabKRPBlsndFP7KUCtvzNRmHq7JWDkpF -S36OE4e94CBYb0CIrO8OO5zl1vGAn5qa9ckefSFz9AMWW+hSuo185hFjt67BMaI0 -8CxfYDH+QY5D4JE5RhSwsOmiUQKBgQDS7qjq+MQKPHHTztyHK8IbAfEGlrBdCAjf -K1bDX2BdfbRJMZ+y8LgK5HxDPlNx2/VauBLsIyU1Zirepd8hOsbCVoK1fOq+T7bY -KdB1oqLK1Rq1sMBc26F24LBaZ3Pw5XgYEcvaOW0JFQ9Oc4VjcIXKjTNhobNOegfK -QDnw8fEtSQKBgQDrCuTh2GVHFZ3AcVCUoOvB60NaH4flRHcOkbARbHihvtWK7gC8 -A97bJ8tTnCWA5/TkXFAR54a36/K1wtUeJ38Evhp9wEdU1ftiPn/YKSzzcwLr5fu7 -v9/kX9MdWv0ASu2iKphUGwMeETG9oDwJaXvKwZ0DFOB59P3Z9RTi6qI7wQKBgQCp -uBZ6WgeDJPeBsaSHrpHUIU/KOV1WvaxFxR1evlNPZmG1sxQIat/rA8VoZbHGn3Ff -uVSgY/cAbGB6HYTXu+9JV0p8tTI8Ru+cJqjwvhe2lJmVL87X6HCWsluzoiIL5tcm -pssbn7E36ZYTTag6RsOgItUA7ZbUwiOafOsiD8o64QKBgE6nOkAfy5mbp7X+q9uD -J5y6IXpY/Oia/RwveLWFbI/aum4Nnhb6L9Y0XlrYjm4cJOchQyDR7FF6f4EuAiYb -wdxBbkxXpwXnfKCtNvMF/wZMvPVaS5HTQga8hXMrtlW6jtTJ4HmkTTB/MILAXVkJ -EHi+N70PcrYg6li415TGfgDz ------END PRIVATE KEY-----"; - - let server_side_pem = b"-----BEGIN CERTIFICATE----- -MIIDjDCCAnSgAwIBAgIUOi9jKILrOzfRNGIkQ48S90NehpgwDQYJKoZIhvcNAQEL -BQAwVDELMAkGA1UEBhMCRlIxCzAJBgNVBAgMAklGMQswCQYDVQQHDAJQUjERMA8G -A1UECgwIenMsIEluYy4xGDAWBgNVBAMMD3pzX3Rlc3Rfcm9vdF9jYTAeFw0yNDAz -MTkxMTMxMDRaFw0yNTAzMTkxMTMxMDRaMFAxCzAJBgNVBAYTAkZSMQswCQYDVQQI -DAJJRjELMAkGA1UEBwwCUFIxETAPBgNVBAoMCHpzLCBJbmMuMRQwEgYDVQQDDAtz -ZXJ2ZXJfc2lkZTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKw4eKzt -T1inzuEIPBaPksWyjoD9n6uJx9jAQ2wRB6rXiAsXVLRSuczdGDpb1MwAqoIi6ozw -tzDRwkr58vUNaTCswxadlAmB44JEVYKZoublHjlVj5ygr0R4R5F2T9tIV+jpqZuK -HR4dHe8PiDCiWVzWvYwOLVKXQKSeaE2Z143ukVIJ85qmNykJ066AVhgWnIYSCR9c -s7WPBdTWAW3L4yNlast9hfvxdQNDs5AtUnJKfAX+7DylPAm8V7YjU1k9AtTNPbpy -kb9X97ErsB8891MmZaGZp0J6tnuucDkk0dlowMVvi2aUCsYoKF5DgGxtyVAeLhTP -70GenaLe2uwG8fMCAwEAAaNaMFgwFgYDVR0RBA8wDYILc2VydmVyX3NpZGUwHQYD -VR0OBBYEFBKms1sOw8nM/O5SN1EZIH+LsWaPMB8GA1UdIwQYMBaAFOGCi133Czav -IQRoqnb5Ew5ZXwPuMA0GCSqGSIb3DQEBCwUAA4IBAQA6H/sfm8YUn86+GwxNR9i9 -MCL7WHVRx3gS9ENK87+HtZNL2TVvhPJtupG3Pjgqi33FOHrM4rMUcWSZeCEycVgy -5cjimQLwfDljIBRQE6sem3gKf0obdWl5AlPDLTL/iKj5Su7NycrjZFYqkjZjn+58 -fe8lzHNeP/3RQTgjJ98lQI0bdzGDG1+QoxTgPEc77vgN0P4MHJYx2auz/7jYBqNJ -ko8nugIQsd4kOhmOIBUQ8aXkXFktSQIerEGB8uw5iF2cCdH/sTCvhzhxLb4IWo/O -0cAZ+Vs4FW3KUn/Y44yrVAWl1H6xdFsNXBqbzVEMzlt/RV3rH70RDCc20XhP+w+g ------END CERTIFICATE-----"; - - let server_side_key = b"-----BEGIN PRIVATE KEY----- -MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCsOHis7U9Yp87h -CDwWj5LFso6A/Z+ricfYwENsEQeq14gLF1S0UrnM3Rg6W9TMAKqCIuqM8Lcw0cJK -+fL1DWkwrMMWnZQJgeOCRFWCmaLm5R45VY+coK9EeEeRdk/bSFfo6ambih0eHR3v -D4gwollc1r2MDi1Sl0CknmhNmdeN7pFSCfOapjcpCdOugFYYFpyGEgkfXLO1jwXU -1gFty+MjZWrLfYX78XUDQ7OQLVJySnwF/uw8pTwJvFe2I1NZPQLUzT26cpG/V/ex -K7AfPPdTJmWhmadCerZ7rnA5JNHZaMDFb4tmlArGKCheQ4BsbclQHi4Uz+9Bnp2i -3trsBvHzAgMBAAECggEAUjpIS/CmkOLWYRVoczEr197QMYBnCyUm2TO7PU7IRWbR -GtKR6+MPuWPbHIoaCSlMQARhztdj8BhG1zuOKDi1/7qNDzA/rWZp9RmhZlDquamt -i5xxjEwgQuXW7fn6WO2qo5dlFtGT43vtfeYBlY7+cdhJ+iQOub9j6vWDQYHxrF7x -yM8xvNzomHThvLFzWXJV/nGjX5pqPraMmwJUW+MGX0YaEr6tClqsc1Kmxhs3iIUo -1JCqh3FpVu2i/mR9fdcQ0ONT/s1UHzy+1Bhmh3j2Fuk4+ZeLMfxTfFxk5U0BeMQY -sES3qmd+pG5iqPW+AmXy299G89jf5+1Q4J2Km5KOUQKBgQDidifoeknpi9hRHLLD -w/7KMMe8yYg3c3dv5p0iUQQ2pXd1lJIFQ+B2/D+hfOXhnN/iCDap89ll2LoQ2Q9L -38kQXH06HCM2q11RP0BEsZCG0CnluS+JVNnjs/ALi+yc4HSpzKPs3zXIC3dLOUbq -ov5Esa5h/RU6+NO+DH72TWTv6wKBgQDCryPKtOcLp1eqdwIBRoXdPZeUdZdnwT8+ -70DnC+YdOjFkqTbaoYE5ePa3ziGOZyTFhJbPgiwEdj9Ez1JSgqLLv5hBc4s6FigK -D7fOnn7Q7+al/kEW7+X5yoSl1bFuPCqGL1xxzxmpDY8Gf3nyZ+QGfWIenbk3nq12 -nTgINyWMGQKBgQDSrxBDxXl8EMGH/MYHQRGKs8UvSuMyi3bjoU4w/eSInno75qPO -yC5NJDJin9sSgar8E54fkSCBExdP01DayvC5CwLqDAFqvBTOIKU/A18tPP6tnRKv -lkQ8Bkxdwai47k07J4qeNa9IU/qA/mGOq2MZL6DHwvd8bMA5gFCh/rDYTwKBgAPm -gGASScK5Ao+evMKLyCjLkBrgVD026O542qMGYQDa5pxuq3Or4qvlGYRLM+7ncBwo -8OCNahZYzCGzyaFvjpVobEN7biGmyfyRngwcrsu+0q8mreUov0HG5etwoZJk0DFK -B58cGBaD+AaYTTgnDrF2l52naUuM+Uq0EahQeocZAoGBAMJEGUFyEdm1JATkNhBv -ruDzj07PCjdvq3lUJix2ZlKlabsi5V+oYxMmrUSU8Nkaxy6O+qETNRNWQeWbPQHL -IZx/qrP32PmWX0IVj3pbfKHQSpOKNGzL9xUJ/FIycZWyT3yGf24KBuJwIx7xSrRx -qNsoty1gY/y3n7SN/iMZo8lO ------END PRIVATE KEY-----"; - - let credentials_txt = b"client1name:client1passwd -client2name:client2passwd"; - - let certs_dir = Path::new(file_path); - if !certs_dir.exists() { - fs::create_dir(certs_dir)?; - } - struct Testfile<'a> { - name: &'a str, - value: &'a [u8], - } - - let test_files = vec![ - Testfile { - name: "ca.pem", - value: ca_pem, - }, - Testfile { - name: "clientsidekey.pem", - value: client_side_key, - }, - Testfile { - name: "clientside.pem", - value: client_side_pem, - }, - Testfile { - name: "serversidekey.pem", - value: server_side_key, - }, - Testfile { - name: "serverside.pem", - value: server_side_pem, - }, - Testfile { - name: "credentials.txt", - value: credentials_txt, - }, - ]; - for test_file in test_files.iter() { - let file_path = certs_dir.join(test_file.name); - let mut file = fs::File::create(&file_path)?; - file.write_all(test_file.value)?; - } - - Ok(()) - } - - async fn get_basic_router_config_tls(port: u16) -> Config { - let mut config = config::default(); - config.set_mode(Some(WhatAmI::Router)).unwrap(); - config.listen.endpoints = vec![format!("tls/127.0.0.1:{}", port).parse().unwrap()]; - config.scouting.multicast.set_enabled(Some(false)).unwrap(); - config - .insert_json5( - "transport", - r#"{ - "link": { - "protocols": [ - "tls" - ], - "tls": { - "server_private_key": "tests/testfiles/serversidekey.pem", - "server_certificate": "tests/testfiles/serverside.pem", - "root_ca_certificate": "tests/testfiles/ca.pem", - "client_auth": true, - "server_name_verification": false - }, - }, - }"#, - ) - .unwrap(); - config - } - async fn get_basic_router_config_quic(port: u16) -> Config { - let mut config = config::default(); - config.set_mode(Some(WhatAmI::Router)).unwrap(); - config.listen.endpoints = vec![format!("quic/127.0.0.1:{}", port).parse().unwrap()]; - config.scouting.multicast.set_enabled(Some(false)).unwrap(); - config - .insert_json5( - "transport", - r#"{ - "link": { - "protocols": [ - "quic" - ], - "tls": { - "server_private_key": "tests/testfiles/serversidekey.pem", - "server_certificate": "tests/testfiles/serverside.pem", - "root_ca_certificate": "tests/testfiles/ca.pem", - "client_auth": true, - "server_name_verification": false - }, - }, - }"#, - ) - .unwrap(); - config - } - - async fn get_basic_router_config_usrpswd() -> Config { - let mut config = config::default(); - config.set_mode(Some(WhatAmI::Router)).unwrap(); - config.listen.endpoints = vec!["tcp/127.0.0.1:37447".parse().unwrap()]; - config.scouting.multicast.set_enabled(Some(false)).unwrap(); - config - .insert_json5( - "transport", - r#"{ - "auth": { - usrpwd: { - user: "routername", - password: "routerpasswd", - dictionary_file: "tests/testfiles/credentials.txt", - }, - }, - }"#, - ) - .unwrap(); - config - } - async fn close_router_session(s: Session) { - println!("Closing router session"); - ztimeout!(s.close()).unwrap(); - } - - async fn get_client_sessions_tls(port: u16) -> (Session, Session) { - println!("Opening client sessions"); - let mut config = config::client([format!("tls/127.0.0.1:{}", port) - .parse::() - .unwrap()]); - config - .insert_json5( - "transport", - r#"{ - "link": { - "protocols": [ - "tls" - ], - "tls": { - "root_ca_certificate": "tests/testfiles/ca.pem", - "client_private_key": "tests/testfiles/clientsidekey.pem", - "client_certificate": "tests/testfiles/clientside.pem", - "client_auth": true, - "server_name_verification": false - } - } - }"#, - ) - .unwrap(); - let s01 = ztimeout!(zenoh::open(config)).unwrap(); - let mut config = config::client([format!("tls/127.0.0.1:{}", port) - .parse::() - .unwrap()]); - config - .insert_json5( - "transport", - r#"{ - "link": { - "protocols": [ - "tls" - ], - "tls": { - "root_ca_certificate": "tests/testfiles/ca.pem", - "client_private_key": "tests/testfiles/clientsidekey.pem", - "client_certificate": "tests/testfiles/clientside.pem", - "client_auth": true, - "server_name_verification": false - } - } - }"#, - ) - .unwrap(); - let s02 = ztimeout!(zenoh::open(config)).unwrap(); - (s01, s02) - } - - async fn get_client_sessions_quic(port: u16) -> (Session, Session) { - println!("Opening client sessions"); - let mut config = config::client([format!("quic/127.0.0.1:{}", port) - .parse::() - .unwrap()]); - config - .insert_json5( - "transport", - r#"{ - "link": { - "protocols": [ - "quic" - ], - "tls": { - "root_ca_certificate": "tests/testfiles/ca.pem", - "client_private_key": "tests/testfiles/clientsidekey.pem", - "client_certificate": "tests/testfiles/clientside.pem", - "client_auth": true, - "server_name_verification": false - } - } - }"#, - ) - .unwrap(); - let s01 = ztimeout!(zenoh::open(config)).unwrap(); - let mut config = config::client([format!("quic/127.0.0.1:{}", port) - .parse::() - .unwrap()]); - config - .insert_json5( - "transport", - r#"{ - "link": { - "protocols": [ - "quic" - ], - "tls": { - "root_ca_certificate": "tests/testfiles/ca.pem", - "client_private_key": "tests/testfiles/clientsidekey.pem", - "client_certificate": "tests/testfiles/clientside.pem", - "client_auth": true, - "server_name_verification": false - } - } - }"#, - ) - .unwrap(); - let s02 = ztimeout!(zenoh::open(config)).unwrap(); - (s01, s02) - } - - async fn get_client_sessions_usrpswd() -> (Session, Session) { - println!("Opening client sessions"); - let mut config = config::client(["tcp/127.0.0.1:37447".parse::().unwrap()]); - config - .insert_json5( - "transport", - r#"{ - "auth": { - usrpwd: { - user: "client1name", - password: "client1passwd", - }, - } - }"#, - ) - .unwrap(); - let s01 = ztimeout!(zenoh::open(config)).unwrap(); - let mut config = config::client(["tcp/127.0.0.1:37447".parse::().unwrap()]); - config - .insert_json5( - "transport", - r#"{ - "auth": { - usrpwd: { - user: "client2name", - password: "client2passwd", - }, - } - }"#, - ) - .unwrap(); - let s02 = ztimeout!(zenoh::open(config)).unwrap(); - (s01, s02) - } - - async fn close_sessions(s01: Session, s02: Session) { - println!("Closing client sessions"); - ztimeout!(s01.close()).unwrap(); - ztimeout!(s02.close()).unwrap(); - } - - async fn test_pub_sub_deny_then_allow_tls(port: u16) { - println!("test_pub_sub_deny_then_allow_tls"); - - let mut config_router = get_basic_router_config_tls(port).await; - - config_router - .insert_json5( - "access_control", - r#"{ - "enabled": false, - "default_permission": "deny", - "rules": [ - { - "permission": "allow", - "flows": ["ingress","egress"], - "actions": [ - "put", - "declare_subscriber" - ], - "key_exprs": [ - "test/demo" - ], - "cert_common_names": [ - "client_side" - ] - }, - ] - }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - - let (sub_session, pub_session) = get_client_sessions_tls(port).await; - { - let publisher = pub_session.declare_publisher(KEY_EXPR).await.unwrap(); - let received_value = Arc::new(Mutex::new(String::new())); - let temp_recv_value = received_value.clone(); - let subscriber = sub_session - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { - let mut temp_value = zlock!(temp_recv_value); - *temp_value = sample.payload().deserialize::().unwrap(); - }) - .await - .unwrap(); - - tokio::time::sleep(SLEEP).await; - publisher.put(VALUE).await.unwrap(); - tokio::time::sleep(SLEEP).await; - assert_eq!(*zlock!(received_value), VALUE); - ztimeout!(subscriber.undeclare()).unwrap(); - } - close_sessions(sub_session, pub_session).await; - close_router_session(session).await; - } - - async fn test_pub_sub_allow_then_deny_tls(port: u16) { - println!("test_pub_sub_allow_then_deny_tls"); - let mut config_router = get_basic_router_config_tls(port).await; - config_router - .insert_json5( - "access_control", - r#"{ - "enabled": true, - "default_permission": "allow", - "rules": [ - { - "permission": "deny", - "flows": ["egress"], - "actions": [ - "put", - "declare_subscriber" - ], - "key_exprs": [ - "test/demo" - ], - "cert_common_names": [ - "client_side" - ] - }, - ] - }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (sub_session, pub_session) = get_client_sessions_tls(port).await; - { - let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); - let received_value = Arc::new(Mutex::new(String::new())); - let temp_recv_value = received_value.clone(); - let subscriber = - ztimeout!(sub_session - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { - let mut temp_value = zlock!(temp_recv_value); - *temp_value = sample.payload().deserialize::().unwrap(); - })) - .unwrap(); - - tokio::time::sleep(SLEEP).await; - - ztimeout!(publisher.put(VALUE)).unwrap(); - tokio::time::sleep(SLEEP).await; - - assert_ne!(*zlock!(received_value), VALUE); - ztimeout!(subscriber.undeclare()).unwrap(); - } - close_sessions(sub_session, pub_session).await; - close_router_session(session).await; - } - - async fn test_get_qbl_deny_then_allow_tls(port: u16) { - println!("test_get_qbl_deny_then_allow_tls"); - - let mut config_router = get_basic_router_config_tls(port).await; - config_router - .insert_json5( - "access_control", - r#"{ - "enabled": true, - "default_permission": "deny", - "rules": [ - { - "permission": "allow", - "flows": ["egress","ingress"], - "actions": [ - "get", - "declare_queryable" - ], - "key_exprs": [ - "test/demo" - ], - "cert_common_names": [ - "client_side" - ] - }, - ] - }"#, - ) - .unwrap(); - - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - - let (get_session, qbl_session) = get_client_sessions_tls(port).await; - { - let mut received_value = String::new(); - - let qbl = ztimeout!(qbl_session - .declare_queryable(KEY_EXPR) - .callback(move |sample| { - tokio::task::block_in_place(move || { - Handle::current().block_on(async move { - ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() - }); - }); - })) - .unwrap(); - - tokio::time::sleep(SLEEP).await; - let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); - while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { - match reply.result() { - Ok(sample) => { - received_value = sample.payload().deserialize::().unwrap(); - break; - } - Err(e) => println!( - "Error : {}", - e.payload() - .deserialize::() - .unwrap_or_else(|e| format!("{}", e)) - ), - } - } - tokio::time::sleep(SLEEP).await; - assert_eq!(received_value, VALUE); - ztimeout!(qbl.undeclare()).unwrap(); - } - close_sessions(get_session, qbl_session).await; - close_router_session(session).await; - } - - async fn test_get_qbl_allow_then_deny_tls(port: u16) { - println!("test_get_qbl_allow_then_deny_tls"); - - let mut config_router = get_basic_router_config_tls(port).await; - config_router - .insert_json5( - "access_control", - r#"{ - "enabled": true, - "default_permission": "allow", - "rules": [ - { - "permission": "deny", - "flows": ["egress"], - "actions": [ - "get", - "declare_queryable" - ], - "key_exprs": [ - "test/demo" - ], - "cert_common_names": [ - "client_side" - ] - }, - ] - }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - - let (get_session, qbl_session) = get_client_sessions_tls(port).await; - { - let mut received_value = String::new(); - - let qbl = ztimeout!(qbl_session - .declare_queryable(KEY_EXPR) - .callback(move |sample| { - tokio::task::block_in_place(move || { - Handle::current().block_on(async move { - ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() - }); - }); - })) - .unwrap(); - - tokio::time::sleep(SLEEP).await; - let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); - while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { - match reply.result() { - Ok(sample) => { - received_value = sample.payload().deserialize::().unwrap(); - break; - } - Err(e) => println!( - "Error : {}", - e.payload() - .deserialize::() - .unwrap_or_else(|e| format!("{}", e)) - ), - } - } - tokio::time::sleep(SLEEP).await; - assert_ne!(received_value, VALUE); - ztimeout!(qbl.undeclare()).unwrap(); - } - close_sessions(get_session, qbl_session).await; - close_router_session(session).await; - } - - async fn test_pub_sub_deny_then_allow_quic(port: u16) { - println!("test_pub_sub_deny_then_allow_quic"); - - let mut config_router = get_basic_router_config_quic(port).await; - - config_router - .insert_json5( - "access_control", - r#"{ - "enabled": false, - "default_permission": "deny", - "rules": [ - { - "permission": "allow", - "flows": ["ingress","egress"], - "actions": [ - "put", - "declare_subscriber" - ], - "key_exprs": [ - "test/demo" - ], - "cert_common_names": [ - "client_side" - ] - }, - ] - }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - - let (sub_session, pub_session) = get_client_sessions_quic(port).await; - { - let publisher = pub_session.declare_publisher(KEY_EXPR).await.unwrap(); - let received_value = Arc::new(Mutex::new(String::new())); - let temp_recv_value = received_value.clone(); - let subscriber = sub_session - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { - let mut temp_value = zlock!(temp_recv_value); - *temp_value = sample.payload().deserialize::().unwrap(); - }) - .await - .unwrap(); - - tokio::time::sleep(SLEEP).await; - publisher.put(VALUE).await.unwrap(); - tokio::time::sleep(SLEEP).await; - assert_eq!(*zlock!(received_value), VALUE); - ztimeout!(subscriber.undeclare()).unwrap(); - } - close_sessions(sub_session, pub_session).await; - close_router_session(session).await; - } - - #[allow(unused)] - async fn test_pub_sub_allow_then_deny_quic(port: u16) { - println!("test_pub_sub_allow_then_deny_quic"); - - let mut config_router = get_basic_router_config_quic(port).await; - config_router - .insert_json5( - "access_control", - r#"{ - "enabled": true, - "default_permission": "allow", - "rules": [ - { - "permission": "deny", - "flows": ["egress"], - "actions": [ - "put", - "declare_subscriber" - ], - "key_exprs": [ - "test/demo" - ], - "cert_common_names": [ - "client_side" - ] - }, - ] - }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (sub_session, pub_session) = get_client_sessions_quic(port).await; - { - let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); - let received_value = Arc::new(Mutex::new(String::new())); - let temp_recv_value = received_value.clone(); - let subscriber = - ztimeout!(sub_session - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { - let mut temp_value = zlock!(temp_recv_value); - *temp_value = sample.payload().deserialize::().unwrap(); - })) - .unwrap(); - - tokio::time::sleep(SLEEP).await; - - ztimeout!(publisher.put(VALUE)).unwrap(); - tokio::time::sleep(SLEEP).await; - - assert_ne!(*zlock!(received_value), VALUE); - ztimeout!(subscriber.undeclare()).unwrap(); - } - close_sessions(sub_session, pub_session).await; - close_router_session(session).await; - } - - #[allow(unused)] - async fn test_get_qbl_deny_then_allow_quic(port: u16) { - println!("test_get_qbl_deny_then_allow_quic"); - - let mut config_router = get_basic_router_config_quic(port).await; - config_router - .insert_json5( - "access_control", - r#"{ - "enabled": true, - "default_permission": "deny", - "rules": [ - { - "permission": "allow", - "flows": ["egress","ingress"], - "actions": [ - "get", - "declare_queryable"], - "key_exprs": [ - "test/demo" - ], - "cert_common_names": [ - "client_side" - ] - }, - ] - }"#, - ) - .unwrap(); - - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - - let (get_session, qbl_session) = get_client_sessions_quic(port).await; - { - let mut received_value = String::new(); - - let qbl = ztimeout!(qbl_session - .declare_queryable(KEY_EXPR) - .callback(move |sample| { - tokio::task::block_in_place(move || { - Handle::current().block_on(async move { - ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() - }); - }); - })) - .unwrap(); - - tokio::time::sleep(SLEEP).await; - let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); - while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { - match reply.result() { - Ok(sample) => { - received_value = sample.payload().deserialize::().unwrap(); - break; - } - Err(e) => println!( - "Error : {}", - e.payload() - .deserialize::() - .unwrap_or_else(|e| format!("{}", e)) - ), - } - } - tokio::time::sleep(SLEEP).await; - assert_eq!(received_value, VALUE); - ztimeout!(qbl.undeclare()).unwrap(); - } - close_sessions(get_session, qbl_session).await; - close_router_session(session).await; - } - - #[allow(unused)] - async fn test_get_qbl_allow_then_deny_quic(port: u16) { - println!("test_get_qbl_allow_then_deny_quic"); - - let mut config_router = get_basic_router_config_quic(port).await; - config_router - .insert_json5( - "access_control", - r#"{ - "enabled": true, - "default_permission": "allow", - "rules": - [ - { - "permission": "deny", - "flows": ["egress"], - "actions": [ - "get", - "declare_queryable" - ], - "key_exprs": [ - "test/demo" - ], - "cert_common_names": [ - "client_side" - ] - }, - ] - }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - - let (get_session, qbl_session) = get_client_sessions_quic(port).await; - { - let mut received_value = String::new(); - - let qbl = ztimeout!(qbl_session - .declare_queryable(KEY_EXPR) - .callback(move |sample| { - tokio::task::block_in_place(move || { - Handle::current().block_on(async move { - ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() - }); - }); - })) - .unwrap(); - - tokio::time::sleep(SLEEP).await; - let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); - while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { - match reply.result() { - Ok(sample) => { - received_value = sample.payload().deserialize::().unwrap(); - break; - } - Err(e) => println!( - "Error : {}", - e.payload() - .deserialize::() - .unwrap_or_else(|e| format!("{}", e)) - ), - } - } - tokio::time::sleep(SLEEP).await; - assert_ne!(received_value, VALUE); - ztimeout!(qbl.undeclare()).unwrap(); - } - close_sessions(get_session, qbl_session).await; - close_router_session(session).await; - } - - async fn test_pub_sub_deny_then_allow_usrpswd() { - println!("test_pub_sub_deny_then_allow_usrpswd"); - - let mut config_router = get_basic_router_config_usrpswd().await; - - config_router - .insert_json5( - "access_control", - r#"{ - "enabled": false, - "default_permission": "deny", - "rules": [ - { - "permission": "allow", - "flows": ["ingress","egress"], - "actions": [ - "put", - "declare_subscriber" - ], - "key_exprs": [ - "test/demo" - ], - "usernames": [ - "client1name", - "client2name" - ] - }, - ] - }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - - let (sub_session, pub_session) = get_client_sessions_usrpswd().await; - { - let publisher = pub_session.declare_publisher(KEY_EXPR).await.unwrap(); - let received_value = Arc::new(Mutex::new(String::new())); - let temp_recv_value = received_value.clone(); - let subscriber = sub_session - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { - let mut temp_value = zlock!(temp_recv_value); - *temp_value = sample.payload().deserialize::().unwrap(); - }) - .await - .unwrap(); - - tokio::time::sleep(SLEEP).await; - publisher.put(VALUE).await.unwrap(); - tokio::time::sleep(SLEEP).await; - assert_eq!(*zlock!(received_value), VALUE); - ztimeout!(subscriber.undeclare()).unwrap(); - } - close_sessions(sub_session, pub_session).await; - close_router_session(session).await; - } - - async fn test_pub_sub_allow_then_deny_usrpswd() { - println!("test_pub_sub_allow_then_deny_usrpswd"); - - let mut config_router = get_basic_router_config_usrpswd().await; - config_router - .insert_json5( - "access_control", - r#"{ - "enabled": true, - "default_permission": "allow", - "rules": [ - { - "permission": "deny", - "flows": ["egress"], - "actions": [ - "put", - "declare_subscriber" - ], - "key_exprs": [ - "test/demo" - ], - "usernames": [ - "client1name", - "client2name" - ] - }, - ] - }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - let (sub_session, pub_session) = get_client_sessions_usrpswd().await; - { - let publisher = ztimeout!(pub_session.declare_publisher(KEY_EXPR)).unwrap(); - let received_value = Arc::new(Mutex::new(String::new())); - let temp_recv_value = received_value.clone(); - let subscriber = - ztimeout!(sub_session - .declare_subscriber(KEY_EXPR) - .callback(move |sample| { - let mut temp_value = zlock!(temp_recv_value); - *temp_value = sample.payload().deserialize::().unwrap(); - })) - .unwrap(); - - tokio::time::sleep(SLEEP).await; - - ztimeout!(publisher.put(VALUE)).unwrap(); - tokio::time::sleep(SLEEP).await; - - assert_ne!(*zlock!(received_value), VALUE); - ztimeout!(subscriber.undeclare()).unwrap(); - } - close_sessions(sub_session, pub_session).await; - close_router_session(session).await; - } - - async fn test_get_qbl_deny_then_allow_usrpswd() { - println!("test_get_qbl_deny_then_allow_usrpswd"); - - let mut config_router = get_basic_router_config_usrpswd().await; - config_router - .insert_json5( - "access_control", - r#"{ - "enabled": true, - "default_permission": "deny", - "rules": [ - { - "permission": "allow", - "flows": ["egress","ingress"], - "actions": [ - "get", - "declare_queryable" - ], - "key_exprs": [ - "test/demo" - ], - "usernames": [ - "client1name", - "client2name" - ] - }, - ] - }"#, - ) - .unwrap(); - - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - - let (get_session, qbl_session) = get_client_sessions_usrpswd().await; - { - let mut received_value = String::new(); - - let qbl = ztimeout!(qbl_session - .declare_queryable(KEY_EXPR) - .callback(move |sample| { - tokio::task::block_in_place(move || { - Handle::current().block_on(async move { - ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() - }); - }); - })) - .unwrap(); - - tokio::time::sleep(SLEEP).await; - let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); - while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { - match reply.result() { - Ok(sample) => { - received_value = sample.payload().deserialize::().unwrap(); - break; - } - Err(e) => println!( - "Error : {}", - e.payload() - .deserialize::() - .unwrap_or_else(|e| format!("{}", e)) - ), - } - } - tokio::time::sleep(SLEEP).await; - assert_eq!(received_value, VALUE); - ztimeout!(qbl.undeclare()).unwrap(); - } - close_sessions(get_session, qbl_session).await; - close_router_session(session).await; - } - - async fn test_get_qbl_allow_then_deny_usrpswd() { - println!("test_get_qbl_allow_then_deny_usrpswd"); - - let mut config_router = get_basic_router_config_usrpswd().await; - config_router - .insert_json5( - "access_control", - r#"{ - "enabled": true, - "default_permission": "allow", - "rules": [ - { - "permission": "deny", - "flows": ["egress"], - "actions": [ - "get", - "declare_queryable" - ], - "key_exprs": [ - "test/demo" - ], - "usernames": [ - "client1name", - "client2name" - ] - }, - ] - }"#, - ) - .unwrap(); - println!("Opening router session"); - - let session = ztimeout!(zenoh::open(config_router)).unwrap(); - - let (get_session, qbl_session) = get_client_sessions_usrpswd().await; - { - let mut received_value = String::new(); - - let qbl = ztimeout!(qbl_session - .declare_queryable(KEY_EXPR) - .callback(move |sample| { - tokio::task::block_in_place(move || { - Handle::current().block_on(async move { - ztimeout!(sample.reply(KEY_EXPR, VALUE)).unwrap() - }); - }); - })) - .unwrap(); - - tokio::time::sleep(SLEEP).await; - let recv_reply = ztimeout!(get_session.get(KEY_EXPR)).unwrap(); - while let Ok(reply) = ztimeout!(recv_reply.recv_async()) { - match reply.result() { - Ok(sample) => { - received_value = sample.payload().deserialize::().unwrap(); - break; - } - Err(e) => println!( - "Error : {}", - e.payload() - .deserialize::() - .unwrap_or_else(|e| format!("{}", e)) - ), - } - } - tokio::time::sleep(SLEEP).await; - assert_ne!(received_value, VALUE); - ztimeout!(qbl.undeclare()).unwrap(); - } - close_sessions(get_session, qbl_session).await; - close_router_session(session).await; - } -}