Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add protocol extensions for user attachment #590

Merged
merged 23 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 99 additions & 9 deletions commons/zenoh-buffers/src/zbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
ZSlice,
};
use alloc::{sync::Arc, vec::Vec};
use core::{cmp, iter, mem, num::NonZeroUsize, ptr, slice};
use core::{cmp, iter, mem, num::NonZeroUsize, ops::RangeBounds, ptr};
use zenoh_collections::SingleOrVec;

fn get_mut_unchecked<T>(arc: &mut Arc<T>) -> &mut T {
Expand Down Expand Up @@ -55,6 +55,85 @@ impl ZBuf {
self.slices.push(zslice);
}
}

pub fn splice<Range: RangeBounds<usize>>(&mut self, erased: Range, replacement: &[u8]) {
let start = match erased.start_bound() {
core::ops::Bound::Included(n) => *n,
core::ops::Bound::Excluded(n) => n + 1,
core::ops::Bound::Unbounded => 0,
};
let end = match erased.end_bound() {
core::ops::Bound::Included(n) => n + 1,
core::ops::Bound::Excluded(n) => *n,
core::ops::Bound::Unbounded => self.len(),
};
if start != end {
self.remove(start, end);
}
self.insert(start, replacement);
}
fn remove(&mut self, mut start: usize, mut end: usize) {
assert!(start <= end);
assert!(end <= self.len());
let mut start_slice_idx = 0;
let mut start_idx_in_start_slice = 0;
let mut end_slice_idx = 0;
let mut end_idx_in_end_slice = 0;
for (i, slice) in self.slices.as_mut().iter_mut().enumerate() {
if slice.len() > start {
start_slice_idx = i;
start_idx_in_start_slice = start;
}
if slice.len() >= end {
end_slice_idx = i;
end_idx_in_end_slice = end;
break;
}
start -= slice.len();
end -= slice.len();
}
let start_slice = &mut self.slices.as_mut()[start_slice_idx];
start_slice.end = start_slice.start + start_idx_in_start_slice;
let drain_start = start_slice_idx + (start_slice.start < start_slice.end) as usize;
let end_slice = &mut self.slices.as_mut()[end_slice_idx];
end_slice.start += end_idx_in_end_slice;
let drain_end = end_slice_idx + (end_slice.start >= end_slice.end) as usize;
self.slices.drain(drain_start..drain_end);
}
fn insert(&mut self, mut at: usize, slice: &[u8]) {
if slice.is_empty() {
return;
}
let old_at = at;
let mut slice_index = usize::MAX;
for (i, slice) in self.slices.as_ref().iter().enumerate() {
if at < slice.len() {
slice_index = i;
break;
}
if let Some(new_at) = at.checked_sub(slice.len()) {
at = new_at
} else {
panic!(
"Out of bounds insert attempted: at={old_at}, len={}",
self.len()
)
}
}
if at != 0 {
let split = &self.slices.as_ref()[slice_index];
let (l, r) = (
split.subslice(0, at).unwrap(),
split.subslice(at, split.len()).unwrap(),
);
self.slices.drain(slice_index..(slice_index + 1));
self.slices.insert(slice_index, l);
self.slices.insert(slice_index + 1, Vec::from(slice).into());
self.slices.insert(slice_index + 2, r);
} else {
self.slices.insert(slice_index, Vec::from(slice).into())
}
}
}

