diff --git a/commons/zenoh-buffers/src/zbuf.rs b/commons/zenoh-buffers/src/zbuf.rs index db62e26f54..1365397966 100644 --- a/commons/zenoh-buffers/src/zbuf.rs +++ b/commons/zenoh-buffers/src/zbuf.rs @@ -20,7 +20,7 @@ use crate::{ ZSlice, }; use alloc::{sync::Arc, vec::Vec}; -use core::{cmp, iter, mem, num::NonZeroUsize, ptr, slice}; +use core::{cmp, iter, mem, num::NonZeroUsize, ops::RangeBounds, ptr}; use zenoh_collections::SingleOrVec; fn get_mut_unchecked(arc: &mut Arc) -> &mut T { @@ -55,6 +55,85 @@ impl ZBuf { self.slices.push(zslice); } } + + pub fn splice>(&mut self, erased: Range, replacement: &[u8]) { + let start = match erased.start_bound() { + core::ops::Bound::Included(n) => *n, + core::ops::Bound::Excluded(n) => n + 1, + core::ops::Bound::Unbounded => 0, + }; + let end = match erased.end_bound() { + core::ops::Bound::Included(n) => n + 1, + core::ops::Bound::Excluded(n) => *n, + core::ops::Bound::Unbounded => self.len(), + }; + if start != end { + self.remove(start, end); + } + self.insert(start, replacement); + } + fn remove(&mut self, mut start: usize, mut end: usize) { + assert!(start <= end); + assert!(end <= self.len()); + let mut start_slice_idx = 0; + let mut start_idx_in_start_slice = 0; + let mut end_slice_idx = 0; + let mut end_idx_in_end_slice = 0; + for (i, slice) in self.slices.as_mut().iter_mut().enumerate() { + if slice.len() > start { + start_slice_idx = i; + start_idx_in_start_slice = start; + } + if slice.len() >= end { + end_slice_idx = i; + end_idx_in_end_slice = end; + break; + } + start -= slice.len(); + end -= slice.len(); + } + let start_slice = &mut self.slices.as_mut()[start_slice_idx]; + start_slice.end = start_slice.start + start_idx_in_start_slice; + let drain_start = start_slice_idx + (start_slice.start < start_slice.end) as usize; + let end_slice = &mut self.slices.as_mut()[end_slice_idx]; + end_slice.start += end_idx_in_end_slice; + let drain_end = end_slice_idx + (end_slice.start >= end_slice.end) as usize; + self.slices.drain(drain_start..drain_end); + } + fn insert(&mut self, mut at: usize, slice: &[u8]) { + if slice.is_empty() { + return; + } + let old_at = at; + let mut slice_index = usize::MAX; + for (i, slice) in self.slices.as_ref().iter().enumerate() { + if at < slice.len() { + slice_index = i; + break; + } + if let Some(new_at) = at.checked_sub(slice.len()) { + at = new_at + } else { + panic!( + "Out of bounds insert attempted: at={old_at}, len={}", + self.len() + ) + } + } + if at != 0 { + let split = &self.slices.as_ref()[slice_index]; + let (l, r) = ( + split.subslice(0, at).unwrap(), + split.subslice(at, split.len()).unwrap(), + ); + self.slices.drain(slice_index..(slice_index + 1)); + self.slices.insert(slice_index, l); + self.slices.insert(slice_index + 1, Vec::from(slice).into()); + self.slices.insert(slice_index + 2, r); + } else { + self.slices.insert(slice_index, Vec::from(slice).into()) + } + } } // Buffer @@ -70,7 +149,7 @@ impl Buffer for ZBuf { // SplitBuffer impl SplitBuffer for ZBuf { - type Slices<'a> = iter::Map, fn(&'a ZSlice) -> &'a [u8]>; + type Slices<'a> = iter::Map, fn(&'a ZSlice) -> &'a [u8]>; fn slices(&self) -> Self::Slices<'_> { self.slices.as_ref().iter().map(ZSlice::as_slice) @@ -89,7 +168,7 @@ impl PartialEq for ZBuf { (None, _) | (_, None) => return false, (Some(l), Some(r)) => { let cmp_len = l.len().min(r.len()); - // SAFETY: cmp_len is the minimum lenght between l and r slices. + // SAFETY: cmp_len is the minimum length between l and r slices. let lhs = crate::unsafe_slice!(l, ..cmp_len); let rhs = crate::unsafe_slice!(r, ..cmp_len); if lhs != rhs { @@ -98,14 +177,14 @@ impl PartialEq for ZBuf { if cmp_len == l.len() { current_self = self_slices.next(); } else { - // SAFETY: cmp_len is the minimum lenght between l and r slices. + // SAFETY: cmp_len is the minimum length between l and r slices. let lhs = crate::unsafe_slice!(l, cmp_len..); current_self = Some(lhs); } if cmp_len == r.len() { current_other = other_slices.next(); } else { - // SAFETY: cmp_len is the minimum lenght between l and r slices. + // SAFETY: cmp_len is the minimum length between l and r slices. let rhs = crate::unsafe_slice!(r, cmp_len..); current_other = Some(rhs); } @@ -161,12 +240,12 @@ impl<'a> Reader for ZBufReader<'a> { // Take the minimum length among read and write slices let len = from.len().min(into.len()); // Copy the slice content - // SAFETY: len is the minimum lenght between from and into slices. + // SAFETY: len is the minimum length between from and into slices. let lhs = crate::unsafe_slice_mut!(into, ..len); let rhs = crate::unsafe_slice!(from, ..len); lhs.copy_from_slice(rhs); // Advance the write slice - // SAFETY: len is the minimum lenght between from and into slices. + // SAFETY: len is the minimum length between from and into slices. into = crate::unsafe_slice_mut!(into, len..); // Update the counter read += len; @@ -380,9 +459,20 @@ impl<'a> HasWriter for &'a mut ZBuf { type Writer = ZBufWriter<'a>; fn writer(self) -> Self::Writer { + let mut cache = None; + if let Some(ZSlice { buf, end, .. }) = self.slices.last_mut() { + // Verify the ZSlice is actually a Vec + if let Some(b) = buf.as_any().downcast_ref::>() { + // Check for the length + if *end == b.len() { + cache = Some(unsafe { Arc::from_raw(Arc::into_raw(buf.clone()).cast()) }) + } + } + } + ZBufWriter { inner: self, - cache: Arc::new(Vec::new()), + cache: cache.unwrap_or_else(|| Arc::new(Vec::new())), } } } @@ -433,7 +523,7 @@ impl Writer for ZBufWriter<'_> { } fn write_u8(&mut self, byte: u8) -> Result<(), DidntWrite> { - self.write_exact(slice::from_ref(&byte)) + self.write_exact(core::slice::from_ref(&byte)) } fn remaining(&self) -> usize { diff --git a/commons/zenoh-codec/benches/codec.rs b/commons/zenoh-codec/benches/codec.rs index 2c786a41db..1c46a700a7 100644 --- a/commons/zenoh-codec/benches/codec.rs +++ b/commons/zenoh-codec/benches/codec.rs @@ -91,6 +91,7 @@ fn criterion_benchmark(c: &mut Criterion) { ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, ext_unknown: vec![], payload: ZBuf::from(vec![0u8; 8]), }), @@ -136,6 +137,7 @@ fn criterion_benchmark(c: &mut Criterion) { ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, ext_unknown: vec![], payload: ZBuf::from(vec![0u8; 8]), }), @@ -176,6 +178,7 @@ fn criterion_benchmark(c: &mut Criterion) { ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, ext_unknown: vec![], payload: ZBuf::from(vec![0u8; 8]), }), @@ -216,6 +219,7 @@ fn criterion_benchmark(c: &mut Criterion) { ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, ext_unknown: vec![], payload: ZBuf::from(vec![0u8; 1_000_000]), }), @@ -243,6 +247,7 @@ fn criterion_benchmark(c: &mut Criterion) { ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, ext_unknown: vec![], payload: ZBuf::from(vec![0u8; 1_000_000]), }), @@ -281,6 +286,7 @@ fn criterion_benchmark(c: &mut Criterion) { ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, ext_unknown: vec![], payload: ZBuf::from(vec![0u8; 1_000_000]), }), diff --git a/commons/zenoh-codec/src/common/extension.rs b/commons/zenoh-codec/src/common/extension.rs index 4215711815..b31cfc19bc 100644 --- a/commons/zenoh-codec/src/common/extension.rs +++ b/commons/zenoh-codec/src/common/extension.rs @@ -88,7 +88,9 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: (&ZExtUnit<{ ID }>, bool)) -> Self::Output { - let (_x, more) = x; + let (x, more) = x; + let ZExtUnit = x; + let mut header: u8 = ID; if more { header |= iext::FLAG_Z; @@ -134,12 +136,14 @@ where fn write(self, writer: &mut W, x: (&ZExtZ64<{ ID }>, bool)) -> Self::Output { let (x, more) = x; + let ZExtZ64 { value } = x; + let mut header: u8 = ID; if more { header |= iext::FLAG_Z; } self.write(&mut *writer, header)?; - self.write(&mut *writer, x.value)?; + self.write(&mut *writer, value)?; Ok(()) } } @@ -182,13 +186,15 @@ where fn write(self, writer: &mut W, x: (&ZExtZBuf<{ ID }>, bool)) -> Self::Output { let (x, more) = x; + let ZExtZBuf { value } = x; + let mut header: u8 = ID; if more { header |= iext::FLAG_Z; } self.write(&mut *writer, header)?; let bodec = Zenoh080Bounded::::new(); - bodec.write(&mut *writer, &x.value)?; + bodec.write(&mut *writer, value)?; Ok(()) } } @@ -231,13 +237,15 @@ where fn write(self, writer: &mut W, x: (&ZExtZBufHeader<{ ID }>, bool)) -> Self::Output { let (x, more) = x; + let ZExtZBufHeader { len } = x; + let mut header: u8 = ID; if more { header |= iext::FLAG_Z; } self.write(&mut *writer, header)?; let bodec = Zenoh080Bounded::::new(); - bodec.write(&mut *writer, x.len)?; + bodec.write(&mut *writer, *len)?; Ok(()) } } @@ -284,11 +292,13 @@ where fn write(self, writer: &mut W, x: (&ZExtUnknown, bool)) -> Self::Output { let (x, more) = x; - let mut header: u8 = x.id; + let ZExtUnknown { id, body } = x; + + let mut header: u8 = *id; if more { header |= iext::FLAG_Z; } - match &x.body { + match body { ZExtBody::Unit => self.write(&mut *writer, header)?, ZExtBody::Z64(u64) => { self.write(&mut *writer, header)?; diff --git a/commons/zenoh-codec/src/core/property.rs b/commons/zenoh-codec/src/core/property.rs index 02536ccd82..bb7f760208 100644 --- a/commons/zenoh-codec/src/core/property.rs +++ b/commons/zenoh-codec/src/core/property.rs @@ -26,8 +26,10 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &Property) -> Self::Output { - self.write(&mut *writer, x.key)?; - self.write(&mut *writer, x.value.as_slice())?; + let Property { key, value } = x; + + self.write(&mut *writer, key)?; + self.write(&mut *writer, value.as_slice())?; Ok(()) } } diff --git a/commons/zenoh-codec/src/core/shm.rs b/commons/zenoh-codec/src/core/shm.rs index 1ab6976ebe..69c5c59ce0 100644 --- a/commons/zenoh-codec/src/core/shm.rs +++ b/commons/zenoh-codec/src/core/shm.rs @@ -25,10 +25,17 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &SharedMemoryBufInfo) -> Self::Output { - self.write(&mut *writer, x.offset)?; - self.write(&mut *writer, x.length)?; - self.write(&mut *writer, x.shm_manager.as_str())?; - self.write(&mut *writer, x.kind)?; + let SharedMemoryBufInfo { + offset, + length, + shm_manager, + kind, + } = x; + + self.write(&mut *writer, offset)?; + self.write(&mut *writer, length)?; + self.write(&mut *writer, shm_manager.as_str())?; + self.write(&mut *writer, kind)?; Ok(()) } } diff --git a/commons/zenoh-codec/src/core/wire_expr.rs b/commons/zenoh-codec/src/core/wire_expr.rs index bc484149ce..6caba6c8c7 100644 --- a/commons/zenoh-codec/src/core/wire_expr.rs +++ b/commons/zenoh-codec/src/core/wire_expr.rs @@ -29,12 +29,18 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &WireExpr<'_>) -> Self::Output { + let WireExpr { + scope, + suffix, + mapping: _, + } = x; + let zodec = Zenoh080Bounded::::new(); - zodec.write(&mut *writer, x.scope)?; + zodec.write(&mut *writer, *scope)?; if x.has_suffix() { let zodec = Zenoh080Bounded::::new(); - zodec.write(&mut *writer, x.suffix.as_ref())?; + zodec.write(&mut *writer, suffix.as_ref())?; } Ok(()) } diff --git a/commons/zenoh-codec/src/network/declare.rs b/commons/zenoh-codec/src/network/declare.rs index ae3a3dd77c..20916dc359 100644 --- a/commons/zenoh-codec/src/network/declare.rs +++ b/commons/zenoh-codec/src/network/declare.rs @@ -93,32 +93,39 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &Declare) -> Self::Output { + let Declare { + ext_qos, + ext_tstamp, + ext_nodeid, + body, + } = x; + // Header let mut header = id::DECLARE; - let mut n_exts = ((x.ext_qos != declare::ext::QoSType::default()) as u8) - + (x.ext_tstamp.is_some() as u8) - + ((x.ext_nodeid != declare::ext::NodeIdType::default()) as u8); + let mut n_exts = ((ext_qos != &declare::ext::QoSType::default()) as u8) + + (ext_tstamp.is_some() as u8) + + ((ext_nodeid != &declare::ext::NodeIdType::default()) as u8); if n_exts != 0 { header |= declare::flag::Z; } self.write(&mut *writer, header)?; // Extensions - if x.ext_qos != declare::ext::QoSType::default() { + if ext_qos != &declare::ext::QoSType::default() { n_exts -= 1; - self.write(&mut *writer, (x.ext_qos, n_exts != 0))?; + self.write(&mut *writer, (*ext_qos, n_exts != 0))?; } - if let Some(ts) = x.ext_tstamp.as_ref() { + if let Some(ts) = ext_tstamp.as_ref() { n_exts -= 1; self.write(&mut *writer, (ts, n_exts != 0))?; } - if x.ext_nodeid != declare::ext::NodeIdType::default() { + if ext_nodeid != &declare::ext::NodeIdType::default() { n_exts -= 1; - self.write(&mut *writer, (x.ext_nodeid, n_exts != 0))?; + self.write(&mut *writer, (*ext_nodeid, n_exts != 0))?; } // Body - self.write(&mut *writer, &x.body)?; + self.write(&mut *writer, body)?; Ok(()) } @@ -200,16 +207,18 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &keyexpr::DeclareKeyExpr) -> Self::Output { + let keyexpr::DeclareKeyExpr { id, wire_expr } = x; + // Header let mut header = declare::id::D_KEYEXPR; - if x.wire_expr.has_suffix() { + if wire_expr.has_suffix() { header |= keyexpr::flag::N; } self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.id)?; - self.write(&mut *writer, &x.wire_expr)?; + self.write(&mut *writer, id)?; + self.write(&mut *writer, wire_expr)?; Ok(()) } @@ -262,12 +271,14 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &keyexpr::UndeclareKeyExpr) -> Self::Output { + let keyexpr::UndeclareKeyExpr { id } = x; + // Header let header = declare::id::U_KEYEXPR; self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.id)?; + self.write(&mut *writer, id)?; Ok(()) } @@ -321,28 +332,34 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &subscriber::DeclareSubscriber) -> Self::Output { + let subscriber::DeclareSubscriber { + id, + wire_expr, + ext_info, + } = x; + // Header let mut header = declare::id::D_SUBSCRIBER; - let mut n_exts = (x.ext_info != subscriber::ext::SubscriberInfo::default()) as u8; + let mut n_exts = (ext_info != &subscriber::ext::SubscriberInfo::default()) as u8; if n_exts != 0 { header |= subscriber::flag::Z; } - if x.wire_expr.mapping != Mapping::default() { + if wire_expr.mapping != Mapping::default() { header |= subscriber::flag::M; } - if x.wire_expr.has_suffix() { + if wire_expr.has_suffix() { header |= subscriber::flag::N; } self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.id)?; - self.write(&mut *writer, &x.wire_expr)?; + self.write(&mut *writer, id)?; + self.write(&mut *writer, wire_expr)?; // Extensions - if x.ext_info != subscriber::ext::SubscriberInfo::default() { + if ext_info != &subscriber::ext::SubscriberInfo::default() { n_exts -= 1; - self.write(&mut *writer, (x.ext_info, n_exts != 0))?; + self.write(&mut *writer, (*ext_info, n_exts != 0))?; } Ok(()) @@ -420,15 +437,17 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &subscriber::UndeclareSubscriber) -> Self::Output { + let subscriber::UndeclareSubscriber { id, ext_wire_expr } = x; + // Header let header = declare::id::U_SUBSCRIBER | subscriber::flag::Z; self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.id)?; + self.write(&mut *writer, id)?; // Extension - self.write(&mut *writer, (&x.ext_wire_expr, false))?; + self.write(&mut *writer, (ext_wire_expr, false))?; Ok(()) } @@ -497,26 +516,32 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &queryable::DeclareQueryable) -> Self::Output { + let queryable::DeclareQueryable { + id, + wire_expr, + ext_info, + } = x; + // Header let mut header = declare::id::D_QUERYABLE; - let mut n_exts = (x.ext_info != queryable::ext::QueryableInfo::default()) as u8; + let mut n_exts = (ext_info != &queryable::ext::QueryableInfo::default()) as u8; if n_exts != 0 { header |= subscriber::flag::Z; } - if x.wire_expr.mapping != Mapping::default() { + if wire_expr.mapping != Mapping::default() { header |= subscriber::flag::M; } - if x.wire_expr.has_suffix() { + if wire_expr.has_suffix() { header |= subscriber::flag::N; } self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.id)?; - self.write(&mut *writer, &x.wire_expr)?; - if x.ext_info != queryable::ext::QueryableInfo::default() { + self.write(&mut *writer, id)?; + self.write(&mut *writer, wire_expr)?; + if ext_info != &queryable::ext::QueryableInfo::default() { n_exts -= 1; - self.write(&mut *writer, (x.ext_info, n_exts != 0))?; + self.write(&mut *writer, (*ext_info, n_exts != 0))?; } Ok(()) @@ -594,15 +619,17 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &queryable::UndeclareQueryable) -> Self::Output { + let queryable::UndeclareQueryable { id, ext_wire_expr } = x; + // Header let header = declare::id::U_QUERYABLE | queryable::flag::Z; self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.id)?; + self.write(&mut *writer, id)?; // Extension - self.write(&mut *writer, (&x.ext_wire_expr, false))?; + self.write(&mut *writer, (ext_wire_expr, false))?; Ok(()) } @@ -668,19 +695,21 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &token::DeclareToken) -> Self::Output { + let token::DeclareToken { id, wire_expr } = x; + // Header let mut header = declare::id::D_TOKEN; - if x.wire_expr.mapping != Mapping::default() { + if wire_expr.mapping != Mapping::default() { header |= subscriber::flag::M; } - if x.wire_expr.has_suffix() { + if wire_expr.has_suffix() { header |= subscriber::flag::N; } self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.id)?; - self.write(&mut *writer, &x.wire_expr)?; + self.write(&mut *writer, id)?; + self.write(&mut *writer, wire_expr)?; Ok(()) } @@ -738,15 +767,17 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &token::UndeclareToken) -> Self::Output { + let token::UndeclareToken { id, ext_wire_expr } = x; + // Header let header = declare::id::U_TOKEN | token::flag::Z; self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.id)?; + self.write(&mut *writer, id)?; // Extension - self.write(&mut *writer, (&x.ext_wire_expr, false))?; + self.write(&mut *writer, (ext_wire_expr, false))?; Ok(()) } @@ -812,20 +843,26 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &interest::DeclareInterest) -> Self::Output { + let interest::DeclareInterest { + id, + wire_expr, + interest, + } = x; + // Header let mut header = declare::id::D_INTEREST; - if x.wire_expr.mapping != Mapping::default() { + if wire_expr.mapping != Mapping::default() { header |= subscriber::flag::M; } - if x.wire_expr.has_suffix() { + if wire_expr.has_suffix() { header |= subscriber::flag::N; } self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.id)?; - self.write(&mut *writer, &x.wire_expr)?; - self.write(&mut *writer, x.interest.as_u8())?; + self.write(&mut *writer, id)?; + self.write(&mut *writer, wire_expr)?; + self.write(&mut *writer, interest.as_u8())?; Ok(()) } @@ -888,12 +925,14 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &interest::FinalInterest) -> Self::Output { + let interest::FinalInterest { id } = x; + // Header let header = declare::id::F_INTEREST; self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.id)?; + self.write(&mut *writer, id)?; Ok(()) } @@ -945,15 +984,17 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &interest::UndeclareInterest) -> Self::Output { + let interest::UndeclareInterest { id, ext_wire_expr } = x; + // Header let header = declare::id::U_INTEREST | interest::flag::Z; self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.id)?; + self.write(&mut *writer, id)?; // Extension - self.write(&mut *writer, (&x.ext_wire_expr, false))?; + self.write(&mut *writer, (ext_wire_expr, false))?; Ok(()) } @@ -1020,6 +1061,7 @@ where fn write(self, writer: &mut W, x: (&common::ext::WireExprType, bool)) -> Self::Output { let (x, more) = x; + let common::ext::WireExprType { wire_expr } = x; let codec = Zenoh080::new(); let mut value = ZBuf::empty(); @@ -1029,14 +1071,14 @@ where if x.wire_expr.has_suffix() { flags |= 1; } - if let Mapping::Receiver = x.wire_expr.mapping { + if let Mapping::Receiver = wire_expr.mapping { flags |= 1 << 1; } codec.write(&mut zriter, flags)?; - codec.write(&mut zriter, x.wire_expr.scope)?; - if x.wire_expr.has_suffix() { - zriter.write_exact(x.wire_expr.suffix.as_bytes())?; + codec.write(&mut zriter, wire_expr.scope)?; + if wire_expr.has_suffix() { + zriter.write_exact(wire_expr.suffix.as_bytes())?; } let ext = common::ext::WireExprExt { value }; diff --git a/commons/zenoh-codec/src/network/mod.rs b/commons/zenoh-codec/src/network/mod.rs index 7263c3fe27..c1f2489b88 100644 --- a/commons/zenoh-codec/src/network/mod.rs +++ b/commons/zenoh-codec/src/network/mod.rs @@ -27,7 +27,7 @@ use zenoh_buffers::{ use zenoh_protocol::{ common::{imsg, ZExtZ64, ZExtZBufHeader}, core::{Reliability, ZenohId}, - network::*, + network::{ext::EntityIdType, *}, }; // NetworkMessage @@ -38,7 +38,9 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &NetworkMessage) -> Self::Output { - match &x.body { + let NetworkMessage { body, .. } = x; + + match body { NetworkBody::Push(b) => self.write(&mut *writer, b), NetworkBody::Request(b) => self.write(&mut *writer, b), NetworkBody::Response(b) => self.write(&mut *writer, b), @@ -218,7 +220,9 @@ where // Extension: EntityId impl LCodec<&ext::EntityIdType<{ ID }>> for Zenoh080 { fn w_len(self, x: &ext::EntityIdType<{ ID }>) -> usize { - 1 + self.w_len(&x.zid) + self.w_len(x.eid) + let EntityIdType { zid, eid } = x; + + 1 + self.w_len(zid) + self.w_len(*eid) } } diff --git a/commons/zenoh-codec/src/network/oam.rs b/commons/zenoh-codec/src/network/oam.rs index 0e59421ba8..ff6daeb020 100644 --- a/commons/zenoh-codec/src/network/oam.rs +++ b/commons/zenoh-codec/src/network/oam.rs @@ -32,9 +32,16 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &Oam) -> Self::Output { + let Oam { + id, + body, + ext_qos, + ext_tstamp, + } = x; + // Header let mut header = id::OAM; - match &x.body { + match &body { ZExtBody::Unit => { header |= iext::ENC_UNIT; } @@ -46,27 +53,27 @@ where } } let mut n_exts = - ((x.ext_qos != ext::QoSType::default()) as u8) + (x.ext_tstamp.is_some() as u8); + ((ext_qos != &ext::QoSType::default()) as u8) + (ext_tstamp.is_some() as u8); if n_exts != 0 { header |= flag::Z; } self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.id)?; + self.write(&mut *writer, id)?; // Extensions - if x.ext_qos != ext::QoSType::default() { + if ext_qos != &ext::QoSType::default() { n_exts -= 1; - self.write(&mut *writer, (x.ext_qos, n_exts != 0))?; + self.write(&mut *writer, (*ext_qos, n_exts != 0))?; } - if let Some(ts) = x.ext_tstamp.as_ref() { + if let Some(ts) = ext_tstamp.as_ref() { n_exts -= 1; self.write(&mut *writer, (ts, n_exts != 0))?; } // Payload - match &x.body { + match body { ZExtBody::Unit => {} ZExtBody::Z64(u64) => { self.write(&mut *writer, u64)?; diff --git a/commons/zenoh-codec/src/network/push.rs b/commons/zenoh-codec/src/network/push.rs index f6d4ee7f0c..10a8489b29 100644 --- a/commons/zenoh-codec/src/network/push.rs +++ b/commons/zenoh-codec/src/network/push.rs @@ -34,41 +34,49 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &Push) -> Self::Output { + let Push { + wire_expr, + ext_qos, + ext_tstamp, + ext_nodeid, + payload, + } = x; + // Header let mut header = id::PUSH; - let mut n_exts = ((x.ext_qos != ext::QoSType::default()) as u8) - + (x.ext_tstamp.is_some() as u8) - + ((x.ext_nodeid != ext::NodeIdType::default()) as u8); + let mut n_exts = ((ext_qos != &ext::QoSType::default()) as u8) + + (ext_tstamp.is_some() as u8) + + ((ext_nodeid != &ext::NodeIdType::default()) as u8); if n_exts != 0 { header |= flag::Z; } - if x.wire_expr.mapping != Mapping::default() { + if wire_expr.mapping != Mapping::default() { header |= flag::M; } - if x.wire_expr.has_suffix() { + if wire_expr.has_suffix() { header |= flag::N; } self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, &x.wire_expr)?; + self.write(&mut *writer, wire_expr)?; // Extensions - if x.ext_qos != ext::QoSType::default() { + if ext_qos != &ext::QoSType::default() { n_exts -= 1; - self.write(&mut *writer, (x.ext_qos, n_exts != 0))?; + self.write(&mut *writer, (*ext_qos, n_exts != 0))?; } - if let Some(ts) = x.ext_tstamp.as_ref() { + if let Some(ts) = ext_tstamp.as_ref() { n_exts -= 1; self.write(&mut *writer, (ts, n_exts != 0))?; } - if x.ext_nodeid != ext::NodeIdType::default() { + if ext_nodeid != &ext::NodeIdType::default() { n_exts -= 1; - self.write(&mut *writer, (x.ext_nodeid, n_exts != 0))?; + self.write(&mut *writer, (*ext_nodeid, n_exts != 0))?; } // Payload - self.write(&mut *writer, &x.payload)?; + self.write(&mut *writer, payload)?; Ok(()) } diff --git a/commons/zenoh-codec/src/network/request.rs b/commons/zenoh-codec/src/network/request.rs index 088c9e79f8..19711ff147 100644 --- a/commons/zenoh-codec/src/network/request.rs +++ b/commons/zenoh-codec/src/network/request.rs @@ -37,8 +37,9 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: (&ext::TargetType, bool)) -> Self::Output { - let (rt, more) = x; - let v = match rt { + let (x, more) = x; + + let v = match x { ext::TargetType::BestMatching => 0, ext::TargetType::All => 1, ext::TargetType::AllComplete => 2, @@ -78,59 +79,71 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &Request) -> Self::Output { + let Request { + id, + wire_expr, + ext_qos, + ext_tstamp, + ext_nodeid, + ext_target, + ext_budget, + ext_timeout, + payload, + } = x; + // Header let mut header = id::REQUEST; - let mut n_exts = ((x.ext_qos != ext::QoSType::default()) as u8) - + (x.ext_tstamp.is_some() as u8) - + ((x.ext_target != ext::TargetType::default()) as u8) - + (x.ext_budget.is_some() as u8) - + (x.ext_timeout.is_some() as u8) - + ((x.ext_nodeid != ext::NodeIdType::default()) as u8); + let mut n_exts = ((ext_qos != &ext::QoSType::default()) as u8) + + (ext_tstamp.is_some() as u8) + + ((ext_target != &ext::TargetType::default()) as u8) + + (ext_budget.is_some() as u8) + + (ext_timeout.is_some() as u8) + + ((ext_nodeid != &ext::NodeIdType::default()) as u8); if n_exts != 0 { header |= flag::Z; } - if x.wire_expr.mapping != Mapping::default() { + if wire_expr.mapping != Mapping::default() { header |= flag::M; } - if x.wire_expr.has_suffix() { + if wire_expr.has_suffix() { header |= flag::N; } self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.id)?; - self.write(&mut *writer, &x.wire_expr)?; + self.write(&mut *writer, id)?; + self.write(&mut *writer, wire_expr)?; // Extensions - if x.ext_qos != ext::QoSType::default() { + if ext_qos != &ext::QoSType::default() { n_exts -= 1; - self.write(&mut *writer, (x.ext_qos, n_exts != 0))?; + self.write(&mut *writer, (*ext_qos, n_exts != 0))?; } - if let Some(ts) = x.ext_tstamp.as_ref() { + if let Some(ts) = ext_tstamp.as_ref() { n_exts -= 1; self.write(&mut *writer, (ts, n_exts != 0))?; } - if x.ext_target != ext::TargetType::default() { + if ext_target != &ext::TargetType::default() { n_exts -= 1; - self.write(&mut *writer, (&x.ext_target, n_exts != 0))?; + self.write(&mut *writer, (ext_target, n_exts != 0))?; } - if let Some(l) = x.ext_budget.as_ref() { + if let Some(l) = ext_budget.as_ref() { n_exts -= 1; let e = ext::Budget::new(l.get() as u64); self.write(&mut *writer, (&e, n_exts != 0))?; } - if let Some(to) = x.ext_timeout.as_ref() { + if let Some(to) = ext_timeout.as_ref() { n_exts -= 1; let e = ext::Timeout::new(to.as_millis() as u64); self.write(&mut *writer, (&e, n_exts != 0))?; } - if x.ext_nodeid != ext::NodeIdType::default() { + if ext_nodeid != &ext::NodeIdType::default() { n_exts -= 1; - self.write(&mut *writer, (x.ext_nodeid, n_exts != 0))?; + self.write(&mut *writer, (*ext_nodeid, n_exts != 0))?; } // Payload - self.write(&mut *writer, &x.payload)?; + self.write(&mut *writer, payload)?; Ok(()) } diff --git a/commons/zenoh-codec/src/network/response.rs b/commons/zenoh-codec/src/network/response.rs index 59d97fefda..bec7df2967 100644 --- a/commons/zenoh-codec/src/network/response.rs +++ b/commons/zenoh-codec/src/network/response.rs @@ -37,42 +37,51 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &Response) -> Self::Output { + let Response { + rid, + wire_expr, + payload, + ext_qos, + ext_tstamp, + ext_respid, + } = x; + // Header let mut header = id::RESPONSE; - let mut n_exts = ((x.ext_qos != ext::QoSType::default()) as u8) - + (x.ext_tstamp.is_some() as u8) - + (x.ext_respid.is_some() as u8); + let mut n_exts = ((ext_qos != &ext::QoSType::default()) as u8) + + (ext_tstamp.is_some() as u8) + + (ext_respid.is_some() as u8); if n_exts != 0 { header |= flag::Z; } - if x.wire_expr.mapping != Mapping::default() { + if wire_expr.mapping != Mapping::default() { header |= flag::M; } - if x.wire_expr.has_suffix() { + if wire_expr.has_suffix() { header |= flag::N; } self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.rid)?; - self.write(&mut *writer, &x.wire_expr)?; + self.write(&mut *writer, rid)?; + self.write(&mut *writer, wire_expr)?; // Extensions - if x.ext_qos != ext::QoSType::default() { + if ext_qos != &ext::QoSType::default() { n_exts -= 1; - self.write(&mut *writer, (x.ext_qos, n_exts != 0))?; + self.write(&mut *writer, (*ext_qos, n_exts != 0))?; } - if let Some(ts) = x.ext_tstamp.as_ref() { + if let Some(ts) = ext_tstamp.as_ref() { n_exts -= 1; self.write(&mut *writer, (ts, n_exts != 0))?; } - if let Some(ri) = x.ext_respid.as_ref() { + if let Some(ri) = ext_respid.as_ref() { n_exts -= 1; self.write(&mut *writer, (ri, n_exts != 0))?; } // Payload - self.write(&mut *writer, &x.payload)?; + self.write(&mut *writer, payload)?; Ok(()) } @@ -166,24 +175,30 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &ResponseFinal) -> Self::Output { + let ResponseFinal { + rid, + ext_qos, + ext_tstamp, + } = x; + // Header let mut header = id::RESPONSE_FINAL; let mut n_exts = - ((x.ext_qos != ext::QoSType::default()) as u8) + (x.ext_tstamp.is_some() as u8); + ((ext_qos != &ext::QoSType::default()) as u8) + (ext_tstamp.is_some() as u8); if n_exts != 0 { header |= flag::Z; } self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.rid)?; + self.write(&mut *writer, rid)?; // Extensions - if x.ext_qos != ext::QoSType::default() { + if ext_qos != &ext::QoSType::default() { n_exts -= 1; - self.write(&mut *writer, (x.ext_qos, n_exts != 0))?; + self.write(&mut *writer, (*ext_qos, n_exts != 0))?; } - if let Some(ts) = x.ext_tstamp.as_ref() { + if let Some(ts) = ext_tstamp.as_ref() { n_exts -= 1; self.write(&mut *writer, (ts, n_exts != 0))?; } diff --git a/commons/zenoh-codec/src/scouting/hello.rs b/commons/zenoh-codec/src/scouting/hello.rs index 1793676cde..430201133e 100644 --- a/commons/zenoh-codec/src/scouting/hello.rs +++ b/commons/zenoh-codec/src/scouting/hello.rs @@ -33,31 +33,38 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &Hello) -> Self::Output { + let Hello { + version, + whatami, + zid, + locators, + } = x; + // Header let mut header = id::HELLO; - if !x.locators.is_empty() { + if !locators.is_empty() { header |= flag::L; } self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.version)?; + self.write(&mut *writer, version)?; let mut flags: u8 = 0; - let whatami: u8 = match x.whatami { + let whatami: u8 = match whatami { WhatAmI::Router => 0b00, WhatAmI::Peer => 0b01, WhatAmI::Client => 0b10, }; flags |= whatami & 0b11; - flags |= ((x.zid.size() - 1) as u8) << 4; + flags |= ((zid.size() - 1) as u8) << 4; self.write(&mut *writer, flags)?; - let lodec = Zenoh080Length::new(x.zid.size()); - lodec.write(&mut *writer, &x.zid)?; + let lodec = Zenoh080Length::new(zid.size()); + lodec.write(&mut *writer, zid)?; - if !x.locators.is_empty() { - self.write(&mut *writer, x.locators.as_slice())?; + if !locators.is_empty() { + self.write(&mut *writer, locators.as_slice())?; } Ok(()) diff --git a/commons/zenoh-codec/src/scouting/mod.rs b/commons/zenoh-codec/src/scouting/mod.rs index 70f6fb8065..bbedce4282 100644 --- a/commons/zenoh-codec/src/scouting/mod.rs +++ b/commons/zenoh-codec/src/scouting/mod.rs @@ -31,7 +31,9 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &ScoutingMessage) -> Self::Output { - match &x.body { + let ScoutingMessage { body, .. } = x; + + match body { ScoutingBody::Scout(s) => self.write(&mut *writer, s), ScoutingBody::Hello(h) => self.write(&mut *writer, h), } diff --git a/commons/zenoh-codec/src/scouting/scout.rs b/commons/zenoh-codec/src/scouting/scout.rs index 941c455866..02d5294047 100644 --- a/commons/zenoh-codec/src/scouting/scout.rs +++ b/commons/zenoh-codec/src/scouting/scout.rs @@ -33,22 +33,24 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &Scout) -> Self::Output { + let Scout { version, what, zid } = x; + // Header let header = id::SCOUT; self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.version)?; + self.write(&mut *writer, version)?; let mut flags: u8 = 0; - let what: u8 = x.what.into(); + let what: u8 = (*what).into(); flags |= what & 0b111; - if let Some(zid) = x.zid.as_ref() { + if let Some(zid) = zid.as_ref() { flags |= (((zid.size() - 1) as u8) << 4) | flag::I; }; self.write(&mut *writer, flags)?; - if let Some(zid) = x.zid.as_ref() { + if let Some(zid) = zid.as_ref() { let lodec = Zenoh080Length::new(zid.size()); lodec.write(&mut *writer, zid)?; } diff --git a/commons/zenoh-codec/src/transport/close.rs b/commons/zenoh-codec/src/transport/close.rs index 86b54f8688..9771b9e1e9 100644 --- a/commons/zenoh-codec/src/transport/close.rs +++ b/commons/zenoh-codec/src/transport/close.rs @@ -31,15 +31,17 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &Close) -> Self::Output { + let Close { reason, session } = x; + // Header let mut header = id::CLOSE; - if x.session { + if *session { header |= flag::S; } self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.reason)?; + self.write(&mut *writer, reason)?; Ok(()) } diff --git a/commons/zenoh-codec/src/transport/fragment.rs b/commons/zenoh-codec/src/transport/fragment.rs index 7cc827d378..b66f395df1 100644 --- a/commons/zenoh-codec/src/transport/fragment.rs +++ b/commons/zenoh-codec/src/transport/fragment.rs @@ -33,25 +33,32 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &FragmentHeader) -> Self::Output { + let FragmentHeader { + reliability, + more, + sn, + ext_qos, + } = x; + // Header let mut header = id::FRAGMENT; - if let Reliability::Reliable = x.reliability { + if let Reliability::Reliable = reliability { header |= flag::R; } - if x.more { + if *more { header |= flag::M; } - if x.ext_qos != ext::QoSType::default() { + if ext_qos != &ext::QoSType::default() { header |= flag::Z; } self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.sn)?; + self.write(&mut *writer, sn)?; // Extensions - if x.ext_qos != ext::QoSType::default() { - self.write(&mut *writer, (x.ext_qos, false))?; + if ext_qos != &ext::QoSType::default() { + self.write(&mut *writer, (*ext_qos, false))?; } Ok(()) @@ -125,17 +132,25 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &Fragment) -> Self::Output { + let Fragment { + reliability, + more, + sn, + payload, + ext_qos, + } = x; + // Header let header = FragmentHeader { - reliability: x.reliability, - more: x.more, - sn: x.sn, - ext_qos: x.ext_qos, + reliability: *reliability, + more: *more, + sn: *sn, + ext_qos: *ext_qos, }; self.write(&mut *writer, &header)?; // Body - writer.write_zslice(&x.payload)?; + writer.write_zslice(payload)?; Ok(()) } diff --git a/commons/zenoh-codec/src/transport/frame.rs b/commons/zenoh-codec/src/transport/frame.rs index 1293dc950c..8d39aabcdb 100644 --- a/commons/zenoh-codec/src/transport/frame.rs +++ b/commons/zenoh-codec/src/transport/frame.rs @@ -35,21 +35,27 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &FrameHeader) -> Self::Output { + let FrameHeader { + reliability, + sn, + ext_qos, + } = x; + // Header let mut header = id::FRAME; - if let Reliability::Reliable = x.reliability { + if let Reliability::Reliable = reliability { header |= flag::R; } - if x.ext_qos != ext::QoSType::default() { + if ext_qos != &ext::QoSType::default() { header |= flag::Z; } self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.sn)?; + self.write(&mut *writer, sn)?; // Extensions - if x.ext_qos != ext::QoSType::default() { + if ext_qos != &ext::QoSType::default() { self.write(&mut *writer, (x.ext_qos, false))?; } @@ -122,16 +128,23 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &Frame) -> Self::Output { + let Frame { + reliability, + sn, + payload, + ext_qos, + } = x; + // Header let header = FrameHeader { - reliability: x.reliability, - sn: x.sn, - ext_qos: x.ext_qos, + reliability: *reliability, + sn: *sn, + ext_qos: *ext_qos, }; self.write(&mut *writer, &header)?; // Body - for m in x.payload.iter() { + for m in payload.iter() { self.write(&mut *writer, m)?; } diff --git a/commons/zenoh-codec/src/transport/init.rs b/commons/zenoh-codec/src/transport/init.rs index 5f98c77e5b..d3a92165ea 100644 --- a/commons/zenoh-codec/src/transport/init.rs +++ b/commons/zenoh-codec/src/transport/init.rs @@ -53,7 +53,7 @@ where // Header let mut header = id::INIT; - if *resolution != Resolution::default() || *batch_size != batch_size::UNICAST { + if resolution != &Resolution::default() || batch_size != &batch_size::UNICAST { header |= flag::S; } let mut n_exts = (ext_qos.is_some() as u8) @@ -253,7 +253,7 @@ where // Header let mut header = id::INIT | flag::A; - if *resolution != Resolution::default() || *batch_size != batch_size::UNICAST { + if resolution != &Resolution::default() || batch_size != &batch_size::UNICAST { header |= flag::S; } let mut n_exts = (ext_qos.is_some() as u8) diff --git a/commons/zenoh-codec/src/transport/join.rs b/commons/zenoh-codec/src/transport/join.rs index 197190946a..80c1663413 100644 --- a/commons/zenoh-codec/src/transport/join.rs +++ b/commons/zenoh-codec/src/transport/join.rs @@ -30,7 +30,11 @@ use zenoh_protocol::{ impl LCodec<&PrioritySn> for Zenoh080 { fn w_len(self, p: &PrioritySn) -> usize { - self.w_len(p.reliable) + self.w_len(p.best_effort) + let PrioritySn { + reliable, + best_effort, + } = p; + self.w_len(*reliable) + self.w_len(*best_effort) } } @@ -41,8 +45,13 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &PrioritySn) -> Self::Output { - self.write(&mut *writer, x.reliable)?; - self.write(&mut *writer, x.best_effort)?; + let PrioritySn { + reliable, + best_effort, + } = x; + + self.write(&mut *writer, reliable)?; + self.write(&mut *writer, best_effort)?; Ok(()) } } @@ -129,52 +138,64 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &Join) -> Self::Output { + let Join { + version, + whatami, + zid, + resolution, + batch_size, + lease, + next_sn, + ext_qos, + ext_shm, + } = x; + // Header let mut header = id::JOIN; - if x.lease.as_millis() % 1_000 == 0 { + if lease.as_millis() % 1_000 == 0 { header |= flag::T; } - if x.resolution != Resolution::default() || x.batch_size != batch_size::MULTICAST { + if resolution != &Resolution::default() || batch_size != &batch_size::MULTICAST { header |= flag::S; } - let mut n_exts = (x.ext_qos.is_some() as u8) + (x.ext_shm.is_some() as u8); + let mut n_exts = (ext_qos.is_some() as u8) + (ext_shm.is_some() as u8); if n_exts != 0 { header |= flag::Z; } self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.version)?; + self.write(&mut *writer, version)?; - let whatami: u8 = match x.whatami { + let whatami: u8 = match whatami { WhatAmI::Router => 0b00, WhatAmI::Peer => 0b01, WhatAmI::Client => 0b10, }; - let flags: u8 = ((x.zid.size() as u8 - 1) << 4) | whatami; + let flags: u8 = ((zid.size() as u8 - 1) << 4) | whatami; self.write(&mut *writer, flags)?; - let lodec = Zenoh080Length::new(x.zid.size()); - lodec.write(&mut *writer, &x.zid)?; + let lodec = Zenoh080Length::new(zid.size()); + lodec.write(&mut *writer, zid)?; if imsg::has_flag(header, flag::S) { - self.write(&mut *writer, x.resolution.as_u8())?; - self.write(&mut *writer, x.batch_size.to_le_bytes())?; + self.write(&mut *writer, resolution.as_u8())?; + self.write(&mut *writer, batch_size.to_le_bytes())?; } if imsg::has_flag(header, flag::T) { - self.write(&mut *writer, x.lease.as_secs())?; + self.write(&mut *writer, lease.as_secs())?; } else { - self.write(&mut *writer, x.lease.as_millis() as u64)?; + self.write(&mut *writer, lease.as_millis() as u64)?; } - self.write(&mut *writer, &x.next_sn)?; + self.write(&mut *writer, next_sn)?; // Extensions - if let Some(qos) = x.ext_qos.as_ref() { + if let Some(qos) = ext_qos.as_ref() { n_exts -= 1; self.write(&mut *writer, (qos, n_exts != 0))?; } - if let Some(shm) = x.ext_shm.as_ref() { + if let Some(shm) = ext_shm.as_ref() { n_exts -= 1; self.write(&mut *writer, (shm, n_exts != 0))?; } diff --git a/commons/zenoh-codec/src/transport/keepalive.rs b/commons/zenoh-codec/src/transport/keepalive.rs index ce432e63a6..aa6726f50b 100644 --- a/commons/zenoh-codec/src/transport/keepalive.rs +++ b/commons/zenoh-codec/src/transport/keepalive.rs @@ -30,7 +30,9 @@ where { type Output = Result<(), DidntWrite>; - fn write(self, writer: &mut W, _x: &KeepAlive) -> Self::Output { + fn write(self, writer: &mut W, x: &KeepAlive) -> Self::Output { + let KeepAlive = x; + // Header let header = id::KEEP_ALIVE; self.write(&mut *writer, header)?; diff --git a/commons/zenoh-codec/src/transport/mod.rs b/commons/zenoh-codec/src/transport/mod.rs index 4ddf872551..559b5b5fda 100644 --- a/commons/zenoh-codec/src/transport/mod.rs +++ b/commons/zenoh-codec/src/transport/mod.rs @@ -40,7 +40,9 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &TransportMessageLowLatency) -> Self::Output { - match &x.body { + let TransportMessageLowLatency { body } = x; + + match body { TransportBodyLowLatency::Network(b) => self.write(&mut *writer, b), TransportBodyLowLatency::KeepAlive(b) => self.write(&mut *writer, b), TransportBodyLowLatency::Close(b) => self.write(&mut *writer, b), @@ -79,7 +81,9 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &TransportMessage) -> Self::Output { - match &x.body { + let TransportMessage { body, .. } = x; + + match body { TransportBody::Frame(b) => self.write(&mut *writer, b), TransportBody::Fragment(b) => self.write(&mut *writer, b), TransportBody::KeepAlive(b) => self.write(&mut *writer, b), @@ -142,6 +146,7 @@ where fn write(self, writer: &mut W, x: (ext::QoSType<{ ID }>, bool)) -> Self::Output { let (x, more) = x; let ext: ZExtZ64<{ ID }> = x.into(); + self.write(&mut *writer, (&ext, more)) } } diff --git a/commons/zenoh-codec/src/transport/oam.rs b/commons/zenoh-codec/src/transport/oam.rs index 46fe63345e..e2f905abf8 100644 --- a/commons/zenoh-codec/src/transport/oam.rs +++ b/commons/zenoh-codec/src/transport/oam.rs @@ -32,9 +32,11 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &Oam) -> Self::Output { + let Oam { id, body, ext_qos } = x; + // Header let mut header = id::OAM; - match &x.body { + match &body { ZExtBody::Unit => { header |= iext::ENC_UNIT; } @@ -45,23 +47,23 @@ where header |= iext::ENC_ZBUF; } } - let mut n_exts = (x.ext_qos != ext::QoSType::default()) as u8; + let mut n_exts = (ext_qos != &ext::QoSType::default()) as u8; if n_exts != 0 { header |= flag::Z; } self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.id)?; + self.write(&mut *writer, id)?; // Extensions - if x.ext_qos != ext::QoSType::default() { + if ext_qos != &ext::QoSType::default() { n_exts -= 1; - self.write(&mut *writer, (x.ext_qos, n_exts != 0))?; + self.write(&mut *writer, (*ext_qos, n_exts != 0))?; } // Payload - match &x.body { + match &body { ZExtBody::Unit => {} ZExtBody::Z64(u64) => { self.write(&mut *writer, u64)?; diff --git a/commons/zenoh-codec/src/transport/open.rs b/commons/zenoh-codec/src/transport/open.rs index 17482b1610..f895942ea1 100644 --- a/commons/zenoh-codec/src/transport/open.rs +++ b/commons/zenoh-codec/src/transport/open.rs @@ -36,8 +36,8 @@ where fn write(self, writer: &mut W, x: &OpenSyn) -> Self::Output { let OpenSyn { - initial_sn, lease, + initial_sn, cookie, ext_qos, ext_shm, @@ -208,8 +208,8 @@ where fn write(self, writer: &mut W, x: &OpenAck) -> Self::Output { let OpenAck { - initial_sn, lease, + initial_sn, ext_qos, ext_shm, ext_auth, diff --git a/commons/zenoh-codec/src/zenoh/ack.rs b/commons/zenoh-codec/src/zenoh/ack.rs index 0b940eb877..78cbca2987 100644 --- a/commons/zenoh-codec/src/zenoh/ack.rs +++ b/commons/zenoh-codec/src/zenoh/ack.rs @@ -32,28 +32,34 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &Ack) -> Self::Output { + let Ack { + timestamp, + ext_sinfo, + ext_unknown, + } = x; + // Header let mut header = id::ACK; - if x.timestamp.is_some() { + if timestamp.is_some() { header |= flag::T; } - let mut n_exts = ((x.ext_sinfo.is_some()) as u8) + (x.ext_unknown.len() as u8); + let mut n_exts = ((ext_sinfo.is_some()) as u8) + (ext_unknown.len() as u8); if n_exts != 0 { header |= flag::Z; } self.write(&mut *writer, header)?; // Body - if let Some(ts) = x.timestamp.as_ref() { + if let Some(ts) = timestamp.as_ref() { self.write(&mut *writer, ts)?; } // Extensions - if let Some(sinfo) = x.ext_sinfo.as_ref() { + if let Some(sinfo) = ext_sinfo.as_ref() { n_exts -= 1; self.write(&mut *writer, (sinfo, n_exts != 0))?; } - for u in x.ext_unknown.iter() { + for u in ext_unknown.iter() { n_exts -= 1; self.write(&mut *writer, (u, n_exts != 0))?; } diff --git a/commons/zenoh-codec/src/zenoh/del.rs b/commons/zenoh-codec/src/zenoh/del.rs index cdd5c332d8..3d0a64f428 100644 --- a/commons/zenoh-codec/src/zenoh/del.rs +++ b/commons/zenoh-codec/src/zenoh/del.rs @@ -32,28 +32,41 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &Del) -> Self::Output { + let Del { + timestamp, + ext_sinfo, + ext_attachment, + ext_unknown, + } = x; + // Header let mut header = id::DEL; - if x.timestamp.is_some() { + if timestamp.is_some() { header |= flag::T; } - let mut n_exts = (x.ext_sinfo.is_some()) as u8 + (x.ext_unknown.len() as u8); + let mut n_exts = (ext_sinfo.is_some()) as u8 + + (ext_attachment.is_some()) as u8 + + (ext_unknown.len() as u8); if n_exts != 0 { header |= flag::Z; } self.write(&mut *writer, header)?; // Body - if let Some(ts) = x.timestamp.as_ref() { + if let Some(ts) = timestamp.as_ref() { self.write(&mut *writer, ts)?; } // Extensions - if let Some(sinfo) = x.ext_sinfo.as_ref() { + if let Some(sinfo) = ext_sinfo.as_ref() { n_exts -= 1; self.write(&mut *writer, (sinfo, n_exts != 0))?; } - for u in x.ext_unknown.iter() { + if let Some(att) = ext_attachment.as_ref() { + n_exts -= 1; + self.write(&mut *writer, (att, n_exts != 0))?; + } + for u in ext_unknown.iter() { n_exts -= 1; self.write(&mut *writer, (u, n_exts != 0))?; } @@ -94,6 +107,7 @@ where // Extensions let mut ext_sinfo: Option = None; + let mut ext_attachment: Option = None; let mut ext_unknown = Vec::new(); let mut has_ext = imsg::has_flag(self.header, flag::Z); @@ -106,6 +120,11 @@ where ext_sinfo = Some(s); has_ext = ext; } + ext::Attachment::ID => { + let (a, ext): (ext::AttachmentType, bool) = eodec.read(&mut *reader)?; + ext_attachment = Some(a); + has_ext = ext; + } _ => { let (u, ext) = extension::read(reader, "Del", ext)?; ext_unknown.push(u); @@ -117,6 +136,7 @@ where Ok(Del { timestamp, ext_sinfo, + ext_attachment, ext_unknown, }) } diff --git a/commons/zenoh-codec/src/zenoh/err.rs b/commons/zenoh-codec/src/zenoh/err.rs index 425044402c..5cef1a6389 100644 --- a/commons/zenoh-codec/src/zenoh/err.rs +++ b/commons/zenoh-codec/src/zenoh/err.rs @@ -32,38 +32,46 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &Err) -> Self::Output { + let Err { + code, + is_infrastructure, + timestamp, + ext_sinfo, + ext_body, + ext_unknown, + } = x; + // Header let mut header = id::ERR; - if x.timestamp.is_some() { + if timestamp.is_some() { header |= flag::T; } - if x.is_infrastructure { + if *is_infrastructure { header |= flag::I; } - let mut n_exts = (x.ext_sinfo.is_some() as u8) - + (x.ext_body.is_some() as u8) - + (x.ext_unknown.len() as u8); + let mut n_exts = + (ext_sinfo.is_some() as u8) + (ext_body.is_some() as u8) + (ext_unknown.len() as u8); if n_exts != 0 { header |= flag::Z; } self.write(&mut *writer, header)?; // Body - self.write(&mut *writer, x.code)?; - if let Some(ts) = x.timestamp.as_ref() { + self.write(&mut *writer, code)?; + if let Some(ts) = timestamp.as_ref() { self.write(&mut *writer, ts)?; } // Extensions - if let Some(sinfo) = x.ext_sinfo.as_ref() { + if let Some(sinfo) = ext_sinfo.as_ref() { n_exts -= 1; self.write(&mut *writer, (sinfo, n_exts != 0))?; } - if let Some(body) = x.ext_body.as_ref() { + if let Some(body) = ext_body.as_ref() { n_exts -= 1; self.write(&mut *writer, (body, n_exts != 0))?; } - for u in x.ext_unknown.iter() { + for u in ext_unknown.iter() { n_exts -= 1; self.write(&mut *writer, (u, n_exts != 0))?; } diff --git a/commons/zenoh-codec/src/zenoh/mod.rs b/commons/zenoh-codec/src/zenoh/mod.rs index dea0b7c495..2e3ea48be7 100644 --- a/commons/zenoh-codec/src/zenoh/mod.rs +++ b/commons/zenoh-codec/src/zenoh/mod.rs @@ -153,7 +153,9 @@ where // Extension: SourceInfo impl LCodec<&ext::SourceInfoType<{ ID }>> for Zenoh080 { fn w_len(self, x: &ext::SourceInfoType<{ ID }>) -> usize { - 1 + self.w_len(&x.zid) + self.w_len(x.eid) + self.w_len(x.sn) + let ext::SourceInfoType { zid, eid, sn } = x; + + 1 + self.w_len(zid) + self.w_len(*eid) + self.w_len(*sn) } } @@ -165,17 +167,19 @@ where fn write(self, writer: &mut W, x: (&ext::SourceInfoType<{ ID }>, bool)) -> Self::Output { let (x, more) = x; + let ext::SourceInfoType { zid, eid, sn } = x; + let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(x)); self.write(&mut *writer, (&header, more))?; - let flags: u8 = (x.zid.size() as u8 - 1) << 4; + let flags: u8 = (zid.size() as u8 - 1) << 4; self.write(&mut *writer, flags)?; - let lodec = Zenoh080Length::new(x.zid.size()); - lodec.write(&mut *writer, &x.zid)?; + let lodec = Zenoh080Length::new(zid.size()); + lodec.write(&mut *writer, zid)?; - self.write(&mut *writer, x.eid)?; - self.write(&mut *writer, x.sn)?; + self.write(&mut *writer, eid)?; + self.write(&mut *writer, sn)?; Ok(()) } } @@ -211,7 +215,9 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: (&ext::ShmType<{ ID }>, bool)) -> Self::Output { - let (_, more) = x; + let (x, more) = x; + let ext::ShmType = x; + let header: ZExtUnit<{ ID }> = ZExtUnit::new(); self.write(&mut *writer, (&header, more))?; Ok(()) @@ -241,25 +247,31 @@ where fn write(self, writer: &mut W, x: (&ext::ValueType<{ VID }, { SID }>, bool)) -> Self::Output { let (x, more) = x; + let ext::ValueType { + encoding, + payload, + #[cfg(feature = "shared-memory")] + ext_shm, + } = x; #[cfg(feature = "shared-memory")] // Write Shm extension if present - if let Some(eshm) = x.ext_shm.as_ref() { + if let Some(eshm) = ext_shm.as_ref() { self.write(&mut *writer, (eshm, true))?; } // Compute extension length - let mut len = self.w_len(&x.encoding); + let mut len = self.w_len(encoding); #[cfg(feature = "shared-memory")] { - let codec = Zenoh080Sliced::::new(x.ext_shm.is_some()); - len += codec.w_len(&x.payload); + let codec = Zenoh080Sliced::::new(ext_shm.is_some()); + len += codec.w_len(payload); } #[cfg(not(feature = "shared-memory"))] { let codec = Zenoh080Bounded::::new(); - len += codec.w_len(&x.payload); + len += codec.w_len(payload); } // Write ZExtBuf header @@ -267,7 +279,7 @@ where self.write(&mut *writer, (&header, more))?; // Write encoding - self.write(&mut *writer, &x.encoding)?; + self.write(&mut *writer, encoding)?; // Write payload fn write(writer: &mut W, payload: &ZBuf) -> Result<(), DidntWrite> @@ -283,17 +295,17 @@ where #[cfg(feature = "shared-memory")] { - if x.ext_shm.is_some() { + if ext_shm.is_some() { let codec = Zenoh080Sliced::::new(true); - codec.write(&mut *writer, &x.payload)?; + codec.write(&mut *writer, payload)?; } else { - write(&mut *writer, &x.payload)?; + write(&mut *writer, payload)?; } } #[cfg(not(feature = "shared-memory"))] { - write(&mut *writer, &x.payload)?; + write(&mut *writer, payload)?; } Ok(()) @@ -367,3 +379,39 @@ where )) } } + +// Extension: Attachment +impl WCodec<(&ext::AttachmentType<{ ID }>, bool), &mut W> for Zenoh080 +where + W: Writer, +{ + type Output = Result<(), DidntWrite>; + + fn write(self, writer: &mut W, x: (&ext::AttachmentType<{ ID }>, bool)) -> Self::Output { + let (x, more) = x; + let ext::AttachmentType { buffer } = x; + + let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(buffer)); + self.write(&mut *writer, (&header, more))?; + for s in buffer.zslices() { + writer.write_zslice(s)?; + } + + Ok(()) + } +} + +impl RCodec<(ext::AttachmentType<{ ID }>, bool), &mut R> for Zenoh080Header +where + R: Reader, +{ + type Error = DidntRead; + + fn read(self, reader: &mut R) -> Result<(ext::AttachmentType<{ ID }>, bool), Self::Error> { + let (h, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?; + let mut buffer = ZBuf::empty(); + reader.read_zslices(h.len, |s| buffer.push_zslice(s))?; + + Ok((ext::AttachmentType { buffer }, more)) + } +} diff --git a/commons/zenoh-codec/src/zenoh/pull.rs b/commons/zenoh-codec/src/zenoh/pull.rs index 2b2a3a61e0..dc71901d58 100644 --- a/commons/zenoh-codec/src/zenoh/pull.rs +++ b/commons/zenoh-codec/src/zenoh/pull.rs @@ -33,16 +33,18 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &Pull) -> Self::Output { + let Pull { ext_unknown } = x; + // Header let mut header = id::PULL; - let mut n_exts = x.ext_unknown.len() as u8; + let mut n_exts = ext_unknown.len() as u8; if n_exts != 0 { header |= flag::Z; } self.write(&mut *writer, header)?; // Extensions - for u in x.ext_unknown.iter() { + for u in ext_unknown.iter() { n_exts -= 1; self.write(&mut *writer, (u, n_exts != 0))?; } diff --git a/commons/zenoh-codec/src/zenoh/put.rs b/commons/zenoh-codec/src/zenoh/put.rs index 6358a533a1..ebc364cf9b 100644 --- a/commons/zenoh-codec/src/zenoh/put.rs +++ b/commons/zenoh-codec/src/zenoh/put.rs @@ -38,18 +38,31 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &Put) -> Self::Output { + let Put { + timestamp, + encoding, + ext_sinfo, + ext_attachment, + #[cfg(feature = "shared-memory")] + ext_shm, + ext_unknown, + payload, + }: &Put = x; + // Header let mut header = id::PUT; - if x.timestamp.is_some() { + if timestamp.is_some() { header |= flag::T; } - if x.encoding != Encoding::default() { + if encoding != &Encoding::default() { header |= flag::E; } - let mut n_exts = (x.ext_sinfo.is_some()) as u8 + (x.ext_unknown.len() as u8); + let mut n_exts = (ext_sinfo.is_some()) as u8 + + (ext_attachment.is_some()) as u8 + + (ext_unknown.len() as u8); #[cfg(feature = "shared-memory")] { - n_exts += x.ext_shm.is_some() as u8; + n_exts += ext_shm.is_some() as u8; } if n_exts != 0 { header |= flag::Z; @@ -57,24 +70,28 @@ where self.write(&mut *writer, header)?; // Body - if let Some(ts) = x.timestamp.as_ref() { + if let Some(ts) = timestamp.as_ref() { self.write(&mut *writer, ts)?; } - if x.encoding != Encoding::default() { - self.write(&mut *writer, &x.encoding)?; + if encoding != &Encoding::default() { + self.write(&mut *writer, encoding)?; } // Extensions - if let Some(sinfo) = x.ext_sinfo.as_ref() { + if let Some(sinfo) = ext_sinfo.as_ref() { n_exts -= 1; self.write(&mut *writer, (sinfo, n_exts != 0))?; } #[cfg(feature = "shared-memory")] - if let Some(eshm) = x.ext_shm.as_ref() { + if let Some(eshm) = ext_shm.as_ref() { n_exts -= 1; self.write(&mut *writer, (eshm, n_exts != 0))?; } - for u in x.ext_unknown.iter() { + if let Some(att) = ext_attachment.as_ref() { + n_exts -= 1; + self.write(&mut *writer, (att, n_exts != 0))?; + } + for u in ext_unknown.iter() { n_exts -= 1; self.write(&mut *writer, (u, n_exts != 0))?; } @@ -82,14 +99,14 @@ where // Payload #[cfg(feature = "shared-memory")] { - let codec = Zenoh080Sliced::::new(x.ext_shm.is_some()); - codec.write(&mut *writer, &x.payload)?; + let codec = Zenoh080Sliced::::new(ext_shm.is_some()); + codec.write(&mut *writer, payload)?; } #[cfg(not(feature = "shared-memory"))] { let bodec = Zenoh080Bounded::::new(); - bodec.write(&mut *writer, &x.payload)?; + bodec.write(&mut *writer, payload)?; } Ok(()) @@ -135,6 +152,7 @@ where let mut ext_sinfo: Option = None; #[cfg(feature = "shared-memory")] let mut ext_shm: Option = None; + let mut ext_attachment: Option = None; let mut ext_unknown = Vec::new(); let mut has_ext = imsg::has_flag(self.header, flag::Z); @@ -153,6 +171,11 @@ where ext_shm = Some(s); has_ext = ext; } + ext::Attachment::ID => { + let (a, ext): (ext::AttachmentType, bool) = eodec.read(&mut *reader)?; + ext_attachment = Some(a); + has_ext = ext; + } _ => { let (u, ext) = extension::read(reader, "Put", ext)?; ext_unknown.push(u); @@ -182,6 +205,7 @@ where ext_sinfo, #[cfg(feature = "shared-memory")] ext_shm, + ext_attachment, ext_unknown, payload, }) diff --git a/commons/zenoh-codec/src/zenoh/query.rs b/commons/zenoh-codec/src/zenoh/query.rs index 0844e16df4..09b01b2266 100644 --- a/commons/zenoh-codec/src/zenoh/query.rs +++ b/commons/zenoh-codec/src/zenoh/query.rs @@ -74,39 +74,53 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &Query) -> Self::Output { + let Query { + parameters, + ext_sinfo, + ext_consolidation, + ext_body, + ext_attachment, + ext_unknown, + } = x; + // Header let mut header = id::QUERY; - if !x.parameters.is_empty() { + if !parameters.is_empty() { header |= flag::P; } - let mut n_exts = (x.ext_sinfo.is_some() as u8) - + ((x.ext_consolidation != ext::ConsolidationType::default()) as u8) - + (x.ext_body.is_some() as u8) - + (x.ext_unknown.len() as u8); + let mut n_exts = (ext_sinfo.is_some() as u8) + + ((ext_consolidation != &ext::ConsolidationType::default()) as u8) + + (ext_body.is_some() as u8) + + (ext_attachment.is_some() as u8) + + (ext_unknown.len() as u8); if n_exts != 0 { header |= flag::Z; } self.write(&mut *writer, header)?; // Body - if !x.parameters.is_empty() { - self.write(&mut *writer, &x.parameters)?; + if !parameters.is_empty() { + self.write(&mut *writer, parameters)?; } // Extensions - if let Some(sinfo) = x.ext_sinfo.as_ref() { + if let Some(sinfo) = ext_sinfo.as_ref() { n_exts -= 1; self.write(&mut *writer, (sinfo, n_exts != 0))?; } - if x.ext_consolidation != ext::ConsolidationType::default() { + if ext_consolidation != &ext::ConsolidationType::default() { n_exts -= 1; - self.write(&mut *writer, (x.ext_consolidation, n_exts != 0))?; + self.write(&mut *writer, (*ext_consolidation, n_exts != 0))?; } - if let Some(body) = x.ext_body.as_ref() { + if let Some(body) = ext_body.as_ref() { n_exts -= 1; self.write(&mut *writer, (body, n_exts != 0))?; } - for u in x.ext_unknown.iter() { + if let Some(att) = ext_attachment.as_ref() { + n_exts -= 1; + self.write(&mut *writer, (att, n_exts != 0))?; + } + for u in ext_unknown.iter() { n_exts -= 1; self.write(&mut *writer, (u, n_exts != 0))?; } @@ -149,6 +163,7 @@ where let mut ext_sinfo: Option = None; let mut ext_consolidation = ext::ConsolidationType::default(); let mut ext_body: Option = None; + let mut ext_attachment: Option = None; let mut ext_unknown = Vec::new(); let mut has_ext = imsg::has_flag(self.header, flag::Z); @@ -171,6 +186,11 @@ where ext_body = Some(s); has_ext = ext; } + ext::Attachment::ID => { + let (a, ext): (ext::AttachmentType, bool) = eodec.read(&mut *reader)?; + ext_attachment = Some(a); + has_ext = ext; + } _ => { let (u, ext) = extension::read(reader, "Query", ext)?; ext_unknown.push(u); @@ -184,6 +204,7 @@ where ext_sinfo, ext_consolidation, ext_body, + ext_attachment, ext_unknown, }) } diff --git a/commons/zenoh-codec/src/zenoh/reply.rs b/commons/zenoh-codec/src/zenoh/reply.rs index 1aef954220..d98c72b341 100644 --- a/commons/zenoh-codec/src/zenoh/reply.rs +++ b/commons/zenoh-codec/src/zenoh/reply.rs @@ -38,20 +38,33 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &Reply) -> Self::Output { + let Reply { + timestamp, + encoding, + ext_sinfo, + ext_consolidation, + #[cfg(feature = "shared-memory")] + ext_shm, + ext_attachment, + ext_unknown, + payload, + } = x; + // Header let mut header = id::REPLY; - if x.timestamp.is_some() { + if timestamp.is_some() { header |= flag::T; } - if x.encoding != Encoding::default() { + if encoding != &Encoding::default() { header |= flag::E; } - let mut n_exts = (x.ext_sinfo.is_some()) as u8 - + ((x.ext_consolidation != ext::ConsolidationType::default()) as u8) - + (x.ext_unknown.len() as u8); + let mut n_exts = (ext_sinfo.is_some()) as u8 + + ((ext_consolidation != &ext::ConsolidationType::default()) as u8) + + (ext_attachment.is_some()) as u8 + + (ext_unknown.len() as u8); #[cfg(feature = "shared-memory")] { - n_exts += x.ext_shm.is_some() as u8; + n_exts += ext_shm.is_some() as u8; } if n_exts != 0 { header |= flag::Z; @@ -59,28 +72,32 @@ where self.write(&mut *writer, header)?; // Body - if let Some(ts) = x.timestamp.as_ref() { + if let Some(ts) = timestamp.as_ref() { self.write(&mut *writer, ts)?; } - if x.encoding != Encoding::default() { - self.write(&mut *writer, &x.encoding)?; + if encoding != &Encoding::default() { + self.write(&mut *writer, encoding)?; } // Extensions - if let Some(sinfo) = x.ext_sinfo.as_ref() { + if let Some(sinfo) = ext_sinfo.as_ref() { n_exts -= 1; self.write(&mut *writer, (sinfo, n_exts != 0))?; } - if x.ext_consolidation != ext::ConsolidationType::default() { + if ext_consolidation != &ext::ConsolidationType::default() { n_exts -= 1; - self.write(&mut *writer, (x.ext_consolidation, n_exts != 0))?; + self.write(&mut *writer, (*ext_consolidation, n_exts != 0))?; } #[cfg(feature = "shared-memory")] - if let Some(eshm) = x.ext_shm.as_ref() { + if let Some(eshm) = ext_shm.as_ref() { n_exts -= 1; self.write(&mut *writer, (eshm, n_exts != 0))?; } - for u in x.ext_unknown.iter() { + if let Some(att) = ext_attachment.as_ref() { + n_exts -= 1; + self.write(&mut *writer, (att, n_exts != 0))?; + } + for u in ext_unknown.iter() { n_exts -= 1; self.write(&mut *writer, (u, n_exts != 0))?; } @@ -88,14 +105,14 @@ where // Payload #[cfg(feature = "shared-memory")] { - let codec = Zenoh080Sliced::::new(x.ext_shm.is_some()); - codec.write(&mut *writer, &x.payload)?; + let codec = Zenoh080Sliced::::new(ext_shm.is_some()); + codec.write(&mut *writer, payload)?; } #[cfg(not(feature = "shared-memory"))] { let bodec = Zenoh080Bounded::::new(); - bodec.write(&mut *writer, &x.payload)?; + bodec.write(&mut *writer, payload)?; } Ok(()) @@ -142,6 +159,7 @@ where let mut ext_consolidation = ext::ConsolidationType::default(); #[cfg(feature = "shared-memory")] let mut ext_shm: Option = None; + let mut ext_attachment: Option = None; let mut ext_unknown = Vec::new(); let mut has_ext = imsg::has_flag(self.header, flag::Z); @@ -165,6 +183,11 @@ where ext_shm = Some(s); has_ext = ext; } + ext::Attachment::ID => { + let (a, ext): (ext::AttachmentType, bool) = eodec.read(&mut *reader)?; + ext_attachment = Some(a); + has_ext = ext; + } _ => { let (u, ext) = extension::read(reader, "Reply", ext)?; ext_unknown.push(u); @@ -195,6 +218,7 @@ where ext_consolidation, #[cfg(feature = "shared-memory")] ext_shm, + ext_attachment, ext_unknown, payload, }) diff --git a/commons/zenoh-collections/src/single_or_vec.rs b/commons/zenoh-collections/src/single_or_vec.rs index ea190395fb..0490a66a71 100644 --- a/commons/zenoh-collections/src/single_or_vec.rs +++ b/commons/zenoh-collections/src/single_or_vec.rs @@ -11,11 +11,12 @@ // Contributors: // ZettaScale Zenoh Team, // + use alloc::{vec, vec::Vec}; use core::{ cmp::PartialEq, fmt, iter, - ops::{Index, IndexMut}, + ops::{Index, IndexMut, RangeBounds}, ptr, slice, }; @@ -112,6 +113,19 @@ impl SingleOrVec { matches!(&self.0, SingleOrVecInner::Vec(v) if v.is_empty()) } + fn vectorize(&mut self) -> &mut Vec { + if let SingleOrVecInner::Single(v) = &self.0 { + unsafe { + let v = core::ptr::read(v); + core::ptr::write(&mut self.0, SingleOrVecInner::Vec(vec![v])) + }; + } + let SingleOrVecInner::Vec(v) = &mut self.0 else { + unsafe { core::hint::unreachable_unchecked() } + }; + v + } + pub fn get(&self, index: usize) -> Option<&T> { match &self.0 { SingleOrVecInner::Single(v) => (index == 0).then_some(v), @@ -139,6 +153,55 @@ impl SingleOrVec { SingleOrVecInner::Vec(v) => v.last_mut(), } } + pub fn drain>(&mut self, range: Range) -> Drain { + match &mut self.0 { + this @ SingleOrVecInner::Single(_) if range.contains(&0) => Drain { + inner: DrainInner::Single(this), + }, + SingleOrVecInner::Vec(vec) => Drain { + inner: DrainInner::Vec(vec.drain(range)), + }, + _ => Drain { + inner: DrainInner::Done, + }, + } + } + pub fn insert(&mut self, at: usize, value: T) { + assert!(at <= self.len()); + self.vectorize().insert(at, value); + } +} +enum DrainInner<'a, T> { + Vec(alloc::vec::Drain<'a, T>), + Single(&'a mut SingleOrVecInner), + Done, +} +pub struct Drain<'a, T> { + inner: DrainInner<'a, T>, +} +impl<'a, T> Iterator for Drain<'a, T> { + type Item = T; + + fn next(&mut self) -> Option { + match &mut self.inner { + DrainInner::Vec(drain) => drain.next(), + DrainInner::Single(inner) => match unsafe { core::ptr::read(*inner) } { + SingleOrVecInner::Single(value) => unsafe { + core::ptr::write(*inner, SingleOrVecInner::Vec(Vec::new())); + Some(value) + }, + SingleOrVecInner::Vec(_) => None, + }, + _ => None, + } + } +} +impl<'a, T> Drop for Drain<'a, T> { + fn drop(&mut self) { + if let DrainInner::Single(_) = self.inner { + self.next(); + } + } } impl Default for SingleOrVec { diff --git a/commons/zenoh-crypto/src/cipher.rs b/commons/zenoh-crypto/src/cipher.rs index 0345805423..3d12712e56 100644 --- a/commons/zenoh-crypto/src/cipher.rs +++ b/commons/zenoh-crypto/src/cipher.rs @@ -50,7 +50,7 @@ impl BlockCipher { pub fn decrypt(&self, mut bytes: Vec) -> ZResult> { if bytes.len() % Self::BLOCK_SIZE != 0 { - bail!("Invalid bytes lenght to decode: {}", bytes.len()); + bail!("Invalid bytes length to decode: {}", bytes.len()); } let mut start: usize = 0; diff --git a/commons/zenoh-protocol/src/lib.rs b/commons/zenoh-protocol/src/lib.rs index a18aeb766f..2e1a2fa7cf 100644 --- a/commons/zenoh-protocol/src/lib.rs +++ b/commons/zenoh-protocol/src/lib.rs @@ -48,7 +48,7 @@ pub const VERSION: u8 = 0x08; // # Variable length field // // The field size depends on the element definition and/or actual encoding. An example of variable -// lenght element is an array of bytes (e.g., a payload or a string). +// length element is an array of bytes (e.g., a payload or a string). // // ```text // 7 6 5 4 3 2 1 0 @@ -60,7 +60,7 @@ pub const VERSION: u8 = 0x08; // // # u64 field // -// A u64 is a specialized variable lenght field that is used to encode an unsigned integer. +// A u64 is a specialized variable length field that is used to encode an unsigned integer. // // ```text // 7 6 5 4 3 2 1 0 diff --git a/commons/zenoh-protocol/src/scouting/scout.rs b/commons/zenoh-protocol/src/scouting/scout.rs index 8cdb47d3cf..b7a51642df 100644 --- a/commons/zenoh-protocol/src/scouting/scout.rs +++ b/commons/zenoh-protocol/src/scouting/scout.rs @@ -56,7 +56,7 @@ use crate::core::{whatami::WhatAmIMatcher, ZenohId}; /// +---------------+ /// /// (#) ZID length. If Flag(I)==1 it indicates how many bytes are used for the ZenohID bytes. -/// A ZenohID is minimum 1 byte and maximum 16 bytes. Therefore, the actual lenght is computed as: +/// A ZenohID is minimum 1 byte and maximum 16 bytes. Therefore, the actual length is computed as: /// real_zid_len := 1 + zid_len /// /// (*) What. It indicates a bitmap of WhatAmI interests. diff --git a/commons/zenoh-protocol/src/transport/init.rs b/commons/zenoh-protocol/src/transport/init.rs index 0c60dd8a90..1327288471 100644 --- a/commons/zenoh-protocol/src/transport/init.rs +++ b/commons/zenoh-protocol/src/transport/init.rs @@ -76,7 +76,7 @@ use zenoh_buffers::ZSlice; /// - 0b11: Reserved /// /// (#) ZID length. It indicates how many bytes are used for the ZenohID bytes. -/// A ZenohID is minimum 1 byte and maximum 16 bytes. Therefore, the actual lenght is computed as: +/// A ZenohID is minimum 1 byte and maximum 16 bytes. Therefore, the actual length is computed as: /// real_zid_len := 1 + zid_len /// /// (+) Sequence Number/ID resolution. It indicates the resolution and consequently the wire overhead diff --git a/commons/zenoh-protocol/src/transport/join.rs b/commons/zenoh-protocol/src/transport/join.rs index 00920c17ee..c5fbb98430 100644 --- a/commons/zenoh-protocol/src/transport/join.rs +++ b/commons/zenoh-protocol/src/transport/join.rs @@ -74,7 +74,7 @@ use core::time::Duration; /// - 0b11: Reserved /// /// (#) ZID length. It indicates how many bytes are used for the ZenohID bytes. -/// A ZenohID is minimum 1 byte and maximum 16 bytes. Therefore, the actual lenght is computed as: +/// A ZenohID is minimum 1 byte and maximum 16 bytes. Therefore, the actual length is computed as: /// real_zid_len := 1 + zid_len /// /// (+) Sequence Number/ID resolution. It indicates the resolution and consequently the wire overhead diff --git a/commons/zenoh-protocol/src/zenoh/del.rs b/commons/zenoh-protocol/src/zenoh/del.rs index 0de867ce51..84fec5bc08 100644 --- a/commons/zenoh-protocol/src/zenoh/del.rs +++ b/commons/zenoh-protocol/src/zenoh/del.rs @@ -42,6 +42,7 @@ pub mod flag { pub struct Del { pub timestamp: Option, pub ext_sinfo: Option, + pub ext_attachment: Option, pub ext_unknown: Vec, } @@ -52,6 +53,10 @@ pub mod ext { /// Used to carry additional information about the source of data pub type SourceInfo = zextzbuf!(0x1, false); pub type SourceInfoType = crate::zenoh::ext::SourceInfoType<{ SourceInfo::ID }>; + + /// # User attachment + pub type Attachment = zextzbuf!(0x2, false); + pub type AttachmentType = crate::zenoh::ext::AttachmentType<{ Attachment::ID }>; } impl Del { @@ -67,10 +72,11 @@ impl Del { Timestamp::new(time, id) }); let ext_sinfo = rng.gen_bool(0.5).then_some(ext::SourceInfoType::rand()); + let ext_attachment = rng.gen_bool(0.5).then_some(ext::AttachmentType::rand()); let mut ext_unknown = Vec::new(); for _ in 0..rng.gen_range(0..4) { ext_unknown.push(ZExtUnknown::rand2( - iext::mid(ext::SourceInfo::ID) + 1, + iext::mid(ext::Attachment::ID) + 1, false, )); } @@ -78,6 +84,7 @@ impl Del { Self { timestamp, ext_sinfo, + ext_attachment, ext_unknown, } } diff --git a/commons/zenoh-protocol/src/zenoh/mod.rs b/commons/zenoh-protocol/src/zenoh/mod.rs index 740c7e8b0d..e67576e673 100644 --- a/commons/zenoh-protocol/src/zenoh/mod.rs +++ b/commons/zenoh-protocol/src/zenoh/mod.rs @@ -256,4 +256,32 @@ pub mod ext { } } } + + /// ```text + /// 7 6 5 4 3 2 1 0 + /// +-+-+-+-+-+-+-+-+ + /// % num elems % + /// +-------+-+-+---+ + /// ~ key: ~ + /// +---------------+ + /// ~ val: ~ + /// +---------------+ + /// ... -- N times (key, value) tuples + /// ``` + #[derive(Debug, Clone, PartialEq, Eq)] + pub struct AttachmentType { + pub buffer: ZBuf, + } + + impl AttachmentType<{ ID }> { + #[cfg(feature = "test")] + pub fn rand() -> Self { + use rand::Rng; + let mut rng = rand::thread_rng(); + + Self { + buffer: ZBuf::rand(rng.gen_range(3..=1_024)), + } + } + } } diff --git a/commons/zenoh-protocol/src/zenoh/put.rs b/commons/zenoh-protocol/src/zenoh/put.rs index 30b8ef837a..14674e9ad9 100644 --- a/commons/zenoh-protocol/src/zenoh/put.rs +++ b/commons/zenoh-protocol/src/zenoh/put.rs @@ -48,6 +48,7 @@ pub struct Put { pub timestamp: Option, pub encoding: Encoding, pub ext_sinfo: Option, + pub ext_attachment: Option, #[cfg(feature = "shared-memory")] pub ext_shm: Option, pub ext_unknown: Vec, @@ -70,6 +71,10 @@ pub mod ext { pub type Shm = zextunit!(0x2, true); #[cfg(feature = "shared-memory")] pub type ShmType = crate::zenoh::ext::ShmType<{ Shm::ID }>; + + /// # User attachment + pub type Attachment = zextzbuf!(0x3, false); + pub type AttachmentType = crate::zenoh::ext::AttachmentType<{ Attachment::ID }>; } impl Put { @@ -88,10 +93,11 @@ impl Put { let ext_sinfo = rng.gen_bool(0.5).then_some(ext::SourceInfoType::rand()); #[cfg(feature = "shared-memory")] let ext_shm = rng.gen_bool(0.5).then_some(ext::ShmType::rand()); + let ext_attachment = rng.gen_bool(0.5).then_some(ext::AttachmentType::rand()); let mut ext_unknown = Vec::new(); for _ in 0..rng.gen_range(0..4) { ext_unknown.push(ZExtUnknown::rand2( - iext::mid(ext::SourceInfo::ID) + 1, + iext::mid(ext::Attachment::ID) + 1, false, )); } @@ -103,6 +109,7 @@ impl Put { ext_sinfo, #[cfg(feature = "shared-memory")] ext_shm, + ext_attachment, ext_unknown, payload, } diff --git a/commons/zenoh-protocol/src/zenoh/query.rs b/commons/zenoh-protocol/src/zenoh/query.rs index 17a2aa1d59..7432840492 100644 --- a/commons/zenoh-protocol/src/zenoh/query.rs +++ b/commons/zenoh-protocol/src/zenoh/query.rs @@ -94,6 +94,7 @@ pub struct Query { pub ext_sinfo: Option, pub ext_consolidation: Consolidation, pub ext_body: Option, + pub ext_attachment: Option, pub ext_unknown: Vec, } @@ -117,6 +118,10 @@ pub mod ext { /// Shared Memory extension is automatically defined by ValueType extension if /// #[cfg(feature = "shared-memory")] is defined. pub type QueryBodyType = crate::zenoh::ext::ValueType<{ ZExtZBuf::<0x03>::id(false) }, 0x04>; + + /// # User attachment + pub type Attachment = zextzbuf!(0x5, false); + pub type AttachmentType = crate::zenoh::ext::AttachmentType<{ Attachment::ID }>; } impl Query { @@ -141,10 +146,11 @@ impl Query { let ext_sinfo = rng.gen_bool(0.5).then_some(ext::SourceInfoType::rand()); let ext_consolidation = Consolidation::rand(); let ext_body = rng.gen_bool(0.5).then_some(ext::QueryBodyType::rand()); + let ext_attachment = rng.gen_bool(0.5).then_some(ext::AttachmentType::rand()); let mut ext_unknown = Vec::new(); for _ in 0..rng.gen_range(0..4) { ext_unknown.push(ZExtUnknown::rand2( - iext::mid(ext::QueryBodyType::SID) + 1, + iext::mid(ext::Attachment::ID) + 1, false, )); } @@ -154,6 +160,7 @@ impl Query { ext_sinfo, ext_consolidation, ext_body, + ext_attachment, ext_unknown, } } diff --git a/commons/zenoh-protocol/src/zenoh/reply.rs b/commons/zenoh-protocol/src/zenoh/reply.rs index d6b65f88c0..2395e1e9b2 100644 --- a/commons/zenoh-protocol/src/zenoh/reply.rs +++ b/commons/zenoh-protocol/src/zenoh/reply.rs @@ -51,6 +51,7 @@ pub struct Reply { pub ext_consolidation: ext::ConsolidationType, #[cfg(feature = "shared-memory")] pub ext_shm: Option, + pub ext_attachment: Option, pub ext_unknown: Vec, pub payload: ZBuf, } @@ -78,6 +79,10 @@ pub mod ext { pub type Shm = zextunit!(0x3, true); #[cfg(feature = "shared-memory")] pub type ShmType = crate::zenoh::ext::ShmType<{ Shm::ID }>; + + /// # User attachment + pub type Attachment = zextzbuf!(0x4, false); + pub type AttachmentType = crate::zenoh::ext::AttachmentType<{ Attachment::ID }>; } impl Reply { @@ -97,10 +102,11 @@ impl Reply { let ext_consolidation = Consolidation::rand(); #[cfg(feature = "shared-memory")] let ext_shm = rng.gen_bool(0.5).then_some(ext::ShmType::rand()); + let ext_attachment = rng.gen_bool(0.5).then_some(ext::AttachmentType::rand()); let mut ext_unknown = Vec::new(); for _ in 0..rng.gen_range(0..4) { ext_unknown.push(ZExtUnknown::rand2( - iext::mid(ext::Consolidation::ID) + 1, + iext::mid(ext::Attachment::ID) + 1, false, )); } @@ -113,6 +119,7 @@ impl Reply { ext_consolidation, #[cfg(feature = "shared-memory")] ext_shm, + ext_attachment, ext_unknown, payload, } diff --git a/commons/zenoh-shm/src/lib.rs b/commons/zenoh-shm/src/lib.rs index 61a7ea9be3..33409ce20a 100644 --- a/commons/zenoh-shm/src/lib.rs +++ b/commons/zenoh-shm/src/lib.rs @@ -65,7 +65,6 @@ impl PartialEq for Chunk { /// Informations about a [`SharedMemoryBuf`]. /// /// This that can be serialized and can be used to retrieve the [`SharedMemoryBuf`] in a remote process. -#[non_exhaustive] #[derive(Clone, Debug, PartialEq, Eq)] pub struct SharedMemoryBufInfo { /// The index of the beginning of the buffer in the shm segment. diff --git a/examples/examples/z_pub.rs b/examples/examples/z_pub.rs index aebca309ad..097b686de9 100644 --- a/examples/examples/z_pub.rs +++ b/examples/examples/z_pub.rs @@ -23,7 +23,7 @@ async fn main() { // Initiate logging env_logger::init(); - let (config, key_expr, value) = parse_args(); + let (config, key_expr, value, attachment) = parse_args(); println!("Opening session..."); let session = zenoh::open(config).res().await.unwrap(); @@ -35,7 +35,16 @@ async fn main() { sleep(Duration::from_secs(1)).await; let buf = format!("[{idx:4}] {value}"); println!("Putting Data ('{}': '{}')...", &key_expr, buf); - publisher.put(buf).res().await.unwrap(); + let mut put = publisher.put(buf); + if let Some(attachment) = &attachment { + put = put.with_attachment( + attachment + .split('&') + .map(|pair| pair.as_bytes().split_at(pair.find('=').unwrap_or(0))) + .collect(), + ) + } + put.res().await.unwrap(); } } @@ -47,11 +56,16 @@ struct Args { #[arg(short, long, default_value = "Pub from Rust!")] /// The value to write. value: String, + #[arg(short, long)] + /// The attachments to add to each put. + /// + /// The key-value pairs are &-separated, and = serves as the separator between key and value. + attach: Option, #[command(flatten)] common: CommonArgs, } -fn parse_args() -> (Config, KeyExpr<'static>, String) { +fn parse_args() -> (Config, KeyExpr<'static>, String, Option) { let args = Args::parse(); - (args.common.into(), args.key, args.value) + (args.common.into(), args.key, args.value, args.attach) } diff --git a/examples/examples/z_pub_thr.rs b/examples/examples/z_pub_thr.rs index 433444b8de..3e130e0608 100644 --- a/examples/examples/z_pub_thr.rs +++ b/examples/examples/z_pub_thr.rs @@ -11,9 +11,9 @@ // Contributors: // ZettaScale Zenoh Team, // + use clap::Parser; use std::convert::TryInto; -use zenoh::config::Config; use zenoh::prelude::sync::*; use zenoh::publication::CongestionControl; use zenoh_examples::CommonArgs; @@ -21,14 +21,21 @@ use zenoh_examples::CommonArgs; fn main() { // initiate logging env_logger::init(); - let (config, size, prio, print, number) = parse_args(); + let args = Args::parse(); + + let mut prio = Priority::default(); + if let Some(p) = args.priority { + prio = p.try_into().unwrap(); + } + + let payload_size = args.payload_size; - let data: Value = (0usize..size) + let data: Value = (0..payload_size) .map(|i| (i % 10) as u8) .collect::>() .into(); - let session = zenoh::open(config).res().unwrap(); + let session = zenoh::open(args.common).res().unwrap(); let publisher = session .declare_publisher("test/thr") @@ -42,8 +49,8 @@ fn main() { loop { publisher.put(data.clone()).res().unwrap(); - if print { - if count < number { + if args.print { + if count < args.number { count += 1; } else { let thpt = count as f64 / start.elapsed().as_secs_f64(); @@ -57,34 +64,17 @@ fn main() { #[derive(Parser, Clone, PartialEq, Eq, Hash, Debug)] struct Args { - #[arg(short, long)] /// Priority for sending data + #[arg(short, long)] priority: Option, - #[arg(short = 't', long)] /// Print the statistics + #[arg(short = 't', long)] print: bool, - #[arg(short, long, default_value = "100000")] /// Number of messages in each throughput measurements + #[arg(short, long, default_value = "100000")] number: usize, /// Sets the size of the payload to publish payload_size: usize, #[command(flatten)] common: CommonArgs, } - -fn parse_args() -> (Config, usize, Priority, bool, usize) { - let args = Args::parse(); - - let mut prio = Priority::default(); - if let Some(p) = args.priority { - prio = p.try_into().unwrap(); - } - - ( - args.common.into(), - args.payload_size, - prio, - args.print, - args.number, - ) -} diff --git a/examples/src/lib.rs b/examples/src/lib.rs index a766bd0695..255ac01917 100644 --- a/examples/src/lib.rs +++ b/examples/src/lib.rs @@ -4,9 +4,8 @@ //! use zenoh::config::Config; -#[derive(clap::ValueEnum, Default, Clone, Copy, PartialEq, Eq, Hash, Debug)] +#[derive(clap::ValueEnum, Clone, Copy, PartialEq, Eq, Hash, Debug)] pub enum Wai { - #[default] Peer, Client, Router, diff --git a/io/zenoh-transport/src/common/batch.rs b/io/zenoh-transport/src/common/batch.rs index 488e357236..4139a65a05 100644 --- a/io/zenoh-transport/src/common/batch.rs +++ b/io/zenoh-transport/src/common/batch.rs @@ -583,6 +583,7 @@ mod tests { ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, ext_unknown: vec![], payload: ZBuf::from(vec![0u8; 8]), }), diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 256dfbef47..2e3af61d64 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -758,6 +758,7 @@ mod tests { ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, ext_unknown: vec![], payload, }), @@ -887,6 +888,7 @@ mod tests { ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, ext_unknown: vec![], payload, }), @@ -998,6 +1000,7 @@ mod tests { ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, ext_unknown: vec![], payload, }), diff --git a/io/zenoh-transport/tests/multicast_compression.rs b/io/zenoh-transport/tests/multicast_compression.rs index fafb28e642..f8e56a5484 100644 --- a/io/zenoh-transport/tests/multicast_compression.rs +++ b/io/zenoh-transport/tests/multicast_compression.rs @@ -277,6 +277,7 @@ mod tests { ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, ext_unknown: vec![], } .into(), diff --git a/io/zenoh-transport/tests/multicast_transport.rs b/io/zenoh-transport/tests/multicast_transport.rs index 0822d08f58..ebb290af1e 100644 --- a/io/zenoh-transport/tests/multicast_transport.rs +++ b/io/zenoh-transport/tests/multicast_transport.rs @@ -273,6 +273,7 @@ mod tests { ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, ext_unknown: vec![], } .into(), diff --git a/io/zenoh-transport/tests/unicast_compression.rs b/io/zenoh-transport/tests/unicast_compression.rs index be979fef23..323c6f529e 100644 --- a/io/zenoh-transport/tests/unicast_compression.rs +++ b/io/zenoh-transport/tests/unicast_compression.rs @@ -305,6 +305,7 @@ mod tests { ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, ext_unknown: vec![], } .into(), diff --git a/io/zenoh-transport/tests/unicast_concurrent.rs b/io/zenoh-transport/tests/unicast_concurrent.rs index 64516f6f26..d13f763b68 100644 --- a/io/zenoh-transport/tests/unicast_concurrent.rs +++ b/io/zenoh-transport/tests/unicast_concurrent.rs @@ -204,6 +204,7 @@ async fn transport_concurrent(endpoint01: Vec, endpoint02: Vec, endpoint02: Vec, client_transport: TransportUn ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, ext_unknown: vec![], } .into(), diff --git a/io/zenoh-transport/tests/unicast_shm.rs b/io/zenoh-transport/tests/unicast_shm.rs index e27acfe3c3..f9180849af 100644 --- a/io/zenoh-transport/tests/unicast_shm.rs +++ b/io/zenoh-transport/tests/unicast_shm.rs @@ -280,6 +280,7 @@ mod tests { encoding: Encoding::default(), ext_sinfo: None, ext_shm: None, + ext_attachment: None, ext_unknown: vec![], } .into(), @@ -327,6 +328,7 @@ mod tests { encoding: Encoding::default(), ext_sinfo: None, ext_shm: None, + ext_attachment: None, ext_unknown: vec![], } .into(), diff --git a/io/zenoh-transport/tests/unicast_simultaneous.rs b/io/zenoh-transport/tests/unicast_simultaneous.rs index dad4b6f775..19380eb49e 100644 --- a/io/zenoh-transport/tests/unicast_simultaneous.rs +++ b/io/zenoh-transport/tests/unicast_simultaneous.rs @@ -86,6 +86,7 @@ mod tests { ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, ext_unknown: vec![], } .into(), diff --git a/io/zenoh-transport/tests/unicast_transport.rs b/io/zenoh-transport/tests/unicast_transport.rs index 9b25bb26c8..11839aef2a 100644 --- a/io/zenoh-transport/tests/unicast_transport.rs +++ b/io/zenoh-transport/tests/unicast_transport.rs @@ -476,6 +476,7 @@ async fn test_transport( ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, ext_unknown: vec![], } .into(), diff --git a/zenoh/src/admin.rs b/zenoh/src/admin.rs index a8aad9c809..8cdf638af5 100644 --- a/zenoh/src/admin.rs +++ b/zenoh/src/admin.rs @@ -153,6 +153,8 @@ impl TransportMulticastEventHandler for Handler { &expr, Some(info), serde_json::to_vec(&peer).unwrap().into(), + #[cfg(feature = "unstable")] + None, ); Ok(Arc::new(PeerHandler { expr, @@ -200,6 +202,8 @@ impl TransportPeerEventHandler for PeerHandler { .with_suffix(&format!("/link/{}", s.finish())), Some(info), serde_json::to_vec(&link).unwrap().into(), + #[cfg(feature = "unstable")] + None, ); } @@ -218,6 +222,8 @@ impl TransportPeerEventHandler for PeerHandler { .with_suffix(&format!("/link/{}", s.finish())), Some(info), vec![0u8; 0].into(), + #[cfg(feature = "unstable")] + None, ); } @@ -228,8 +234,14 @@ impl TransportPeerEventHandler for PeerHandler { kind: SampleKind::Delete, ..Default::default() }; - self.session - .handle_data(true, &self.expr, Some(info), vec![0u8; 0].into()); + self.session.handle_data( + true, + &self.expr, + Some(info), + vec![0u8; 0].into(), + #[cfg(feature = "unstable")] + None, + ); } fn as_any(&self) -> &dyn std::any::Any { diff --git a/zenoh/src/liveliness.rs b/zenoh/src/liveliness.rs index 1e36cb8f69..a29d4b5d4a 100644 --- a/zenoh/src/liveliness.rs +++ b/zenoh/src/liveliness.rs @@ -749,6 +749,8 @@ where Locality::default(), self.timeout, None, + #[cfg(feature = "unstable")] + None, callback, ) .map(|_| receiver) diff --git a/zenoh/src/net/routing/queries.rs b/zenoh/src/net/routing/queries.rs index 06b81a998b..c2496b5ff8 100644 --- a/zenoh/src/net/routing/queries.rs +++ b/zenoh/src/net/routing/queries.rs @@ -2121,6 +2121,7 @@ pub fn route_query( ext_consolidation: ConsolidationType::default(), #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, // @TODO: expose it in the API ext_unknown: vec![], payload, }); diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index 08b00c5047..96ea85f6b4 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -378,6 +378,8 @@ impl Primitives for AdminSpace { qid: msg.id, zid, primitives, + #[cfg(feature = "unstable")] + attachment: query.ext_attachment.map(Into::into), }), }; diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index 933a2e46a4..518ec7e551 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -604,6 +604,7 @@ fn client_test() { ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, ext_unknown: vec![], payload: ZBuf::empty(), }), @@ -636,6 +637,7 @@ fn client_test() { ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, ext_unknown: vec![], payload: ZBuf::empty(), }), @@ -668,6 +670,7 @@ fn client_test() { ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, ext_unknown: vec![], payload: ZBuf::empty(), }), @@ -700,6 +703,7 @@ fn client_test() { ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, ext_unknown: vec![], payload: ZBuf::empty(), }), @@ -732,6 +736,7 @@ fn client_test() { ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment: None, ext_unknown: vec![], payload: ZBuf::empty(), }), diff --git a/zenoh/src/publication.rs b/zenoh/src/publication.rs index be439b6f2d..8a84e49566 100644 --- a/zenoh/src/publication.rs +++ b/zenoh/src/publication.rs @@ -19,6 +19,8 @@ use crate::handlers::Callback; use crate::handlers::DefaultHandler; use crate::net::transport::primitives::Primitives; use crate::prelude::*; +#[zenoh_macros::unstable] +use crate::sample::Attachment; use crate::sample::DataInfo; use crate::Encoding; use crate::SessionRef; @@ -78,6 +80,8 @@ pub struct PutBuilder<'a, 'b> { pub(crate) publisher: PublisherBuilder<'a, 'b>, pub(crate) value: Value, pub(crate) kind: SampleKind, + #[cfg(feature = "unstable")] + pub(crate) attachment: Option, } impl PutBuilder<'_, '_> { @@ -117,6 +121,12 @@ impl PutBuilder<'_, '_> { self.kind = kind; self } + + #[zenoh_macros::unstable] + pub fn with_attachment(mut self, attachment: Attachment) -> Self { + self.attachment = Some(attachment); + self + } } impl Resolvable for PutBuilder<'_, '_> { @@ -130,6 +140,8 @@ impl SyncResolve for PutBuilder<'_, '_> { publisher, value, kind, + #[cfg(feature = "unstable")] + attachment, } = self; let key_expr = publisher.key_expr?; log::trace!("write({:?}, [...])", &key_expr); @@ -151,20 +163,42 @@ impl SyncResolve for PutBuilder<'_, '_> { ext_tstamp: None, ext_nodeid: ext::NodeIdType::default(), payload: match kind { - SampleKind::Put => PushBody::Put(Put { - timestamp, - encoding: value.encoding.clone(), - ext_sinfo: None, - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_unknown: vec![], - payload: value.payload.clone(), - }), - SampleKind::Delete => PushBody::Del(Del { - timestamp, - ext_sinfo: None, - ext_unknown: vec![], - }), + SampleKind::Put => { + #[allow(unused_mut)] + let mut ext_attachment = None; + #[cfg(feature = "unstable")] + { + if let Some(attachment) = attachment.clone() { + ext_attachment = Some(attachment.into()); + } + } + PushBody::Put(Put { + timestamp, + encoding: value.encoding.clone(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_attachment, + ext_unknown: vec![], + payload: value.payload.clone(), + }) + } + SampleKind::Delete => { + #[allow(unused_mut)] + let mut ext_attachment = None; + #[cfg(feature = "unstable")] + { + if let Some(attachment) = attachment.clone() { + ext_attachment = Some(attachment.into()); + } + } + PushBody::Del(Del { + timestamp, + ext_sinfo: None, + ext_attachment, + ext_unknown: vec![], + }) + } }, }); } @@ -181,6 +215,8 @@ impl SyncResolve for PutBuilder<'_, '_> { &key_expr.to_wire(&publisher.session), Some(data_info), value.payload, + #[cfg(feature = "unstable")] + attachment, ); } Ok(()) @@ -337,6 +373,8 @@ impl<'a> Publisher<'a> { publisher: self, value, kind, + #[cfg(feature = "unstable")] + attachment: None, } } @@ -621,6 +659,16 @@ pub struct Publication<'a> { publisher: &'a Publisher<'a>, value: Value, kind: SampleKind, + #[cfg(feature = "unstable")] + pub(crate) attachment: Option, +} + +impl<'a> Publication<'a> { + #[zenoh_macros::unstable] + pub fn with_attachment(mut self, attachment: Attachment) -> Self { + self.attachment = Some(attachment); + self + } } impl Resolvable for Publication<'_> { @@ -633,6 +681,8 @@ impl SyncResolve for Publication<'_> { publisher, value, kind, + #[cfg(feature = "unstable")] + attachment, } = self; log::trace!("write({:?}, [...])", publisher.key_expr); let primitives = zread!(publisher.session.state) @@ -643,6 +693,14 @@ impl SyncResolve for Publication<'_> { let timestamp = publisher.session.runtime.new_timestamp(); if publisher.destination != Locality::SessionLocal { + #[allow(unused_mut)] + let mut ext_attachment = None; + #[cfg(feature = "unstable")] + { + if let Some(attachment) = attachment.clone() { + ext_attachment = Some(attachment.into()); + } + } primitives.send_push(Push { wire_expr: publisher.key_expr.to_wire(&publisher.session).to_owned(), ext_qos: ext::QoSType::new( @@ -658,6 +716,7 @@ impl SyncResolve for Publication<'_> { ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment, ext_unknown: vec![], payload: value.payload.clone(), }), @@ -675,6 +734,8 @@ impl SyncResolve for Publication<'_> { &publisher.key_expr.to_wire(&publisher.session), Some(data_info), value.payload, + #[cfg(feature = "unstable")] + attachment, ); } Ok(()) diff --git a/zenoh/src/query.rs b/zenoh/src/query.rs index 18cb7e882e..c4f3fb35e9 100644 --- a/zenoh/src/query.rs +++ b/zenoh/src/query.rs @@ -16,6 +16,8 @@ use crate::handlers::{locked, Callback, DefaultHandler}; use crate::prelude::*; +#[zenoh_macros::unstable] +use crate::sample::Attachment; use crate::Session; use std::collections::HashMap; use std::future::Ready; @@ -126,6 +128,8 @@ pub struct GetBuilder<'a, 'b, Handler> { pub(crate) timeout: Duration, pub(crate) handler: Handler, pub(crate) value: Option, + #[cfg(feature = "unstable")] + pub(crate) attachment: Option, } impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> { @@ -159,6 +163,8 @@ impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> { destination, timeout, value, + #[cfg(feature = "unstable")] + attachment, handler: _, } = self; GetBuilder { @@ -170,6 +176,8 @@ impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> { destination, timeout, value, + #[cfg(feature = "unstable")] + attachment, handler: callback, } } @@ -238,6 +246,8 @@ impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> { destination, timeout, value, + #[cfg(feature = "unstable")] + attachment, handler: _, } = self; GetBuilder { @@ -249,6 +259,8 @@ impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> { destination, timeout, value, + #[cfg(feature = "unstable")] + attachment, handler, } } @@ -294,6 +306,12 @@ impl<'a, 'b, Handler> GetBuilder<'a, 'b, Handler> { self } + #[zenoh_macros::unstable] + pub fn with_attachment(mut self, attachment: Attachment) -> Self { + self.attachment = Some(attachment); + self + } + /// By default, `get` guarantees that it will only receive replies whose key expressions intersect /// with the queried key expression. /// @@ -310,6 +328,7 @@ impl<'a, 'b, Handler> GetBuilder<'a, 'b, Handler> { destination, timeout, value, + attachment, handler, } = self; Self { @@ -321,6 +340,7 @@ impl<'a, 'b, Handler> GetBuilder<'a, 'b, Handler> { destination, timeout, value, + attachment, handler, } } @@ -369,6 +389,8 @@ where self.destination, self.timeout, self.value, + #[cfg(feature = "unstable")] + self.attachment, callback, ) .map(|_| receiver) diff --git a/zenoh/src/queryable.rs b/zenoh/src/queryable.rs index 4881de6ec1..914684f76f 100644 --- a/zenoh/src/queryable.rs +++ b/zenoh/src/queryable.rs @@ -18,6 +18,9 @@ use crate::handlers::{locked, DefaultHandler}; use crate::prelude::*; #[zenoh_macros::unstable] use crate::query::ReplyKeyExpr; +#[zenoh_macros::unstable] +use crate::sample::Attachment; +use crate::sample::DataInfo; use crate::SessionRef; use crate::Undeclarable; @@ -45,6 +48,8 @@ pub(crate) struct QueryInner { pub(crate) qid: RequestId, pub(crate) zid: ZenohId, pub(crate) primitives: Arc, + #[cfg(feature = "unstable")] + pub(crate) attachment: Option, } impl Drop for QueryInner { @@ -91,6 +96,11 @@ impl Query { self.inner.value.as_ref() } + #[zenoh_macros::unstable] + pub fn attachment(&self) -> Option<&Attachment> { + self.inner.attachment.as_ref() + } + /// Sends a reply to this Query. /// /// By default, queries only accept replies whose key expression intersects with the query's. @@ -150,6 +160,20 @@ pub struct ReplyBuilder<'a> { result: Result, } +impl<'a> ReplyBuilder<'a> { + #[allow(clippy::result_large_err)] + #[zenoh_macros::unstable] + pub fn with_attachment(mut self, attachment: Attachment) -> Result { + match &mut self.result { + Ok(sample) => { + sample.attachment = Some(attachment); + Ok(self) + } + Err(_) => Err((self, attachment)), + } + } +} + impl<'a> Resolvable for ReplyBuilder<'a> { type To = ZResult<()>; } @@ -163,7 +187,34 @@ impl SyncResolve for ReplyBuilder<'_> { { bail!("Attempted to reply on `{}`, which does not intersect with query `{}`, despite query only allowing replies on matching key expressions", sample.key_expr, self.query.key_expr()) } - let (key_expr, payload, data_info) = sample.split(); + let Sample { + key_expr, + value: Value { payload, encoding }, + kind, + timestamp, + #[cfg(feature = "unstable")] + source_info, + #[cfg(feature = "unstable")] + attachment, + } = sample; + #[allow(unused_mut)] + let mut data_info = DataInfo { + kind, + encoding: Some(encoding), + timestamp, + source_id: None, + source_sn: None, + }; + #[allow(unused_mut)] + let mut ext_attachment = None; + #[cfg(feature = "unstable")] + { + data_info.source_id = source_info.source_id; + data_info.source_sn = source_info.source_sn; + if let Some(attachment) = attachment { + ext_attachment = Some(attachment.into()); + } + } self.query.inner.primitives.send_response(Response { rid: self.query.inner.qid, wire_expr: WireExpr { @@ -187,6 +238,7 @@ impl SyncResolve for ReplyBuilder<'_> { ext_consolidation: ConsolidationType::default(), #[cfg(feature = "shared-memory")] ext_shm: None, + ext_attachment, ext_unknown: vec![], payload, }), diff --git a/zenoh/src/sample.rs b/zenoh/src/sample.rs index 1d3c168e40..083e6fced5 100644 --- a/zenoh/src/sample.rs +++ b/zenoh/src/sample.rs @@ -98,6 +98,221 @@ impl From> for SourceInfo { } } +mod attachment { + #[zenoh_macros::unstable] + use zenoh_buffers::{ + reader::{HasReader, Reader}, + writer::HasWriter, + ZBuf, ZBufReader, ZSlice, + }; + #[zenoh_macros::unstable] + use zenoh_codec::{RCodec, WCodec, Zenoh080}; + #[zenoh_macros::unstable] + use zenoh_protocol::zenoh::ext::AttachmentType; + + /// A builder for [`Attachment`] + #[zenoh_macros::unstable] + pub struct AttachmentBuilder { + pub(crate) inner: Vec, + } + #[zenoh_macros::unstable] + impl Default for AttachmentBuilder { + fn default() -> Self { + Self::new() + } + } + #[zenoh_macros::unstable] + impl AttachmentBuilder { + pub fn new() -> Self { + Self { inner: Vec::new() } + } + fn _insert(&mut self, key: &[u8], value: &[u8]) { + let codec = Zenoh080; + let mut writer = self.inner.writer(); + codec.write(&mut writer, key).unwrap(); // Infallible, barring alloc failure + codec.write(&mut writer, value).unwrap(); // Infallible, barring alloc failure + } + /// Inserts a key-value pair to the attachment. + /// + /// Note that [`Attachment`] is a list of non-unique key-value pairs: inserting at the same key multiple times leads to both values being transmitted for that key. + pub fn insert + ?Sized, Value: AsRef<[u8]> + ?Sized>( + &mut self, + key: &Key, + value: &Value, + ) { + self._insert(key.as_ref(), value.as_ref()) + } + pub fn build(self) -> Attachment { + Attachment { + inner: self.inner.into(), + } + } + } + #[zenoh_macros::unstable] + impl From for Attachment { + fn from(value: AttachmentBuilder) -> Self { + Attachment { + inner: value.inner.into(), + } + } + } + #[zenoh_macros::unstable] + #[derive(Clone)] + pub struct Attachment { + pub(crate) inner: ZBuf, + } + #[zenoh_macros::unstable] + impl Default for Attachment { + fn default() -> Self { + Self::new() + } + } + #[zenoh_macros::unstable] + impl From for AttachmentType { + fn from(this: Attachment) -> Self { + AttachmentType { buffer: this.inner } + } + } + #[zenoh_macros::unstable] + impl From> for Attachment { + fn from(this: AttachmentType) -> Self { + Attachment { inner: this.buffer } + } + } + #[zenoh_macros::unstable] + impl Attachment { + pub fn new() -> Self { + Self { + inner: ZBuf::empty(), + } + } + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + pub fn len(&self) -> usize { + self.iter().count() + } + pub fn iter(&self) -> AttachmentIterator { + self.into_iter() + } + fn _get(&self, key: &[u8]) -> Option { + self.iter() + .find_map(|(k, v)| (k.as_slice() == key).then_some(v)) + } + pub fn get>(&self, key: &Key) -> Option { + self._get(key.as_ref()) + } + fn _insert(&mut self, key: &[u8], value: &[u8]) { + let codec = Zenoh080; + let mut writer = self.inner.writer(); + codec.write(&mut writer, key).unwrap(); // Infallible, barring alloc failure + codec.write(&mut writer, value).unwrap(); // Infallible, barring alloc failure + } + /// Inserts a key-value pair to the attachment. + /// + /// Note that [`Attachment`] is a list of non-unique key-value pairs: inserting at the same key multiple times leads to both values being transmitted for that key. + /// + /// [`Attachment`] is not very efficient at inserting, so if you wish to perform multiple inserts, it's generally better to [`Attachment::extend`] after performing the inserts on an [`AttachmentBuilder`] + pub fn insert + ?Sized, Value: AsRef<[u8]> + ?Sized>( + &mut self, + key: &Key, + value: &Value, + ) { + self._insert(key.as_ref(), value.as_ref()) + } + fn _extend(&mut self, with: Self) -> &mut Self { + for slice in with.inner.zslices().cloned() { + self.inner.push_zslice(slice); + } + self + } + pub fn extend(&mut self, with: impl Into) -> &mut Self { + let with = with.into(); + self._extend(with) + } + } + #[zenoh_macros::unstable] + pub struct AttachmentIterator<'a> { + reader: ZBufReader<'a>, + } + #[zenoh_macros::unstable] + impl<'a> core::iter::IntoIterator for &'a Attachment { + type Item = (ZSlice, ZSlice); + type IntoIter = AttachmentIterator<'a>; + fn into_iter(self) -> Self::IntoIter { + AttachmentIterator { + reader: self.inner.reader(), + } + } + } + #[zenoh_macros::unstable] + impl core::fmt::Debug for Attachment { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{{")?; + for (key, value) in self { + let key = key.as_slice(); + let value = value.as_slice(); + match core::str::from_utf8(key) { + Ok(key) => write!(f, "\"{key}\": ")?, + Err(_) => { + write!(f, "0x")?; + for byte in key { + write!(f, "{byte:02X}")? + } + } + } + match core::str::from_utf8(value) { + Ok(value) => write!(f, "\"{value}\", ")?, + Err(_) => { + write!(f, "0x")?; + for byte in value { + write!(f, "{byte:02X}")? + } + write!(f, ", ")? + } + } + } + write!(f, "}}") + } + } + #[zenoh_macros::unstable] + impl<'a> core::iter::Iterator for AttachmentIterator<'a> { + type Item = (ZSlice, ZSlice); + fn next(&mut self) -> Option { + let key = Zenoh080.read(&mut self.reader).ok()?; + let value = Zenoh080.read(&mut self.reader).ok()?; + Some((key, value)) + } + fn size_hint(&self) -> (usize, Option) { + ( + (self.reader.remaining() != 0) as usize, + Some(self.reader.remaining() / 2), + ) + } + } + #[zenoh_macros::unstable] + impl<'a> core::iter::FromIterator<(&'a [u8], &'a [u8])> for AttachmentBuilder { + fn from_iter>(iter: T) -> Self { + let codec = Zenoh080; + let mut buffer: Vec = Vec::new(); + let mut writer = buffer.writer(); + for (key, value) in iter { + codec.write(&mut writer, key).unwrap(); // Infallible, barring allocation failures + codec.write(&mut writer, value).unwrap(); // Infallible, barring allocation failures + } + Self { inner: buffer } + } + } + #[zenoh_macros::unstable] + impl<'a> core::iter::FromIterator<(&'a [u8], &'a [u8])> for Attachment { + fn from_iter>(iter: T) -> Self { + AttachmentBuilder::from_iter(iter).into() + } + } +} +#[zenoh_macros::unstable] +pub use attachment::{Attachment, AttachmentBuilder, AttachmentIterator}; + /// A zenoh sample. #[non_exhaustive] #[derive(Clone, Debug)] @@ -120,6 +335,16 @@ pub struct Sample { /// /// Infos on the source of this Sample. pub source_info: SourceInfo, + + #[cfg(feature = "unstable")] + ///
+ /// 🔬 + /// This API has been marked as unstable: it works as advertised, but we may change it in a future release. + /// To use it, you must enable zenoh's unstable feature flag. + ///
+ /// + /// A map of key-value pairs, where each key and value are byte-slices. + pub attachment: Option, } impl Sample { @@ -137,6 +362,8 @@ impl Sample { timestamp: None, #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), + #[cfg(feature = "unstable")] + attachment: None, } } /// Creates a new Sample. @@ -157,6 +384,8 @@ impl Sample { timestamp: None, #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), + #[cfg(feature = "unstable")] + attachment: None, }) } @@ -179,6 +408,8 @@ impl Sample { timestamp: data_info.timestamp, #[cfg(feature = "unstable")] source_info: data_info.into(), + #[cfg(feature = "unstable")] + attachment: None, } } else { Sample { @@ -188,28 +419,12 @@ impl Sample { timestamp: None, #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), + #[cfg(feature = "unstable")] + attachment: None, } } } - #[inline] - pub(crate) fn split(self) -> (KeyExpr<'static>, ZBuf, DataInfo) { - let info = DataInfo { - kind: self.kind, - encoding: Some(self.value.encoding), - timestamp: self.timestamp, - #[cfg(feature = "unstable")] - source_id: self.source_info.source_id, - #[cfg(not(feature = "unstable"))] - source_id: None, - #[cfg(feature = "unstable")] - source_sn: self.source_info.source_sn, - #[cfg(not(feature = "unstable"))] - source_sn: None, - }; - (self.key_expr, self.value.payload, info) - } - /// Gets the timestamp of this Sample. #[inline] pub fn get_timestamp(&self) -> Option<&Timestamp> { @@ -244,6 +459,23 @@ impl Sample { self.timestamp.as_ref().unwrap() } } + + #[zenoh_macros::unstable] + pub fn attachment(&self) -> Option<&Attachment> { + self.attachment.as_ref() + } + + #[zenoh_macros::unstable] + pub fn attachment_mut(&mut self) -> &mut Option { + &mut self.attachment + } + + #[allow(clippy::result_large_err)] + #[zenoh_macros::unstable] + pub fn with_attachment(mut self, attachment: Attachment) -> Self { + self.attachment = Some(attachment); + self + } } impl std::ops::Deref for Sample { diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 23369e5790..6609d1361d 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -28,6 +28,8 @@ use crate::prelude::{KeyExpr, Parameters}; use crate::publication::*; use crate::query::*; use crate::queryable::*; +#[cfg(feature = "unstable")] +use crate::sample::Attachment; use crate::sample::DataInfo; use crate::selector::TIME_RANGE_KEY; use crate::subscriber::*; @@ -721,6 +723,8 @@ impl Session { publisher: self.declare_publisher(key_expr), value: value.into(), kind: SampleKind::Put, + #[cfg(feature = "unstable")] + attachment: None, } } @@ -752,6 +756,8 @@ impl Session { publisher: self.declare_publisher(key_expr), value: Value::empty(), kind: SampleKind::Delete, + #[cfg(feature = "unstable")] + attachment: None, } } /// Query data from the matching queryables in the system. @@ -794,6 +800,8 @@ impl Session { destination: Locality::default(), timeout: Duration::from_millis(unwrap_or_default!(conf.queries_default_timeout())), value: None, + #[cfg(feature = "unstable")] + attachment: None, handler: DefaultHandler, } } @@ -1606,6 +1614,7 @@ impl Session { key_expr: &WireExpr, info: Option, payload: ZBuf, + #[cfg(feature = "unstable")] attachment: Option, ) { let mut callbacks = SingleOrVec::default(); let state = zread!(self.state); @@ -1706,10 +1715,22 @@ impl Session { drop(state); let zenoh_collections::single_or_vec::IntoIter { drain, last } = callbacks.into_iter(); for (cb, key_expr) in drain { - cb(Sample::with_info(key_expr, payload.clone(), info.clone())); + #[allow(unused_mut)] + let mut sample = Sample::with_info(key_expr, payload.clone(), info.clone()); + #[cfg(feature = "unstable")] + { + sample.attachment = attachment.clone(); + } + cb(sample); } if let Some((cb, key_expr)) = last { - cb(Sample::with_info(key_expr, payload, info)); + #[allow(unused_mut)] + let mut sample = Sample::with_info(key_expr, payload, info); + #[cfg(feature = "unstable")] + { + sample.attachment = attachment; + } + cb(sample); } } @@ -1746,6 +1767,7 @@ impl Session { destination: Locality, timeout: Duration, value: Option, + #[cfg(feature = "unstable")] attachment: Option, callback: Callback<'static, Reply>, ) -> ZResult<()> { log::trace!("get({}, {:?}, {:?})", selector, target, consolidation); @@ -1813,6 +1835,14 @@ impl Session { drop(state); if destination != Locality::SessionLocal { + #[allow(unused_mut)] + let mut ext_attachment = None; + #[cfg(feature = "unstable")] + { + if let Some(attachment) = attachment.clone() { + ext_attachment = Some(attachment.into()); + } + } primitives.send_request(Request { id: qid, wire_expr: wexpr.clone(), @@ -1832,6 +1862,7 @@ impl Session { encoding: v.encoding.clone(), payload: v.payload.clone(), }), + ext_attachment, ext_unknown: vec![], }), }); @@ -1850,6 +1881,8 @@ impl Session { encoding: v.encoding.clone(), payload: v.payload.clone(), }), + #[cfg(feature = "unstable")] + attachment, ); } Ok(()) @@ -1865,6 +1898,7 @@ impl Session { _target: TargetType, _consolidation: ConsolidationType, body: Option, + #[cfg(feature = "unstable")] attachment: Option, ) { let (primitives, key_expr, callbacks) = { let state = zread!(self.state); @@ -1925,6 +1959,8 @@ impl Session { } else { primitives }, + #[cfg(feature = "unstable")] + attachment, }), }; for callback in callbacks.iter() { @@ -2123,7 +2159,14 @@ impl Primitives for Session { .starts_with(crate::liveliness::PREFIX_LIVELINESS) { drop(state); - self.handle_data(false, &m.wire_expr, None, ZBuf::default()); + self.handle_data( + false, + &m.wire_expr, + None, + ZBuf::default(), + #[cfg(feature = "unstable")] + None, + ); } } Err(err) => { @@ -2155,6 +2198,8 @@ impl Primitives for Session { &m.ext_wire_expr.wire_expr, Some(data_info), ZBuf::default(), + #[cfg(feature = "unstable")] + None, ); } } @@ -2189,7 +2234,14 @@ impl Primitives for Session { source_id: m.ext_sinfo.as_ref().map(|i| i.zid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; - self.handle_data(false, &msg.wire_expr, Some(info), m.payload) + self.handle_data( + false, + &msg.wire_expr, + Some(info), + m.payload, + #[cfg(feature = "unstable")] + m.ext_attachment.map(Into::into), + ) } PushBody::Del(m) => { let info = DataInfo { @@ -2199,7 +2251,14 @@ impl Primitives for Session { source_id: m.ext_sinfo.as_ref().map(|i| i.zid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; - self.handle_data(false, &msg.wire_expr, Some(info), ZBuf::empty()) + self.handle_data( + false, + &msg.wire_expr, + Some(info), + ZBuf::empty(), + #[cfg(feature = "unstable")] + m.ext_attachment.map(Into::into), + ) } } } @@ -2215,6 +2274,8 @@ impl Primitives for Session { msg.ext_target, m.ext_consolidation, m.ext_body, + #[cfg(feature = "unstable")] + m.ext_attachment.map(Into::into), ), RequestBody::Put(_) => (), RequestBody::Del(_) => (), @@ -2326,12 +2387,15 @@ impl Primitives for Session { source_id: m.ext_sinfo.as_ref().map(|i| i.zid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; + #[allow(unused_mut)] + let mut sample = + Sample::with_info(key_expr.into_owned(), m.payload, Some(info)); + #[cfg(feature = "unstable")] + { + sample.attachment = m.ext_attachment.map(Into::into); + } let new_reply = Reply { - sample: Ok(Sample::with_info( - key_expr.into_owned(), - m.payload, - Some(info), - )), + sample: Ok(sample), replier_id: ZenohId::rand(), // TODO }; let callback = diff --git a/zenoh/tests/attachments.rs b/zenoh/tests/attachments.rs new file mode 100644 index 0000000000..d1fbd1086a --- /dev/null +++ b/zenoh/tests/attachments.rs @@ -0,0 +1,112 @@ +#[cfg(feature = "unstable")] +#[test] +fn pubsub() { + use zenoh::prelude::sync::*; + + let zenoh = zenoh::open(Config::default()).res().unwrap(); + let _sub = zenoh + .declare_subscriber("test/attachment") + .callback(|sample| { + println!( + "{}", + std::str::from_utf8(&sample.payload.contiguous()).unwrap() + ); + for (k, v) in &sample.attachment.unwrap() { + assert!(k.iter().rev().zip(v.as_slice()).all(|(k, v)| k == v)) + } + }) + .res() + .unwrap(); + let publisher = zenoh.declare_publisher("test/attachment").res().unwrap(); + for i in 0..10 { + let mut backer = [( + [0; std::mem::size_of::()], + [0; std::mem::size_of::()], + ); 10]; + for (j, backer) in backer.iter_mut().enumerate() { + *backer = ((i * 10 + j).to_le_bytes(), (i * 10 + j).to_be_bytes()) + } + zenoh + .put("test/attachment", "put") + .with_attachment( + backer + .iter() + .map(|b| (b.0.as_slice(), b.1.as_slice())) + .collect(), + ) + .res() + .unwrap(); + publisher + .put("publisher") + .with_attachment( + backer + .iter() + .map(|b| (b.0.as_slice(), b.1.as_slice())) + .collect(), + ) + .res() + .unwrap(); + } +} +#[cfg(feature = "unstable")] +#[test] +fn queries() { + use zenoh::{prelude::sync::*, sample::Attachment}; + + let zenoh = zenoh::open(Config::default()).res().unwrap(); + let _sub = zenoh + .declare_queryable("test/attachment") + .callback(|query| { + println!( + "{}", + std::str::from_utf8( + &query + .value() + .map(|q| q.payload.contiguous()) + .unwrap_or_default() + ) + .unwrap() + ); + let mut attachment = Attachment::new(); + for (k, v) in query.attachment().unwrap() { + assert!(k.iter().rev().zip(v.as_slice()).all(|(k, v)| k == v)); + attachment.insert(&k, &k); + } + query + .reply(Ok(Sample::new( + query.key_expr().clone(), + query.value().unwrap().clone(), + ) + .with_attachment(attachment))) + .res() + .unwrap(); + }) + .res() + .unwrap(); + for i in 0..10 { + let mut backer = [( + [0; std::mem::size_of::()], + [0; std::mem::size_of::()], + ); 10]; + for (j, backer) in backer.iter_mut().enumerate() { + *backer = ((i * 10 + j).to_le_bytes(), (i * 10 + j).to_be_bytes()) + } + let get = zenoh + .get("test/attachment") + .with_value("query") + .with_attachment( + backer + .iter() + .map(|b| (b.0.as_slice(), b.1.as_slice())) + .collect(), + ) + .res() + .unwrap(); + while let Ok(reply) = get.recv() { + let response = reply.sample.as_ref().unwrap(); + for (k, v) in response.attachment().unwrap() { + assert_eq!(k, v) + } + } + } +}