Add writer, seek, iterator, tuple support to Payload and ZSerde #919

Merged: 21 commits, Apr 10, 2024
Changes from all commits
8 changes: 8 additions & 0 deletions Cargo.lock

Generated file; diff not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -159,6 +159,7 @@ tokio-rustls = "0.25.0"
console-subscriber = "0.2"
typenum = "1.16.0"
uhlc = { version = "0.7.0", default-features = false } # Default features are disabled due to usage in no_std crates
unwrap-infallible = "0.1.5"
unzip-n = "0.1.2"
url = "2.3.1"
urlencoding = "2.1.2"
12 changes: 12 additions & 0 deletions commons/zenoh-buffers/src/lib.rs
@@ -199,6 +199,18 @@ pub mod reader {
fn rewind(&mut self, mark: Self::Mark) -> bool;
}

pub trait AdvanceableReader: Reader {
fn skip(&mut self, offset: usize) -> Result<(), DidntRead>;
fn backtrack(&mut self, offset: usize) -> Result<(), DidntRead>;
fn advance(&mut self, offset: isize) -> Result<(), DidntRead> {
if offset > 0 {
self.skip(offset as usize)
} else {
self.backtrack((-offset) as usize)
}
}
}

#[derive(Debug, Clone, Copy)]
pub struct DidntSiphon;

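The new `AdvanceableReader` trait builds relative seeking from two primitives: `skip` moves the cursor forward, `backtrack` moves it backward, and the provided `advance` simply dispatches on the sign of the offset. A minimal usage sketch against the `ZBufReader` implementation added further down (the crate paths and `main` wrapper are illustrative assumptions):

use zenoh_buffers::{
    reader::{AdvanceableReader, HasReader, Reader},
    ZBuf,
};

fn main() {
    let mut buf = ZBuf::empty();
    buf.push_zslice([0u8, 1u8, 2u8, 3u8].into());
    buf.push_zslice([4u8, 5u8, 6u8, 7u8].into());

    let mut reader = buf.reader();
    reader.skip(5).unwrap();                  // forward, across the slice boundary
    assert_eq!(reader.read_u8().unwrap(), 5); // cursor was at byte 5
    reader.advance(-4).unwrap();              // negative offset delegates to backtrack
    assert_eq!(reader.read_u8().unwrap(), 2);
}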
181 changes: 164 additions & 17 deletions commons/zenoh-buffers/src/zbuf.rs
@@ -15,12 +15,17 @@
use crate::ZSliceKind;
use crate::{
buffer::{Buffer, SplitBuffer},
reader::{BacktrackableReader, DidntRead, DidntSiphon, HasReader, Reader, SiphonableReader},
reader::{
AdvanceableReader, BacktrackableReader, DidntRead, DidntSiphon, HasReader, Reader,
SiphonableReader,
},
writer::{BacktrackableWriter, DidntWrite, HasWriter, Writer},
ZSlice,
ZSlice, ZSliceBuffer,
};
use alloc::{sync::Arc, vec::Vec};
use core::{cmp, iter, mem, num::NonZeroUsize, ops::RangeBounds, ptr};
#[cfg(feature = "std")]
use std::io;
use zenoh_collections::SingleOrVec;

fn get_mut_unchecked<T>(arc: &mut Arc<T>) -> &mut T {
@@ -58,6 +63,21 @@ impl ZBuf {
}
}

pub fn to_zslice(&self) -> ZSlice {
let mut slices = self.zslices();
match self.slices.len() {
0 => ZSlice::empty(),
// SAFETY: it's safe to use unwrap_unchecked() because we are explicitly checking that the length is 1.
1 => unsafe { slices.next().unwrap_unchecked().clone() },
_ => slices
.fold(Vec::new(), |mut acc, it| {
acc.extend(it.as_slice());
acc
})
.into(),
}
}

pub fn splice<Range: RangeBounds<usize>>(&mut self, erased: Range, replacement: &[u8]) {
let start = match erased.start_bound() {
core::ops::Bound::Included(n) => *n,
@@ -199,15 +219,31 @@ impl PartialEq for ZBuf {
}

// From impls
impl From<ZSlice> for ZBuf {
fn from(t: ZSlice) -> Self {
let mut zbuf = ZBuf::empty();
zbuf.push_zslice(t);
zbuf
}
}

impl<T> From<Arc<T>> for ZBuf
where
T: ZSliceBuffer + 'static,
{
fn from(t: Arc<T>) -> Self {
let zslice: ZSlice = t.into();
Self::from(zslice)
}
}

impl<T> From<T> for ZBuf
where
T: Into<ZSlice>,
T: ZSliceBuffer + 'static,
{
fn from(t: T) -> Self {
let mut zbuf = ZBuf::empty();
let zslice: ZSlice = t.into();
zbuf.push_zslice(zslice);
zbuf
Self::from(zslice)
}
}

@@ -270,7 +306,7 @@ impl<'a> Reader for ZBufReader<'a> {
}

fn read_exact(&mut self, into: &mut [u8]) -> Result<(), DidntRead> {
let len = self.read(into)?;
let len = Reader::read(self, into)?;
if len.get() == into.len() {
Ok(())
} else {
@@ -317,7 +353,7 @@ impl<'a> Reader for ZBufReader<'a> {
match (slice.len() - self.cursor.byte).cmp(&len) {
cmp::Ordering::Less => {
let mut buffer = crate::vec::uninit(len);
self.read_exact(&mut buffer)?;
Reader::read_exact(self, &mut buffer)?;
Ok(buffer.into())
}
cmp::Ordering::Equal => {
@@ -388,13 +424,81 @@ impl<'a> SiphonableReader for ZBufReader<'a> {
}

#[cfg(feature = "std")]
impl<'a> std::io::Read for ZBufReader<'a> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
impl<'a> io::Read for ZBufReader<'a> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match <Self as Reader>::read(self, buf) {
Ok(n) => Ok(n.get()),
Err(_) => Ok(0),
}
}
}

impl<'a> AdvanceableReader for ZBufReader<'a> {
fn skip(&mut self, offset: usize) -> Result<(), DidntRead> {
let mut remaining_offset = offset;
while remaining_offset > 0 {
let s = self.inner.slices.get(self.cursor.slice).ok_or(DidntRead)?;
let remains_in_current_slice = s.len() - self.cursor.byte;
let advance = remaining_offset.min(remains_in_current_slice);
remaining_offset -= advance;
self.cursor.byte += advance;
if self.cursor.byte == s.len() {
self.cursor.slice += 1;
self.cursor.byte = 0;
}
}
Ok(())
}

fn backtrack(&mut self, offset: usize) -> Result<(), DidntRead> {
let mut remaining_offset = offset;
while remaining_offset > 0 {
let backtrack = remaining_offset.min(self.cursor.byte);
remaining_offset -= backtrack;
self.cursor.byte -= backtrack;
if self.cursor.byte == 0 {
if self.cursor.slice == 0 {
break;
}
self.cursor.slice -= 1;
self.cursor.byte = self
.inner
.slices
.get(self.cursor.slice)
.ok_or(DidntRead)?
.len();
}
}
if remaining_offset == 0 {
Ok(())
} else {
Err(DidntRead)
}
}
}

#[cfg(feature = "std")]
impl<'a> io::Seek for ZBufReader<'a> {
fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
let current_pos = self
.inner
.slices()
.take(self.cursor.slice)
.fold(0, |acc, s| acc + s.len())
+ self.cursor.byte;
let current_pos = i64::try_from(current_pos)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{}", e)))?;

let offset = match pos {
std::io::SeekFrom::Start(s) => i64::try_from(s).unwrap_or(i64::MAX) - current_pos,
std::io::SeekFrom::Current(s) => s,
std::io::SeekFrom::End(s) => self.inner.len() as i64 + s - current_pos,
};
match self.advance(offset as isize) {
Ok(()) => Ok((offset + current_pos) as u64),
Err(_) => Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"UnexpectedEof",
std::io::ErrorKind::InvalidInput,
"InvalidInput",
)),
}
}
@@ -614,18 +718,18 @@ impl BacktrackableWriter for ZBufWriter<'_> {
}

#[cfg(feature = "std")]
impl<'a> std::io::Write for ZBufWriter<'a> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
impl<'a> io::Write for ZBufWriter<'a> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match <Self as Writer>::write(self, buf) {
Ok(n) => Ok(n.get()),
Err(_) => Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
Err(_) => Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"UnexpectedEof",
)),
}
}

fn flush(&mut self) -> std::io::Result<()> {
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
@@ -668,4 +772,47 @@ mod tests {

assert_eq!(zbuf1, zbuf2);
}

#[cfg(feature = "std")]
#[test]
fn zbuf_seek() {
use super::{HasReader, ZBuf};
use crate::reader::Reader;
use std::io::Seek;

let mut buf = ZBuf::empty();
buf.push_zslice([0u8, 1u8, 2u8, 3u8].into());
buf.push_zslice([4u8, 5u8, 6u8, 7u8, 8u8].into());
buf.push_zslice([9u8, 10u8, 11u8, 12u8, 13u8, 14u8].into());
let mut reader = buf.reader();

assert_eq!(reader.stream_position().unwrap(), 0);
assert_eq!(reader.read_u8().unwrap(), 0);
assert_eq!(reader.seek(std::io::SeekFrom::Current(6)).unwrap(), 7);
assert_eq!(reader.read_u8().unwrap(), 7);
assert_eq!(reader.seek(std::io::SeekFrom::Current(-5)).unwrap(), 3);
assert_eq!(reader.read_u8().unwrap(), 3);
assert_eq!(reader.seek(std::io::SeekFrom::Current(10)).unwrap(), 14);
assert_eq!(reader.read_u8().unwrap(), 14);
reader.seek(std::io::SeekFrom::Current(100)).unwrap_err();

assert_eq!(reader.seek(std::io::SeekFrom::Start(0)).unwrap(), 0);
assert_eq!(reader.read_u8().unwrap(), 0);
assert_eq!(reader.seek(std::io::SeekFrom::Start(12)).unwrap(), 12);
assert_eq!(reader.read_u8().unwrap(), 12);
assert_eq!(reader.seek(std::io::SeekFrom::Start(15)).unwrap(), 15);
reader.read_u8().unwrap_err();
reader.seek(std::io::SeekFrom::Start(100)).unwrap_err();

assert_eq!(reader.seek(std::io::SeekFrom::End(0)).unwrap(), 15);
reader.read_u8().unwrap_err();
assert_eq!(reader.seek(std::io::SeekFrom::End(-5)).unwrap(), 10);
assert_eq!(reader.read_u8().unwrap(), 10);
assert_eq!(reader.seek(std::io::SeekFrom::End(-15)).unwrap(), 0);
assert_eq!(reader.read_u8().unwrap(), 0);
reader.seek(std::io::SeekFrom::End(-20)).unwrap_err();

assert_eq!(reader.seek(std::io::SeekFrom::Start(10)).unwrap(), 10);
reader.seek(std::io::SeekFrom::Current(-100)).unwrap_err();
}
}
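The new `ZBuf::to_zslice` above returns a contiguous view: an empty slice when there are no slices, a clone of the single slice when there is exactly one, and a freshly allocated copy otherwise. A rough usage sketch (crate paths assumed; `as_slice` is the accessor already used inside `to_zslice`):

use zenoh_buffers::{buffer::Buffer, ZBuf};

fn main() {
    let mut buf = ZBuf::empty();
    buf.push_zslice([1u8, 2u8, 3u8].into());
    buf.push_zslice([4u8, 5u8].into());

    // Two backing slices are flattened into one contiguous ZSlice.
    let flat = buf.to_zslice();
    assert_eq!(flat.as_slice(), &[1u8, 2, 3, 4, 5]);
    assert_eq!(flat.len(), buf.len());
}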
4 changes: 4 additions & 0 deletions commons/zenoh-buffers/src/zslice.rs
@@ -114,6 +114,10 @@ impl ZSlice {
}
}

pub fn empty() -> Self {
unsafe { ZSlice::new_unchecked(Arc::new([]), 0, 0) }
}

/// # Safety
/// This function does not verify whether the `start` and `end` indexes are within the buffer boundaries.
/// If a [`ZSlice`] is built via this constructor, a later access may panic if `start` and `end` indexes are out-of-bound.
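`ZSlice::empty` is sound because a zero-length range over an empty buffer can never be out of bounds, which is exactly the invariant the safety note above asks callers of `new_unchecked` to uphold. A brief illustrative sketch (crate path assumed):

use zenoh_buffers::ZSlice;

fn main() {
    // Safe construction from an owned buffer: the slice covers the whole Vec.
    let s: ZSlice = vec![1u8, 2u8, 3u8].into();
    assert_eq!(s.len(), 3);

    // Always-valid zero-length view (start == end == 0 over an empty buffer).
    let e = ZSlice::empty();
    assert_eq!(e.len(), 0);
}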
4 changes: 2 additions & 2 deletions commons/zenoh-codec/src/core/encoding.rs
@@ -62,13 +62,13 @@
fn read(self, reader: &mut R) -> Result<Encoding, Self::Error> {
let zodec = Zenoh080Bounded::<u32>::new();
let id: u32 = zodec.read(&mut *reader)?;
let (id, has_suffix) = (
let (id, has_schema) = (
(id >> 1) as EncodingId,
imsg::has_flag(id as u8, flag::S as u8),
);

let mut schema = None;
if has_suffix {
if has_schema {
let zodec = Zenoh080Bounded::<u8>::new();
schema = Some(zodec.read(&mut *reader)?);
}
3 changes: 3 additions & 0 deletions commons/zenoh-collections/src/single_or_vec.rs
@@ -182,14 +182,17 @@ impl<T> SingleOrVec<T> {
self.vectorize().insert(at, value);
}
}

enum DrainInner<'a, T> {
Vec(alloc::vec::Drain<'a, T>),
Single(&'a mut SingleOrVecInner<T>),
Done,
}

pub struct Drain<'a, T> {
inner: DrainInner<'a, T>,
}

impl<'a, T> Iterator for Drain<'a, T> {
type Item = T;

6 changes: 3 additions & 3 deletions commons/zenoh-protocol/src/core/encoding.rs
@@ -18,8 +18,8 @@ pub type EncodingId = u16;

/// [`Encoding`] is a metadata that indicates how the data payload should be interpreted.
/// For wire-efficiency and extensibility purposes, Zenoh defines an [`Encoding`] as
/// composed of an unsigned integer prefix and a string suffix. The actual meaning of the
/// prefix and suffix are out-of-scope of the protocol definition. Therefore, Zenoh does not
/// composed of an unsigned integer prefix and a bytes schema. The actual meaning of the
/// prefix and schema are out-of-scope of the protocol definition. Therefore, Zenoh does not
/// impose any encoding mapping and users are free to use any mapping they like.
/// Nevertheless, it is worth highlighting that Zenoh still provides a default mapping as part
/// of the API as per user convenience. That mapping has no impact on the Zenoh protocol definition.
@@ -40,7 +40,7 @@ pub struct Encoding {
/// +---------------+
/// ```
pub mod flag {
pub const S: u32 = 1; // 0x01 Suffix if S==1 then suffix is present
pub const S: u32 = 1; // 0x01 Suffix if S==1 then schema is present
}

impl Encoding {
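The rename from suffix to schema is purely terminological; the wire format still packs the encoding id and the `S` flag into one integer, with the id shifted left by one and bit 0 signalling that a schema follows, as the codec read above shows. A hedged sketch of that packing (these helper functions are hypothetical, not part of the codec):

// Hypothetical helpers mirroring the read logic in zenoh-codec:
// the id is stored shifted left by one, bit 0 (flag::S) marks a schema.
const S: u32 = 1;

fn pack(id: u16, has_schema: bool) -> u32 {
    ((id as u32) << 1) | if has_schema { S } else { 0 }
}

fn unpack(wire: u32) -> (u16, bool) {
    ((wire >> 1) as u16, wire & S == S)
}

fn main() {
    let wire = pack(42, true);
    assert_eq!(unpack(wire), (42, true));
}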