Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hop-to-hop compression #585

Merged
merged 36 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
9ef73bd
WIP WBatch and RBatch [skip ci]
Mallets Sep 29, 2023
1d2caa8
Batch modularization for compression [skip ci]
Mallets Oct 2, 2023
faa781c
Tests are now repassing
Mallets Oct 3, 2023
6b7f733
Tests are now repassing
Mallets Oct 3, 2023
4d2c770
Merge branch 'compression' of github.com:eclipse-zenoh/zenoh into com…
Mallets Oct 3, 2023
4d7f362
Use WBatch and RBatch in multicast
Mallets Oct 4, 2023
1ce449d
RBatch and WBatch reworking [skip ci]
Mallets Oct 9, 2023
217ddd7
RBatch buffer closure [skip ci]
Mallets Oct 9, 2023
56835b2
Fix naming
Mallets Oct 12, 2023
ad44fd9
Initial work for Batch codec [skip ci]
Mallets Oct 13, 2023
4f691bd
Batch encoding/decoding is part of zenoh-codec
Mallets Oct 27, 2023
2c6cb6d
First working example
Mallets Nov 6, 2023
d0dbd94
Mutable send for unicast link
Mallets Nov 7, 2023
3d5ab84
Mutable recv for unicast link
Mallets Nov 7, 2023
05117fe
Use support buffer for compression Tx
Mallets Nov 7, 2023
985f86b
Fix link negotiation
Mallets Nov 7, 2023
ea59d02
Improve logs
Mallets Nov 8, 2023
6cdad63
Batch::new accepts Into<ZSlice>
Mallets Nov 8, 2023
4802bf2
Add compression support to multicast
Mallets Nov 8, 2023
147b727
Add transport unicast compression tests
Mallets Nov 8, 2023
0e2a751
Merge with master
Mallets Nov 8, 2023
4eda8e2
Fix shared-memory clippy
Mallets Nov 8, 2023
1fa8ac4
Fix config deserialization test
Mallets Nov 8, 2023
2731bcf
Fix link Outbound direction misconfiguration
Mallets Nov 16, 2023
68d8885
Fix rw_batch test
Mallets Nov 16, 2023
e808546
Fix doc tests
Mallets Nov 20, 2023
c0a19d1
Fix TLS tests
Mallets Nov 20, 2023
4daefb9
Disable scouting in zenoh_liveness tests
Mallets Nov 22, 2023
2c77ead
Remove unnecessary batch_size parameters
Mallets Nov 23, 2023
50a29ed
Move close link into Link implementation
Mallets Nov 23, 2023
c650a84
Fix unicast compression tests
Mallets Nov 23, 2023
9acebcb
Align TransportLinkMulticast to TransportLinkUnicast
Mallets Nov 24, 2023
19e97e9
Merge with master
Mallets Nov 27, 2023
16513df
Change RBatch visibility to public
Mallets Nov 27, 2023
fb7e187
Merge branch 'master' into compression
Mallets Nov 29, 2023
b5f69cb
Fix clippy warnings
Mallets Nov 29, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 16 additions & 11 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,20 @@
/// NOTE: Due to the note above, 'lowlatency' is incompatible with 'qos' option, so in order to
/// enable 'lowlatency' you need to explicitly disable 'qos'.
lowlatency: false,
qos: {
enabled: true,
},
compression: {
enabled: false,
},
},
qos: {
enabled: true,
multicast: {
qos: {
enabled: true,
},
compression: {
enabled: false,
},
},
link: {
/// An optional whitelist of protocols to be used for accepting and opening sessions.
Expand Down Expand Up @@ -183,6 +194,9 @@
/// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress.
/// Higher values lead to a more aggressive batching but it will introduce additional latency.
backoff: 100,
// Number of threads dedicated to transmission
// By default, the number of threads is calculated as follows: 1 + ((#cores - 1) / 4)
// threads: 4,
},
},
/// Configure the zenoh RX parameters of a link
Expand Down Expand Up @@ -220,15 +234,6 @@
// ca to verify that the server at baz.com is actually baz.com, let this be true (default).
server_name_verification: null,
},

