Skip to content

Commit

Permalink
Hop-to-hop compression (#585)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets authored Nov 30, 2023
1 parent 7fa7d6c commit 1dc31d4
Show file tree
Hide file tree
Showing 86 changed files with 3,641 additions and 1,783 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 16 additions & 11 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,20 @@
/// NOTE: Due to the note above, 'lowlatency' is incompatible with 'qos' option, so in order to
/// enable 'lowlatency' you need to explicitly disable 'qos'.
lowlatency: false,
qos: {
enabled: true,
},
compression: {
enabled: false,
},
},
qos: {
enabled: true,
multicast: {
qos: {
enabled: true,
},
compression: {
enabled: false,
},
},
link: {
/// An optional whitelist of protocols to be used for accepting and opening sessions.
Expand Down Expand Up @@ -183,6 +194,9 @@
/// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress.
/// Higher values lead to a more aggressive batching but it will introduce additional latency.
backoff: 100,
// Number of threads dedicated to transmission
// By default, the number of threads is calculated as follows: 1 + ((#cores - 1) / 4)
// threads: 4,
},
},
/// Configure the zenoh RX parameters of a link
Expand Down Expand Up @@ -220,15 +234,6 @@
// ca to verify that the server at baz.com is actually baz.com, let this be true (default).
server_name_verification: null,
},

/// **Experimental** compression feature.
/// Will compress the batches hop to hop (as opposed to end to end).
/// The features "transport_compression" and "unstable" need to be enabled to handle
/// compression on the integrality of the network.
compression: {
/// When 'enabled' is true, batches will be sent compressed.
enabled: false,
},
},
/// Shared memory configuration
shared_memory: {
Expand Down
65 changes: 52 additions & 13 deletions commons/zenoh-buffers/src/bbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::{
buffer::{Buffer, SplitBuffer},
reader::HasReader,
vec,
writer::{BacktrackableWriter, DidntWrite, HasWriter, Writer},
ZSlice,
};
use alloc::boxed::Box;
use core::num::NonZeroUsize;
use alloc::{boxed::Box, sync::Arc};
use core::{fmt, num::NonZeroUsize, option};

#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, PartialEq, Eq)]
pub struct BBuf {
buffer: Box<[u8]>,
len: usize,
Expand All @@ -39,16 +41,6 @@ impl BBuf {
self.buffer.len()
}

#[must_use]
pub const fn len(&self) -> usize {
self.len
}

#[must_use]
pub const fn is_empty(&self) -> bool {
self.len == 0
}

#[must_use]
pub fn as_slice(&self) -> &[u8] {
// SAFETY: self.len is ensured by the writer to be smaller than buffer length.
Expand All @@ -70,6 +62,40 @@ impl BBuf {
}
}

impl fmt::Debug for BBuf {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:02x?}", self.as_slice())
}
}

// Buffer
impl Buffer for BBuf {
fn len(&self) -> usize {
self.len
}
}

impl Buffer for &BBuf {
fn len(&self) -> usize {
self.len
}
}

impl Buffer for &mut BBuf {
fn len(&self) -> usize {
self.len
}
}

// SplitBuffer
impl SplitBuffer for BBuf {
type Slices<'a> = option::IntoIter<&'a [u8]>;

fn slices(&self) -> Self::Slices<'_> {
Some(self.as_slice()).into_iter()
}
}

// Writer
impl HasWriter for &mut BBuf {
type Writer = Self;
Expand Down Expand Up @@ -152,6 +178,19 @@ impl<'a> HasReader for &'a BBuf {
}
}

// From impls
impl From<BBuf> for ZSlice {
fn from(value: BBuf) -> Self {
ZSlice {
buf: Arc::new(value.buffer),
start: 0,
end: value.len,
#[cfg(feature = "shared-memory")]
kind: crate::ZSliceKind::Raw,
}
}
}

#[cfg(feature = "test")]
impl BBuf {
pub fn rand(len: usize) -> Self {
Expand Down
74 changes: 40 additions & 34 deletions commons/zenoh-buffers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ pub mod vec;
mod zbuf;
mod zslice;

use alloc::{borrow::Cow, vec::Vec};
pub use bbuf::*;
pub use zbuf::*;
pub use zslice::*;
Expand Down Expand Up @@ -73,6 +72,45 @@ macro_rules! unsafe_slice_mut {
};
}

pub mod buffer {
use alloc::{borrow::Cow, vec::Vec};

pub trait Buffer {
/// Returns the number of bytes in the buffer.
fn len(&self) -> usize;

/// Returns `true` if the buffer has a length of 0.
fn is_empty(&self) -> bool {
self.len() == 0
}
}

/// A trait for buffers that can be composed of multiple non contiguous slices.
pub trait SplitBuffer: Buffer {
type Slices<'a>: Iterator<Item = &'a [u8]> + ExactSizeIterator
where
Self: 'a;

/// Gets all the slices of this buffer.
fn slices(&self) -> Self::Slices<'_>;

/// Returns all the bytes of this buffer in a conitguous slice.
/// This may require allocation and copy if the original buffer
/// is not contiguous.
fn contiguous(&self) -> Cow<'_, [u8]> {
let mut slices = self.slices();
match slices.len() {
0 => Cow::Borrowed(b""),
1 => Cow::Borrowed(slices.next().unwrap()),
_ => Cow::Owned(slices.fold(Vec::new(), |mut acc, it| {
acc.extend(it);
acc
})),
}
}
}
}

