Skip to content

Commit

Permalink
Add protocol extensions for user attachment (#590)
Browse files Browse the repository at this point in the history
Co-authored-by: Pierre Avital <[email protected]>
Co-authored-by: Pierre Avital <[email protected]>
  • Loading branch information
3 people authored Dec 14, 2023
1 parent 68aadaf commit 49011f1
Show file tree
Hide file tree
Showing 70 changed files with 1,504 additions and 368 deletions.
108 changes: 99 additions & 9 deletions commons/zenoh-buffers/src/zbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
ZSlice,
};
use alloc::{sync::Arc, vec::Vec};
use core::{cmp, iter, mem, num::NonZeroUsize, ptr, slice};
use core::{cmp, iter, mem, num::NonZeroUsize, ops::RangeBounds, ptr};
use zenoh_collections::SingleOrVec;

fn get_mut_unchecked<T>(arc: &mut Arc<T>) -> &mut T {
Expand Down Expand Up @@ -55,6 +55,85 @@ impl ZBuf {
self.slices.push(zslice);
}
}

pub fn splice<Range: RangeBounds<usize>>(&mut self, erased: Range, replacement: &[u8]) {
let start = match erased.start_bound() {
core::ops::Bound::Included(n) => *n,
core::ops::Bound::Excluded(n) => n + 1,
core::ops::Bound::Unbounded => 0,
};
let end = match erased.end_bound() {
core::ops::Bound::Included(n) => n + 1,
core::ops::Bound::Excluded(n) => *n,
core::ops::Bound::Unbounded => self.len(),
};
if start != end {
self.remove(start, end);
}
self.insert(start, replacement);
}
fn remove(&mut self, mut start: usize, mut end: usize) {
assert!(start <= end);
assert!(end <= self.len());
let mut start_slice_idx = 0;
let mut start_idx_in_start_slice = 0;
let mut end_slice_idx = 0;
let mut end_idx_in_end_slice = 0;
for (i, slice) in self.slices.as_mut().iter_mut().enumerate() {
if slice.len() > start {
start_slice_idx = i;
start_idx_in_start_slice = start;
}
if slice.len() >= end {
end_slice_idx = i;
end_idx_in_end_slice = end;
break;
}
start -= slice.len();
end -= slice.len();
}
let start_slice = &mut self.slices.as_mut()[start_slice_idx];
start_slice.end = start_slice.start + start_idx_in_start_slice;
let drain_start = start_slice_idx + (start_slice.start < start_slice.end) as usize;
let end_slice = &mut self.slices.as_mut()[end_slice_idx];
end_slice.start += end_idx_in_end_slice;
let drain_end = end_slice_idx + (end_slice.start >= end_slice.end) as usize;
self.slices.drain(drain_start..drain_end);
}
fn insert(&mut self, mut at: usize, slice: &[u8]) {
if slice.is_empty() {
return;
}
let old_at = at;
let mut slice_index = usize::MAX;
for (i, slice) in self.slices.as_ref().iter().enumerate() {
if at < slice.len() {
slice_index = i;
break;
}
if let Some(new_at) = at.checked_sub(slice.len()) {
at = new_at
} else {
panic!(
"Out of bounds insert attempted: at={old_at}, len={}",
self.len()
)
}
}
if at != 0 {
let split = &self.slices.as_ref()[slice_index];
let (l, r) = (
split.subslice(0, at).unwrap(),
split.subslice(at, split.len()).unwrap(),
);
self.slices.drain(slice_index..(slice_index + 1));
self.slices.insert(slice_index, l);
self.slices.insert(slice_index + 1, Vec::from(slice).into());
self.slices.insert(slice_index + 2, r);
} else {
self.slices.insert(slice_index, Vec::from(slice).into())
}
}
}