// Buffer
Expand All @@ -70,7 +149,7 @@ impl Buffer for ZBuf {

// SplitBuffer
impl SplitBuffer for ZBuf {
type Slices<'a> = iter::Map<slice::Iter<'a, ZSlice>, fn(&'a ZSlice) -> &'a [u8]>;
type Slices<'a> = iter::Map<core::slice::Iter<'a, ZSlice>, fn(&'a ZSlice) -> &'a [u8]>;

fn slices(&self) -> Self::Slices<'_> {
self.slices.as_ref().iter().map(ZSlice::as_slice)
Expand All @@ -89,7 +168,7 @@ impl PartialEq for ZBuf {
(None, _) | (_, None) => return false,
(Some(l), Some(r)) => {
let cmp_len = l.len().min(r.len());
// SAFETY: cmp_len is the minimum lenght between l and r slices.
// SAFETY: cmp_len is the minimum length between l and r slices.
let lhs = crate::unsafe_slice!(l, ..cmp_len);
let rhs = crate::unsafe_slice!(r, ..cmp_len);
if lhs != rhs {
Expand All @@ -98,14 +177,14 @@ impl PartialEq for ZBuf {
if cmp_len == l.len() {
current_self = self_slices.next();
} else {
// SAFETY: cmp_len is the minimum lenght between l and r slices.
// SAFETY: cmp_len is the minimum length between l and r slices.
let lhs = crate::unsafe_slice!(l, cmp_len..);
current_self = Some(lhs);
}
if cmp_len == r.len() {
current_other = other_slices.next();
} else {
// SAFETY: cmp_len is the minimum lenght between l and r slices.
// SAFETY: cmp_len is the minimum length between l and r slices.
let rhs = crate::unsafe_slice!(r, cmp_len..);
current_other = Some(rhs);
}
Expand Down Expand Up @@ -161,12 +240,12 @@ impl<'a> Reader for ZBufReader<'a> {
// Take the minimum length among read and write slices
let len = from.len().min(into.len());
// Copy the slice content
// SAFETY: len is the minimum lenght between from and into slices.
// SAFETY: len is the minimum length between from and into slices.
let lhs = crate::unsafe_slice_mut!(into, ..len);
let rhs = crate::unsafe_slice!(from, ..len);
lhs.copy_from_slice(rhs);
// Advance the write slice
// SAFETY: len is the minimum lenght between from and into slices.
// SAFETY: len is the minimum length between from and into slices.
into = crate::unsafe_slice_mut!(into, len..);
// Update the counter
read += len;
Expand Down Expand Up @@ -380,9 +459,20 @@ impl<'a> HasWriter for &'a mut ZBuf {
type Writer = ZBufWriter<'a>;

fn writer(self) -> Self::Writer {
let mut cache = None;
if let Some(ZSlice { buf, end, .. }) = self.slices.last_mut() {
// Verify the ZSlice is actually a Vec<u8>
if let Some(b) = buf.as_any().downcast_ref::<Vec<u8>>() {
// Check for the length
if *end == b.len() {
cache = Some(unsafe { Arc::from_raw(Arc::into_raw(buf.clone()).cast()) })
}
}
}

ZBufWriter {
inner: self,
cache: Arc::new(Vec::new()),
cache: cache.unwrap_or_else(|| Arc::new(Vec::new())),
}
}
}
Expand Down Expand Up @@ -433,7 +523,7 @@ impl Writer for ZBufWriter<'_> {
}

fn write_u8(&mut self, byte: u8) -> Result<(), DidntWrite> {
self.write_exact(slice::from_ref(&byte))
self.write_exact(core::slice::from_ref(&byte))
}

fn remaining(&self) -> usize {
Expand Down
6 changes: 6 additions & 0 deletions commons/zenoh-codec/benches/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ fn criterion_benchmark(c: &mut Criterion) {
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: None,
ext_unknown: vec![],
payload: ZBuf::from(vec![0u8; 8]),
}),
Expand Down Expand Up @@ -136,6 +137,7 @@ fn criterion_benchmark(c: &mut Criterion) {
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: None,
ext_unknown: vec![],
payload: ZBuf::from(vec![0u8; 8]),
}),
Expand Down Expand Up @@ -176,6 +178,7 @@ fn criterion_benchmark(c: &mut Criterion) {
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: None,
ext_unknown: vec![],
payload: ZBuf::from(vec![0u8; 8]),
}),
Expand Down Expand Up @@ -216,6 +219,7 @@ fn criterion_benchmark(c: &mut Criterion) {
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: None,
ext_unknown: vec![],
payload: ZBuf::from(vec![0u8; 1_000_000]),
}),
Expand Down Expand Up @@ -243,6 +247,7 @@ fn criterion_benchmark(c: &mut Criterion) {
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: None,
ext_unknown: vec![],
payload: ZBuf::from(vec![0u8; 1_000_000]),
}),
Expand Down Expand Up @@ -281,6 +286,7 @@ fn criterion_benchmark(c: &mut Criterion) {
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: None,
ext_unknown: vec![],
payload: ZBuf::from(vec![0u8; 1_000_000]),
}),
Expand Down
22 changes: 16 additions & 6 deletions commons/zenoh-codec/src/common/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: (&ZExtUnit<{ ID }>, bool)) -> Self::Output {
let (_x, more) = x;
let (x, more) = x;
let ZExtUnit = x;

let mut header: u8 = ID;
if more {
header |= iext::FLAG_Z;
Expand Down Expand Up @@ -134,12 +136,14 @@ where

fn write(self, writer: &mut W, x: (&ZExtZ64<{ ID }>, bool)) -> Self::Output {
let (x, more) = x;
let ZExtZ64 { value } = x;

let mut header: u8 = ID;
if more {
header |= iext::FLAG_Z;
}
self.write(&mut *writer, header)?;
self.write(&mut *writer, x.value)?;
self.write(&mut *writer, value)?;
Ok(())
}
}
Expand Down Expand Up @@ -182,13 +186,15 @@ where

fn write(self, writer: &mut W, x: (&ZExtZBuf<{ ID }>, bool)) -> Self::Output {
let (x, more) = x;
let ZExtZBuf { value } = x;

let mut header: u8 = ID;
if more {
header |= iext::FLAG_Z;
}
self.write(&mut *writer, header)?;
let bodec = Zenoh080Bounded::<u32>::new();
bodec.write(&mut *writer, &x.value)?;
bodec.write(&mut *writer, value)?;
Ok(())
}
}
Expand Down Expand Up @@ -231,13 +237,15 @@ where

fn write(self, writer: &mut W, x: (&ZExtZBufHeader<{ ID }>, bool)) -> Self::Output {
let (x, more) = x;
let ZExtZBufHeader { len } = x;

let mut header: u8 = ID;
if more {
header |= iext::FLAG_Z;
}
self.write(&mut *writer, header)?;
let bodec = Zenoh080Bounded::<u32>::new();
bodec.write(&mut *writer, x.len)?;
bodec.write(&mut *writer, *len)?;
Ok(())
}
}
Expand Down Expand Up @@ -284,11 +292,13 @@ where

fn write(self, writer: &mut W, x: (&ZExtUnknown, bool)) -> Self::Output {
let (x, more) = x;
let mut header: u8 = x.id;
let ZExtUnknown { id, body } = x;

let mut header: u8 = *id;
if more {
header |= iext::FLAG_Z;
}
match &x.body {
match body {
ZExtBody::Unit => self.write(&mut *writer, header)?,
ZExtBody::Z64(u64) => {
self.write(&mut *writer, header)?;
Expand Down
6 changes: 4 additions & 2 deletions commons/zenoh-codec/src/core/property.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &Property) -> Self::Output {
self.write(&mut *writer, x.key)?;
self.write(&mut *writer, x.value.as_slice())?;
let Property { key, value } = x;

self.write(&mut *writer, key)?;
self.write(&mut *writer, value.as_slice())?;
Ok(())
}
}
Expand Down
15 changes: 11 additions & 4 deletions commons/zenoh-codec/src/core/shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,17 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &SharedMemoryBufInfo) -> Self::Output {
self.write(&mut *writer, x.offset)?;
self.write(&mut *writer, x.length)?;
self.write(&mut *writer, x.shm_manager.as_str())?;
self.write(&mut *writer, x.kind)?;
let SharedMemoryBufInfo {
offset,
length,
shm_manager,
kind,
} = x;

self.write(&mut *writer, offset)?;
self.write(&mut *writer, length)?;
self.write(&mut *writer, shm_manager.as_str())?;
self.write(&mut *writer, kind)?;
Ok(())
}
}
Expand Down
10 changes: 8 additions & 2 deletions commons/zenoh-codec/src/core/wire_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,18 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &WireExpr<'_>) -> Self::Output {
let WireExpr {
scope,
suffix,
mapping: _,
} = x;

let zodec = Zenoh080Bounded::<ExprId>::new();
zodec.write(&mut *writer, x.scope)?;
zodec.write(&mut *writer, *scope)?;

if x.has_suffix() {
let zodec = Zenoh080Bounded::<ExprLen>::new();
zodec.write(&mut *writer, x.suffix.as_ref())?;
zodec.write(&mut *writer, suffix.as_ref())?;
}
Ok(())
}
Expand Down
Loading
Loading