Skip to content

Commit

Permalink
Initial work for Batch codec [skip ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Oct 13, 2023
1 parent 56835b2 commit ad44fd9
Show file tree
Hide file tree
Showing 9 changed files with 1,049 additions and 62 deletions.
41 changes: 30 additions & 11 deletions commons/zenoh-buffers/src/bbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::{
buffer::{Buffer, SplitBuffer},
reader::HasReader,
vec,
writer::{BacktrackableWriter, DidntWrite, HasWriter, Writer},
ZSlice,
};
use alloc::{boxed::Box, sync::Arc};
use core::{fmt, num::NonZeroUsize};
use core::{fmt, num::NonZeroUsize, option};

#[derive(Clone, PartialEq, Eq)]
pub struct BBuf {
Expand All @@ -40,16 +41,6 @@ impl BBuf {
self.buffer.len()
}

#[must_use]
pub const fn len(&self) -> usize {
self.len
}

#[must_use]
pub const fn is_empty(&self) -> bool {
self.len == 0
}

#[must_use]
pub fn as_slice(&self) -> &[u8] {
// SAFETY: self.len is ensured by the writer to be smaller than buffer length.
Expand Down Expand Up @@ -77,6 +68,34 @@ impl fmt::Debug for BBuf {
}
}

// Buffer
impl Buffer for BBuf {
fn len(&self) -> usize {
self.len
}
}

impl Buffer for &BBuf {
fn len(&self) -> usize {
self.len
}
}

impl Buffer for &mut BBuf {
fn len(&self) -> usize {
self.len
}
}

// SplitBuffer
impl SplitBuffer for BBuf {
type Slices<'a> = option::IntoIter<&'a [u8]>;

fn slices(&self) -> Self::Slices<'_> {
Some(self.as_slice()).into_iter()
}
}

// Writer
impl HasWriter for &mut BBuf {
type Writer = Self;
Expand Down
73 changes: 39 additions & 34 deletions commons/zenoh-buffers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ pub mod vec;
mod zbuf;
mod zslice;

use alloc::{borrow::Cow, vec::Vec};
pub use bbuf::*;
pub use zbuf::*;
pub use zslice::*;
Expand Down Expand Up @@ -73,6 +72,45 @@ macro_rules! unsafe_slice_mut {
};
}

pub mod buffer {
use alloc::{borrow::Cow, vec::Vec};

pub trait Buffer {
/// Returns the number of bytes in the buffer.
fn len(&self) -> usize;

/// Returns `true` if the buffer has a length of 0.
fn is_empty(&self) -> bool {
self.len() == 0
}
}

/// A trait for buffers that can be composed of multiple non contiguous slices.
pub trait SplitBuffer: Buffer {
type Slices<'a>: Iterator<Item = &'a [u8]> + ExactSizeIterator
where
Self: 'a;

/// Gets all the slices of this buffer.
fn slices(&self) -> Self::Slices<'_>;

/// Returns all the bytes of this buffer in a conitguous slice.
/// This may require allocation and copy if the original buffer
/// is not contiguous.
fn contiguous(&self) -> Cow<'_, [u8]> {
let mut slices = self.slices();
match slices.len() {
0 => Cow::Borrowed(b""),
1 => Cow::Borrowed(slices.next().unwrap()),
_ => Cow::Owned(slices.fold(Vec::new(), |mut acc, it| {
acc.extend(it);
acc
})),
}
}
}
}

pub mod writer {
use crate::ZSlice;
use core::num::NonZeroUsize;
Expand Down Expand Up @@ -176,36 +214,3 @@ pub mod reader {
fn reader(self) -> Self::Reader;
}
}

/// A trait for buffers that can be composed of multiple non contiguous slices.
pub trait SplitBuffer<'a> {
type Slices: Iterator<Item = &'a [u8]> + ExactSizeIterator;

/// Gets all the slices of this buffer.
fn slices(&'a self) -> Self::Slices;

/// Returns `true` if the buffer has a length of 0.
fn is_empty(&'a self) -> bool {
self.slices().all(<[u8]>::is_empty)
}

/// Returns the number of bytes in the buffer.
fn len(&'a self) -> usize {
self.slices().fold(0, |acc, it| acc + it.len())
}

/// Returns all the bytes of this buffer in a conitguous slice.
/// This may require allocation and copy if the original buffer
/// is not contiguous.
fn contiguous(&'a self) -> Cow<'a, [u8]> {
let mut slices = self.slices();
match slices.len() {
0 => Cow::Borrowed(b""),
1 => Cow::Borrowed(slices.next().unwrap()),
_ => Cow::Owned(slices.fold(Vec::new(), |mut acc, it| {
acc.extend(it);
acc
})),
}
}
}
33 changes: 32 additions & 1 deletion commons/zenoh-buffers/src/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,42 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::{
buffer::{Buffer, SplitBuffer},
reader::{BacktrackableReader, DidntRead, DidntSiphon, HasReader, Reader, SiphonableReader},
writer::{BacktrackableWriter, DidntWrite, HasWriter, Writer},
ZSlice,
};
use core::{marker::PhantomData, mem, num::NonZeroUsize, slice};
use core::{
marker::PhantomData,
mem,
num::NonZeroUsize,
option,
slice::{self},
};

// Buffer
impl Buffer for &[u8] {
#[inline(always)]
fn len(&self) -> usize {
<[u8]>::len(self)
}
}

impl Buffer for &mut [u8] {
#[inline(always)]
fn len(&self) -> usize {
<[u8]>::len(self)
}
}

// SplitBuffer
impl<'b> SplitBuffer for &'b [u8] {
type Slices<'a> = option::IntoIter<&'a [u8]> where 'b: 'a;

fn slices(&self) -> Self::Slices<'_> {
Some(*self).into_iter()
}
}