// Buffer
Expand All @@ -70,7 +149,7 @@ impl Buffer for ZBuf {

// SplitBuffer
impl SplitBuffer for ZBuf {
type Slices<'a> = iter::Map<slice::Iter<'a, ZSlice>, fn(&'a ZSlice) -> &'a [u8]>;
type Slices<'a> = iter::Map<core::slice::Iter<'a, ZSlice>, fn(&'a ZSlice) -> &'a [u8]>;

fn slices(&self) -> Self::Slices<'_> {
self.slices.as_ref().iter().map(ZSlice::as_slice)
Expand All @@ -89,7 +168,7 @@ impl PartialEq for ZBuf {
(None, _) | (_, None) => return false,
(Some(l), Some(r)) => {
let cmp_len = l.len().min(r.len());
// SAFETY: cmp_len is the minimum lenght between l and r slices.
// SAFETY: cmp_len is the minimum length between l and r slices.
let lhs = crate::unsafe_slice!(l, ..cmp_len);
let rhs = crate::unsafe_slice!(r, ..cmp_len);
if lhs != rhs {
Expand All @@ -98,14 +177,14 @@ impl PartialEq for ZBuf {
if cmp_len == l.len() {
current_self = self_slices.next();
} else {
// SAFETY: cmp_len is the minimum lenght between l and r slices.
// SAFETY: cmp_len is the minimum length between l and r slices.
let lhs = crate::unsafe_slice!(l, cmp_len..);
current_self = Some(lhs);
}
if cmp_len == r.len() {
current_other = other_slices.next();
} else {
// SAFETY: cmp_len is the minimum lenght between l and r slices.
// SAFETY: cmp_len is the minimum length between l and r slices.
let rhs = crate::unsafe_slice!(r, cmp_len..);
current_other = Some(rhs);
}
Expand Down Expand Up @@ -161,12 +240,12 @@ impl<'a> Reader for ZBufReader<'a> {
// Take the minimum length among read and write slices
let len = from.len().min(into.len());
// Copy the slice content
// SAFETY: len is the minimum lenght between from and into slices.
// SAFETY: len is the minimum length between from and into slices.
let lhs = crate::unsafe_slice_mut!(into, ..len);
let rhs = crate::unsafe_slice!(from, ..len);
lhs.copy_from_slice(rhs);
// Advance the write slice
// SAFETY: len is the minimum lenght between from and into slices.
// SAFETY: len is the minimum length between from and into slices.
into = crate::unsafe_slice_mut!(into, len..);
// Update the counter
read += len;
Expand Down Expand Up @@ -380,9 +459,20 @@ impl<'a> HasWriter for &'a mut ZBuf {
type Writer = ZBufWriter<'a>;

fn writer(self) -> Self::Writer {
let mut cache = None;
if let Some(ZSlice { buf, end, .. }) = self.slices.last_mut() {
// Verify the ZSlice is actually a Vec<u8>
if let Some(b) = buf.as_any().downcast_ref::<Vec<u8>>() {
// Check for the length
if *end == b.len() {
cache = Some(unsafe { Arc::from_raw(Arc::into_raw(buf.clone()).cast()) })
}
}
}

ZBufWriter {
inner: self,
cache: Arc::new(Vec::new()),
cache: cache.unwrap_or_else(|| Arc::new(Vec::new())),
}
}
}
Expand Down Expand Up @@ -433,7 +523,7 @@ impl Writer for ZBufWriter<'_> {
}

fn write_u8(&mut self, byte: u8) -> Result<(), DidntWrite> {
self.write_exact(slice::from_ref(&byte))
self.write_exact(core::slice::from_ref(&byte))
}

fn remaining(&self) -> usize {
Expand Down
6 changes: 6 additions & 0 deletions commons/zenoh-codec/benches/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ fn criterion_benchmark(c: &mut Criterion) {
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: None,
ext_unknown: vec![],
payload: ZBuf::from(vec![0u8; 8]),
}),
Expand Down Expand Up @@ -136,6 +137,7 @@ fn criterion_benchmark(c: &mut Criterion) {
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: None,
ext_unknown: vec![],
payload: ZBuf::from(vec![0u8; 8]),
}),
Expand Down Expand Up @@ -176,6 +178,7 @@ fn criterion_benchmark(c: &mut Criterion) {
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: None,
ext_unknown: vec![],
payload: ZBuf::from(vec![0u8; 8]),
}),
Expand Down Expand Up @@ -216,6 +219,7 @@ fn criterion_benchmark(c: &mut Criterion) {
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: None,
ext_unknown: vec![],
payload: ZBuf::from(vec![0u8; 1_000_000]),
}),
Expand Down Expand Up @@ -243,6 +247,7 @@ fn criterion_benchmark(c: &mut Criterion) {
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: None,
ext_unknown: vec![],
payload: ZBuf::from(vec![0u8; 1_000_000]),
}),
Expand Down Expand Up @@ -281,6 +286,7 @@ fn criterion_benchmark(c: &mut Criterion) {
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: None,
ext_unknown: vec![],
payload: ZBuf::from(vec![0u8; 1_000_000]),
}),
Expand Down
22 changes: 16 additions & 6 deletions commons/zenoh-codec/src/common/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: (&ZExtUnit<{ ID }>, bool)) -> Self::Output {
let (_x, more) = x;
let (x, more) = x;
let ZExtUnit = x;

let mut header: u8 = ID;
if more {
header |= iext::FLAG_Z;
Expand Down Expand Up @@ -134,12 +136,14 @@ where

fn write(self, writer: &mut W, x: (&ZExtZ64<{ ID }>, bool)) -> Self::Output {
let (x, more) = x;
let ZExtZ64 { value } = x;

let mut header: u8 = ID;
if more {
header |= iext::FLAG_Z;
}
self.write(&mut *writer, header)?;
self.write(&mut *writer, x.value)?;
self.write(&mut *writer, value)?;
Ok(())
}
}
Expand Down Expand Up @@ -182,13 +186,15 @@ where

fn write(self, writer: &mut W, x: (&ZExtZBuf<{ ID }>, bool)) -> Self::Output {
let (x, more) = x;
let ZExtZBuf { value } = x;

let mut header: u8 = ID;
if more {
header |= iext::FLAG_Z;
}
self.write(&mut *writer, header)?;
let bodec = Zenoh080Bounded::<u32>::new();
bodec.write(&mut *writer, &x.value)?;
bodec.write(&mut *writer, value)?;
Ok(())
}
}
Expand Down Expand Up @@ -231,13 +237,15 @@ where

fn write(self, writer: &mut W, x: (&ZExtZBufHeader<{ ID }>, bool)) -> Self::Output {
let (x, more) = x;
let ZExtZBufHeader { len } = x;

let mut header: u8 = ID;
if more {
header |= iext::FLAG_Z;
}
self.write(&mut *writer, header)?;
let bodec = Zenoh080Bounded::<u32>::new();
bodec.write(&mut *writer, x.len)?;
bodec.write(&mut *writer, *len)?;
Ok(())
}
}
Expand Down Expand Up @@ -284,11 +292,13 @@ where

fn write(self, writer: &mut W, x: (&ZExtUnknown, bool)) -> Self::Output {
let (x, more) = x;
let mut header: u8 = x.id;
let ZExtUnknown { id, body } = x;

let mut header: u8 = *id;
if more {
header |= iext::FLAG_Z;
}
match &x.body {
match body {
ZExtBody::Unit => self.write(&mut *writer, header)?,
ZExtBody::Z64(u64) => {
self.write(&mut *writer, header)?;
Expand Down
6 changes: 4 additions & 2 deletions commons/zenoh-codec/src/core/property.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &Property) -> Self::Output {
self.write(&mut *writer, x.key)?;
self.write(&mut *writer, x.value.as_slice())?;
let Property { key, value } = x;

self.write(&mut *writer, key)?;
self.write(&mut *writer, value.as_slice())?;
Ok(())
}
}
Expand Down
15 changes: 11 additions & 4 deletions commons/zenoh-codec/src/core/shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,17 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &SharedMemoryBufInfo) -> Self::Output {
self.write(&mut *writer, x.offset)?;
self.write(&mut *writer, x.length)?;
self.write(&mut *writer, x.shm_manager.as_str())?;
self.write(&mut *writer, x.kind)?;
let SharedMemoryBufInfo {
offset,
length,
shm_manager,
kind,
} = x;

self.write(&mut *writer, offset)?;
self.write(&mut *writer, length)?;
self.write(&mut *writer, shm_manager.as_str())?;
self.write(&mut *writer, kind)?;
Ok(())
}
}
Expand Down
10 changes: 8 additions & 2 deletions commons/zenoh-codec/src/core/wire_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,18 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &WireExpr<'_>) -> Self::Output {
let WireExpr {
scope,
suffix,
mapping: _,
} = x;

let zodec = Zenoh080Bounded::<ExprId>::new();
zodec.write(&mut *writer, x.scope)?;
zodec.write(&mut *writer, *scope)?;

if x.has_suffix() {
let zodec = Zenoh080Bounded::<ExprLen>::new();
zodec.write(&mut *writer, x.suffix.as_ref())?;
zodec.write(&mut *writer, suffix.as_ref())?;
}
Ok(())
}
Expand Down
Loading

0 comments on commit 49011f1

Please sign in to comment.