/// **Experimental** compression feature.
/// Will compress the batches hop to hop (as opposed to end to end).
/// The features "transport_compression" and "unstable" need to be enabled to handle
/// compression on the integrality of the network.
compression: {
/// When 'enabled' is true, batches will be sent compressed.
enabled: false,
},
},
/// Shared memory configuration
shared_memory: {
Expand Down
65 changes: 52 additions & 13 deletions commons/zenoh-buffers/src/bbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::{
buffer::{Buffer, SplitBuffer},
reader::HasReader,
vec,
writer::{BacktrackableWriter, DidntWrite, HasWriter, Writer},
ZSlice,
};
use alloc::boxed::Box;
use core::num::NonZeroUsize;
use alloc::{boxed::Box, sync::Arc};
use core::{fmt, num::NonZeroUsize, option};

#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, PartialEq, Eq)]
pub struct BBuf {
buffer: Box<[u8]>,
len: usize,
Expand All @@ -39,16 +41,6 @@ impl BBuf {
self.buffer.len()
}

#[must_use]
pub const fn len(&self) -> usize {
self.len
}

#[must_use]
pub const fn is_empty(&self) -> bool {
self.len == 0
}

#[must_use]
pub fn as_slice(&self) -> &[u8] {
// SAFETY: self.len is ensured by the writer to be smaller than buffer length.
Expand All @@ -70,6 +62,40 @@ impl BBuf {
}
}

impl fmt::Debug for BBuf {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:02x?}", self.as_slice())
}
}

// Buffer
impl Buffer for BBuf {
fn len(&self) -> usize {
self.len
}
}

impl Buffer for &BBuf {
fn len(&self) -> usize {
self.len
}
}

impl Buffer for &mut BBuf {
fn len(&self) -> usize {
self.len
}
}

// SplitBuffer
impl SplitBuffer for BBuf {
type Slices<'a> = option::IntoIter<&'a [u8]>;

fn slices(&self) -> Self::Slices<'_> {
Some(self.as_slice()).into_iter()
}
}

// Writer
impl HasWriter for &mut BBuf {
type Writer = Self;
Expand Down Expand Up @@ -152,6 +178,19 @@ impl<'a> HasReader for &'a BBuf {
}
}

// From impls
impl From<BBuf> for ZSlice {
fn from(value: BBuf) -> Self {
ZSlice {
buf: Arc::new(value.buffer),
start: 0,
end: value.len,
#[cfg(feature = "shared-memory")]
kind: crate::ZSliceKind::Raw,
}
}
}

#[cfg(feature = "test")]
impl BBuf {
pub fn rand(len: usize) -> Self {
Expand Down
74 changes: 40 additions & 34 deletions commons/zenoh-buffers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ pub mod vec;
mod zbuf;
mod zslice;

use alloc::{borrow::Cow, vec::Vec};
pub use bbuf::*;
pub use zbuf::*;
pub use zslice::*;
Expand Down Expand Up @@ -73,6 +72,45 @@ macro_rules! unsafe_slice_mut {
};
}

pub mod buffer {
use alloc::{borrow::Cow, vec::Vec};

pub trait Buffer {
/// Returns the number of bytes in the buffer.
fn len(&self) -> usize;

/// Returns `true` if the buffer has a length of 0.
fn is_empty(&self) -> bool {
self.len() == 0
}
}

/// A trait for buffers that can be composed of multiple non contiguous slices.
pub trait SplitBuffer: Buffer {
type Slices<'a>: Iterator<Item = &'a [u8]> + ExactSizeIterator
where
Self: 'a;

/// Gets all the slices of this buffer.
fn slices(&self) -> Self::Slices<'_>;

/// Returns all the bytes of this buffer in a conitguous slice.
/// This may require allocation and copy if the original buffer
/// is not contiguous.
fn contiguous(&self) -> Cow<'_, [u8]> {
let mut slices = self.slices();
match slices.len() {
0 => Cow::Borrowed(b""),
1 => Cow::Borrowed(slices.next().unwrap()),
_ => Cow::Owned(slices.fold(Vec::new(), |mut acc, it| {
acc.extend(it);
acc
})),
}
}
}
}