pub mod writer {
use crate::ZSlice;
use core::num::NonZeroUsize;
Expand Down Expand Up @@ -100,6 +138,7 @@ pub mod writer {
where
F: FnOnce(&mut [u8]) -> usize;
}

pub trait BacktrackableWriter: Writer {
type Mark;

Expand Down Expand Up @@ -175,36 +214,3 @@ pub mod reader {
fn reader(self) -> Self::Reader;
}
}

/// A trait for buffers that can be composed of multiple non contiguous slices.
pub trait SplitBuffer<'a> {
type Slices: Iterator<Item = &'a [u8]> + ExactSizeIterator;

/// Gets all the slices of this buffer.
fn slices(&'a self) -> Self::Slices;

/// Returns `true` if the buffer has a length of 0.
fn is_empty(&'a self) -> bool {
self.slices().all(<[u8]>::is_empty)
}

/// Returns the number of bytes in the buffer.
fn len(&'a self) -> usize {
self.slices().fold(0, |acc, it| acc + it.len())
}

/// Returns all the bytes of this buffer in a conitguous slice.
/// This may require allocation and copy if the original buffer
/// is not contiguous.
fn contiguous(&'a self) -> Cow<'a, [u8]> {
let mut slices = self.slices();
match slices.len() {
0 => Cow::Borrowed(b""),
1 => Cow::Borrowed(slices.next().unwrap()),
_ => Cow::Owned(slices.fold(Vec::new(), |mut acc, it| {
acc.extend(it);
acc
})),
}
}
}
33 changes: 32 additions & 1 deletion commons/zenoh-buffers/src/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,42 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::{
buffer::{Buffer, SplitBuffer},
reader::{BacktrackableReader, DidntRead, DidntSiphon, HasReader, Reader, SiphonableReader},
writer::{BacktrackableWriter, DidntWrite, HasWriter, Writer},
ZSlice,
};
use core::{marker::PhantomData, mem, num::NonZeroUsize, slice};
use core::{
marker::PhantomData,
mem,
num::NonZeroUsize,
option,
slice::{self},
};

// Buffer
impl Buffer for &[u8] {
#[inline(always)]
fn len(&self) -> usize {
<[u8]>::len(self)
}
}

impl Buffer for &mut [u8] {
#[inline(always)]
fn len(&self) -> usize {
<[u8]>::len(self)
}
}

// SplitBuffer
impl<'b> SplitBuffer for &'b [u8] {
type Slices<'a> = option::IntoIter<&'a [u8]> where 'b: 'a;

fn slices(&self) -> Self::Slices<'_> {
Some(*self).into_iter()
}
}

// Writer
impl HasWriter for &mut [u8] {
Expand Down
31 changes: 30 additions & 1 deletion commons/zenoh-buffers/src/vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::{
buffer::{Buffer, SplitBuffer},
reader::HasReader,
writer::{BacktrackableWriter, DidntWrite, HasWriter, Writer},
};
use alloc::vec::Vec;
use core::{mem, num::NonZeroUsize};
use core::{mem, num::NonZeroUsize, option};

/// Allocate a vector with a given capacity and sets the length to that capacity.
#[must_use]
Expand All @@ -30,6 +31,34 @@ pub fn uninit(capacity: usize) -> Vec<u8> {
vbuf
}

// Buffer
impl Buffer for Vec<u8> {
fn len(&self) -> usize {
Vec::len(self)
}
}

impl Buffer for &Vec<u8> {
fn len(&self) -> usize {
Vec::len(self)
}
}

impl Buffer for &mut Vec<u8> {
fn len(&self) -> usize {
Vec::len(self)
}
}

// SplitBuffer
impl SplitBuffer for Vec<u8> {
type Slices<'a> = option::IntoIter<&'a [u8]>;

fn slices(&self) -> Self::Slices<'_> {
Some(self.as_slice()).into_iter()
}
}

// Writer
impl<'a> HasWriter for &'a mut Vec<u8> {
type Writer = Self;
Expand Down
26 changes: 13 additions & 13 deletions commons/zenoh-buffers/src/zbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
#[cfg(feature = "shared-memory")]
use crate::ZSliceKind;
use crate::{
buffer::{Buffer, SplitBuffer},
reader::{BacktrackableReader, DidntRead, DidntSiphon, HasReader, Reader, SiphonableReader},
writer::{BacktrackableWriter, DidntWrite, HasWriter, Writer},
SplitBuffer, ZSlice,
ZSlice,
};
use alloc::{sync::Arc, vec::Vec};
use core::{cmp, iter, mem, num::NonZeroUsize, ptr, slice};
Expand Down Expand Up @@ -56,18 +57,8 @@ impl ZBuf {
}
}

impl<'a> SplitBuffer<'a> for ZBuf {
type Slices = iter::Map<slice::Iter<'a, ZSlice>, fn(&'a ZSlice) -> &'a [u8]>;

fn slices(&'a self) -> Self::Slices {
self.slices.as_ref().iter().map(ZSlice::as_slice)
}

#[inline(always)]
fn is_empty(&self) -> bool {
self.len() == 0
}

// Buffer
impl Buffer for ZBuf {
#[inline(always)]
fn len(&self) -> usize {
self.slices
Expand All @@ -77,6 +68,15 @@ impl<'a> SplitBuffer<'a> for ZBuf {
}
}

// SplitBuffer
impl SplitBuffer for ZBuf {
type Slices<'a> = iter::Map<slice::Iter<'a, ZSlice>, fn(&'a ZSlice) -> &'a [u8]>;

fn slices(&self) -> Self::Slices<'_> {
self.slices.as_ref().iter().map(ZSlice::as_slice)
}
}

impl PartialEq for ZBuf {
fn eq(&self, other: &Self) -> bool {
let mut self_slices = self.slices();
Expand Down
Loading

0 comments on commit 1dc31d4

Please sign in to comment.