// Writer
impl HasWriter for &mut [u8] {
Expand Down
31 changes: 30 additions & 1 deletion commons/zenoh-buffers/src/vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::{
buffer::{Buffer, SplitBuffer},
reader::HasReader,
writer::{BacktrackableWriter, DidntWrite, HasWriter, Writer},
};
use alloc::vec::Vec;
use core::{mem, num::NonZeroUsize};
use core::{mem, num::NonZeroUsize, option};

/// Allocate a vector with a given capacity and sets the length to that capacity.
#[must_use]
Expand All @@ -30,6 +31,34 @@ pub fn uninit(capacity: usize) -> Vec<u8> {
vbuf
}

// Buffer
impl Buffer for Vec<u8> {
fn len(&self) -> usize {
Vec::len(self)
}
}

impl Buffer for &Vec<u8> {
fn len(&self) -> usize {
Vec::len(self)
}
}

impl Buffer for &mut Vec<u8> {
fn len(&self) -> usize {
Vec::len(self)
}
}

// SplitBuffer
impl SplitBuffer for Vec<u8> {
type Slices<'a> = option::IntoIter<&'a [u8]>;

fn slices(&self) -> Self::Slices<'_> {
Some(self.as_slice()).into_iter()
}
}

// Writer
impl<'a> HasWriter for &'a mut Vec<u8> {
type Writer = Self;
Expand Down
26 changes: 13 additions & 13 deletions commons/zenoh-buffers/src/zbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
#[cfg(feature = "shared-memory")]
use crate::ZSliceKind;
use crate::{
buffer::{Buffer, SplitBuffer},
reader::{BacktrackableReader, DidntRead, DidntSiphon, HasReader, Reader, SiphonableReader},
writer::{BacktrackableWriter, DidntWrite, HasWriter, Writer},
SplitBuffer, ZSlice,
ZSlice,
};
use alloc::{sync::Arc, vec::Vec};
use core::{cmp, iter, mem, num::NonZeroUsize, ptr, slice};
Expand Down Expand Up @@ -56,18 +57,8 @@ impl ZBuf {
}
}

impl<'a> SplitBuffer<'a> for ZBuf {
type Slices = iter::Map<slice::Iter<'a, ZSlice>, fn(&'a ZSlice) -> &'a [u8]>;

fn slices(&'a self) -> Self::Slices {
self.slices.as_ref().iter().map(ZSlice::as_slice)
}

#[inline(always)]
fn is_empty(&self) -> bool {
self.len() == 0
}

// Buffer
impl Buffer for ZBuf {
#[inline(always)]
fn len(&self) -> usize {
self.slices
Expand All @@ -77,6 +68,15 @@ impl<'a> SplitBuffer<'a> for ZBuf {
}
}

// SplitBuffer
impl SplitBuffer for ZBuf {
type Slices<'a> = iter::Map<slice::Iter<'a, ZSlice>, fn(&'a ZSlice) -> &'a [u8]>;

fn slices(&self) -> Self::Slices<'_> {
self.slices.as_ref().iter().map(ZSlice::as_slice)
}
}

impl PartialEq for ZBuf {
fn eq(&self, other: &Self) -> bool {
let mut self_slices = self.slices();
Expand Down
34 changes: 33 additions & 1 deletion commons/zenoh-buffers/src/zslice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::reader::{BacktrackableReader, DidntRead, HasReader, Reader};
use crate::{
buffer::{Buffer, SplitBuffer},
reader::{BacktrackableReader, DidntRead, HasReader, Reader},
};
use alloc::{boxed::Box, sync::Arc, vec::Vec};
use core::{
any::Any,
convert::AsRef,
fmt,
num::NonZeroUsize,
ops::{Deref, Index, Range, RangeFrom, RangeFull, RangeInclusive, RangeTo, RangeToInclusive},
option,
};

/*************************************/
Expand Down Expand Up @@ -272,6 +276,34 @@ where
}
}

// Buffer
impl Buffer for ZSlice {
fn len(&self) -> usize {
ZSlice::len(self)
}
}

impl Buffer for &ZSlice {
fn len(&self) -> usize {
ZSlice::len(self)
}
}

impl Buffer for &mut ZSlice {
fn len(&self) -> usize {
ZSlice::len(self)
}
}

// SplitBuffer
impl SplitBuffer for ZSlice {
type Slices<'a> = option::IntoIter<&'a [u8]>;

fn slices(&self) -> Self::Slices<'_> {
Some(self.as_slice()).into_iter()
}
}

// Reader
impl HasReader for &mut ZSlice {
type Reader = Self;
Expand Down
3 changes: 2 additions & 1 deletion commons/zenoh-codec/src/core/zbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
//
use crate::{LCodec, RCodec, WCodec, Zenoh080, Zenoh080Bounded};
use zenoh_buffers::{
buffer::Buffer,
reader::{DidntRead, Reader},
writer::{DidntWrite, Writer},
SplitBuffer, ZBuf,
ZBuf,
};

// ZBuf bounded
Expand Down
Loading

0 comments on commit ad44fd9

Please sign in to comment.