pub mod writer {
use crate::ZSlice;
use core::num::NonZeroUsize;
Expand Down Expand Up @@ -100,6 +138,7 @@ pub mod writer {
where
F: FnOnce(&mut [u8]) -> usize;
}

pub trait BacktrackableWriter: Writer {
type Mark;

Expand Down Expand Up @@ -175,36 +214,3 @@ pub mod reader {
fn reader(self) -> Self::Reader;
}
}

/// A trait for buffers that can be composed of multiple non contiguous slices.
pub trait SplitBuffer<'a> {
type Slices: Iterator<Item = &'a [u8]> + ExactSizeIterator;

/// Gets all the slices of this buffer.
fn slices(&'a self) -> Self::Slices;

/// Returns `true` if the buffer has a length of 0.
fn is_empty(&'a self) -> bool {
self.slices().all(<[u8]>::is_empty)
}

/// Returns the number of bytes in the buffer.
fn len(&'a self) -> usize {
self.slices().fold(0, |acc, it| acc + it.len())
}

/// Returns all the bytes of this buffer in a conitguous slice.
/// This may require allocation and copy if the original buffer
/// is not contiguous.
fn contiguous(&'a self) -> Cow<'a, [u8]> {
let mut slices = self.slices();
match slices.len() {
0 => Cow::Borrowed(b""),
1 => Cow::Borrowed(slices.next().unwrap()),
_ => Cow::Owned(slices.fold(Vec::new(), |mut acc, it| {
acc.extend(it);
acc
})),
}
}
}
33 changes: 32 additions & 1 deletion commons/zenoh-buffers/src/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,42 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::{
buffer::{Buffer, SplitBuffer},
reader::{BacktrackableReader, DidntRead, DidntSiphon, HasReader, Reader, SiphonableReader},
writer::{BacktrackableWriter, DidntWrite, HasWriter, Writer},
ZSlice,
};
use core::{marker::PhantomData, mem, num::NonZeroUsize, slice};
use core::{
marker::PhantomData,
mem,
num::NonZeroUsize,
option,
slice::{self},
};

// Buffer
impl Buffer for &[u8] {
#[inline(always)]
fn len(&self) -> usize {
<[u8]>::len(self)
}
}

impl Buffer for &mut [u8] {
#[inline(always)]
fn len(&self) -> usize {
<[u8]>::len(self)
}
}

// SplitBuffer
impl<'b> SplitBuffer for &'b [u8] {
type Slices<'a> = option::IntoIter<&'a [u8]> where 'b: 'a;

fn slices(&self) -> Self::Slices<'_> {
Some(*self).into_iter()
}
}

// Writer
impl HasWriter for &mut [u8] {
Expand Down
31 changes: 30 additions & 1 deletion commons/zenoh-buffers/src/vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::{
buffer::{Buffer, SplitBuffer},
reader::HasReader,
writer::{BacktrackableWriter, DidntWrite, HasWriter, Writer},
};
use alloc::vec::Vec;
use core::{mem, num::NonZeroUsize};
use core::{mem, num::NonZeroUsize, option};

/// Allocate a vector with a given capacity and sets the length to that capacity.
#[must_use]
Expand All @@ -30,6 +31,34 @@ pub fn uninit(capacity: usize) -> Vec<u8> {
vbuf
}

// Buffer
impl Buffer for Vec<u8> {
fn len(&self) -> usize {
Vec::len(self)
}
}

impl Buffer for &Vec<u8> {
fn len(&self) -> usize {
Vec::len(self)
}
}

impl Buffer for &mut Vec<u8> {
fn len(&self) -> usize {
Vec::len(self)
}
}

// SplitBuffer
impl SplitBuffer for Vec<u8> {
type Slices<'a> = option::IntoIter<&'a [u8]>;

fn slices(&self) -> Self::Slices<'_> {
Some(self.as_slice()).into_iter()
}
}

// Writer
impl<'a> HasWriter for &'a mut Vec<u8> {
type Writer = Self;
Expand Down
26 changes: 13 additions & 13 deletions commons/zenoh-buffers/src/zbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
#[cfg(feature = "shared-memory")]
use crate::ZSliceKind;
use crate::{
buffer::{Buffer, SplitBuffer},
reader::{BacktrackableReader, DidntRead, DidntSiphon, HasReader, Reader, SiphonableReader},
writer::{BacktrackableWriter, DidntWrite, HasWriter, Writer},
SplitBuffer, ZSlice,
ZSlice,
};
use alloc::{sync::Arc, vec::Vec};
use core::{cmp, iter, mem, num::NonZeroUsize, ptr, slice};
Expand Down Expand Up @@ -56,18 +57,8 @@ impl ZBuf {
}
}

impl<'a> SplitBuffer<'a> for ZBuf {
type Slices = iter::Map<slice::Iter<'a, ZSlice>, fn(&'a ZSlice) -> &'a [u8]>;

fn slices(&'a self) -> Self::Slices {
self.slices.as_ref().iter().map(ZSlice::as_slice)
}

#[inline(always)]
fn is_empty(&self) -> bool {
self.len() == 0
}

// Buffer
impl Buffer for ZBuf {
#[inline(always)]
fn len(&self) -> usize {
self.slices
Expand All @@ -77,6 +68,15 @@ impl<'a> SplitBuffer<'a> for ZBuf {
}
}

// SplitBuffer
impl SplitBuffer for ZBuf {
type Slices<'a> = iter::Map<slice::Iter<'a, ZSlice>, fn(&'a ZSlice) -> &'a [u8]>;

fn slices(&self) -> Self::Slices<'_> {
self.slices.as_ref().iter().map(ZSlice::as_slice)
}
}

impl PartialEq for ZBuf {
fn eq(&self, other: &Self) -> bool {
let mut self_slices = self.slices();
Expand Down
Loading
Loading