diff --git a/Cargo.lock b/Cargo.lock index 9dff82ad80..05d608cb3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3954,6 +3954,12 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "unwrap-infallible" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "151ac09978d3c2862c4e39b557f4eceee2cc72150bc4cb4f16abf061b6e381fb" + [[package]] name = "unzip-n" version = "0.1.2" @@ -4479,6 +4485,7 @@ dependencies = [ "tokio", "tokio-util", "uhlc", + "unwrap-infallible", "uuid", "vec_map", "zenoh-buffers", @@ -4605,6 +4612,7 @@ dependencies = [ "zenoh", "zenoh-collections", "zenoh-ext", + "zenoh-shm", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9210c96b70..d02f84eca8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -159,6 +159,7 @@ tokio-rustls = "0.25.0" console-subscriber = "0.2" typenum = "1.16.0" uhlc = { version = "0.7.0", default-features = false } # Default features are disabled due to usage in no_std crates +unwrap-infallible = "0.1.5" unzip-n = "0.1.2" url = "2.3.1" urlencoding = "2.1.2" diff --git a/commons/zenoh-buffers/src/lib.rs b/commons/zenoh-buffers/src/lib.rs index eae7f1715c..117fb412b7 100644 --- a/commons/zenoh-buffers/src/lib.rs +++ b/commons/zenoh-buffers/src/lib.rs @@ -199,6 +199,18 @@ pub mod reader { fn rewind(&mut self, mark: Self::Mark) -> bool; } + pub trait AdvanceableReader: Reader { + fn skip(&mut self, offset: usize) -> Result<(), DidntRead>; + fn backtrack(&mut self, offset: usize) -> Result<(), DidntRead>; + fn advance(&mut self, offset: isize) -> Result<(), DidntRead> { + if offset > 0 { + self.skip(offset as usize) + } else { + self.backtrack((-offset) as usize) + } + } + } + #[derive(Debug, Clone, Copy)] pub struct DidntSiphon; diff --git a/commons/zenoh-buffers/src/zbuf.rs b/commons/zenoh-buffers/src/zbuf.rs index fd86f454af..4a655ce36a 100644 --- a/commons/zenoh-buffers/src/zbuf.rs +++ b/commons/zenoh-buffers/src/zbuf.rs @@ -15,12 +15,17 @@ use crate::ZSliceKind; use crate::{ buffer::{Buffer, SplitBuffer}, - reader::{BacktrackableReader, DidntRead, DidntSiphon, HasReader, Reader, SiphonableReader}, + reader::{ + AdvanceableReader, BacktrackableReader, DidntRead, DidntSiphon, HasReader, Reader, + SiphonableReader, + }, writer::{BacktrackableWriter, DidntWrite, HasWriter, Writer}, - ZSlice, + ZSlice, ZSliceBuffer, }; use alloc::{sync::Arc, vec::Vec}; use core::{cmp, iter, mem, num::NonZeroUsize, ops::RangeBounds, ptr}; +#[cfg(feature = "std")] +use std::io; use zenoh_collections::SingleOrVec; fn get_mut_unchecked(arc: &mut Arc) -> &mut T { @@ -58,6 +63,21 @@ impl ZBuf { } } + pub fn to_zslice(&self) -> ZSlice { + let mut slices = self.zslices(); + match self.slices.len() { + 0 => ZSlice::empty(), + // SAFETY: it's safe to use unwrap_unchecked() beacuse we are explicitly checking the length is 1. + 1 => unsafe { slices.next().unwrap_unchecked().clone() }, + _ => slices + .fold(Vec::new(), |mut acc, it| { + acc.extend(it.as_slice()); + acc + }) + .into(), + } + } + pub fn splice>(&mut self, erased: Range, replacement: &[u8]) { let start = match erased.start_bound() { core::ops::Bound::Included(n) => *n, @@ -199,15 +219,31 @@ impl PartialEq for ZBuf { } // From impls +impl From for ZBuf { + fn from(t: ZSlice) -> Self { + let mut zbuf = ZBuf::empty(); + zbuf.push_zslice(t); + zbuf + } +} + +impl From> for ZBuf +where + T: ZSliceBuffer + 'static, +{ + fn from(t: Arc) -> Self { + let zslice: ZSlice = t.into(); + Self::from(zslice) + } +} + impl From for ZBuf where - T: Into, + T: ZSliceBuffer + 'static, { fn from(t: T) -> Self { - let mut zbuf = ZBuf::empty(); let zslice: ZSlice = t.into(); - zbuf.push_zslice(zslice); - zbuf + Self::from(zslice) } } @@ -270,7 +306,7 @@ impl<'a> Reader for ZBufReader<'a> { } fn read_exact(&mut self, into: &mut [u8]) -> Result<(), DidntRead> { - let len = self.read(into)?; + let len = Reader::read(self, into)?; if len.get() == into.len() { Ok(()) } else { @@ -317,7 +353,7 @@ impl<'a> Reader for ZBufReader<'a> { match (slice.len() - self.cursor.byte).cmp(&len) { cmp::Ordering::Less => { let mut buffer = crate::vec::uninit(len); - self.read_exact(&mut buffer)?; + Reader::read_exact(self, &mut buffer)?; Ok(buffer.into()) } cmp::Ordering::Equal => { @@ -388,13 +424,81 @@ impl<'a> SiphonableReader for ZBufReader<'a> { } #[cfg(feature = "std")] -impl<'a> std::io::Read for ZBufReader<'a> { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { +impl<'a> io::Read for ZBufReader<'a> { + fn read(&mut self, buf: &mut [u8]) -> io::Result { match ::read(self, buf) { Ok(n) => Ok(n.get()), + Err(_) => Ok(0), + } + } +} + +impl<'a> AdvanceableReader for ZBufReader<'a> { + fn skip(&mut self, offset: usize) -> Result<(), DidntRead> { + let mut remaining_offset = offset; + while remaining_offset > 0 { + let s = self.inner.slices.get(self.cursor.slice).ok_or(DidntRead)?; + let remains_in_current_slice = s.len() - self.cursor.byte; + let advance = remaining_offset.min(remains_in_current_slice); + remaining_offset -= advance; + self.cursor.byte += advance; + if self.cursor.byte == s.len() { + self.cursor.slice += 1; + self.cursor.byte = 0; + } + } + Ok(()) + } + + fn backtrack(&mut self, offset: usize) -> Result<(), DidntRead> { + let mut remaining_offset = offset; + while remaining_offset > 0 { + let backtrack = remaining_offset.min(self.cursor.byte); + remaining_offset -= backtrack; + self.cursor.byte -= backtrack; + if self.cursor.byte == 0 { + if self.cursor.slice == 0 { + break; + } + self.cursor.slice -= 1; + self.cursor.byte = self + .inner + .slices + .get(self.cursor.slice) + .ok_or(DidntRead)? + .len(); + } + } + if remaining_offset == 0 { + Ok(()) + } else { + Err(DidntRead) + } + } +} + +#[cfg(feature = "std")] +impl<'a> io::Seek for ZBufReader<'a> { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + let current_pos = self + .inner + .slices() + .take(self.cursor.slice) + .fold(0, |acc, s| acc + s.len()) + + self.cursor.byte; + let current_pos = i64::try_from(current_pos) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{}", e)))?; + + let offset = match pos { + std::io::SeekFrom::Start(s) => i64::try_from(s).unwrap_or(i64::MAX) - current_pos, + std::io::SeekFrom::Current(s) => s, + std::io::SeekFrom::End(s) => self.inner.len() as i64 + s - current_pos, + }; + match self.advance(offset as isize) { + Ok(()) => Ok((offset + current_pos) as u64), Err(_) => Err(std::io::Error::new( - std::io::ErrorKind::UnexpectedEof, - "UnexpectedEof", + std::io::ErrorKind::InvalidInput, + "InvalidInput", )), } } @@ -614,18 +718,18 @@ impl BacktrackableWriter for ZBufWriter<'_> { } #[cfg(feature = "std")] -impl<'a> std::io::Write for ZBufWriter<'a> { - fn write(&mut self, buf: &[u8]) -> std::io::Result { +impl<'a> io::Write for ZBufWriter<'a> { + fn write(&mut self, buf: &[u8]) -> io::Result { match ::write(self, buf) { Ok(n) => Ok(n.get()), - Err(_) => Err(std::io::Error::new( - std::io::ErrorKind::UnexpectedEof, + Err(_) => Err(io::Error::new( + io::ErrorKind::UnexpectedEof, "UnexpectedEof", )), } } - fn flush(&mut self) -> std::io::Result<()> { + fn flush(&mut self) -> io::Result<()> { Ok(()) } } @@ -668,4 +772,47 @@ mod tests { assert_eq!(zbuf1, zbuf2); } + + #[cfg(feature = "std")] + #[test] + fn zbuf_seek() { + use super::{HasReader, ZBuf}; + use crate::reader::Reader; + use std::io::Seek; + + let mut buf = ZBuf::empty(); + buf.push_zslice([0u8, 1u8, 2u8, 3u8].into()); + buf.push_zslice([4u8, 5u8, 6u8, 7u8, 8u8].into()); + buf.push_zslice([9u8, 10u8, 11u8, 12u8, 13u8, 14u8].into()); + let mut reader = buf.reader(); + + assert_eq!(reader.stream_position().unwrap(), 0); + assert_eq!(reader.read_u8().unwrap(), 0); + assert_eq!(reader.seek(std::io::SeekFrom::Current(6)).unwrap(), 7); + assert_eq!(reader.read_u8().unwrap(), 7); + assert_eq!(reader.seek(std::io::SeekFrom::Current(-5)).unwrap(), 3); + assert_eq!(reader.read_u8().unwrap(), 3); + assert_eq!(reader.seek(std::io::SeekFrom::Current(10)).unwrap(), 14); + assert_eq!(reader.read_u8().unwrap(), 14); + reader.seek(std::io::SeekFrom::Current(100)).unwrap_err(); + + assert_eq!(reader.seek(std::io::SeekFrom::Start(0)).unwrap(), 0); + assert_eq!(reader.read_u8().unwrap(), 0); + assert_eq!(reader.seek(std::io::SeekFrom::Start(12)).unwrap(), 12); + assert_eq!(reader.read_u8().unwrap(), 12); + assert_eq!(reader.seek(std::io::SeekFrom::Start(15)).unwrap(), 15); + reader.read_u8().unwrap_err(); + reader.seek(std::io::SeekFrom::Start(100)).unwrap_err(); + + assert_eq!(reader.seek(std::io::SeekFrom::End(0)).unwrap(), 15); + reader.read_u8().unwrap_err(); + assert_eq!(reader.seek(std::io::SeekFrom::End(-5)).unwrap(), 10); + assert_eq!(reader.read_u8().unwrap(), 10); + assert_eq!(reader.seek(std::io::SeekFrom::End(-15)).unwrap(), 0); + assert_eq!(reader.read_u8().unwrap(), 0); + reader.seek(std::io::SeekFrom::End(-20)).unwrap_err(); + + assert_eq!(reader.seek(std::io::SeekFrom::Start(10)).unwrap(), 10); + reader.seek(std::io::SeekFrom::Current(-100)).unwrap_err(); + } } diff --git a/commons/zenoh-buffers/src/zslice.rs b/commons/zenoh-buffers/src/zslice.rs index c15cbc6828..05c77cac7d 100644 --- a/commons/zenoh-buffers/src/zslice.rs +++ b/commons/zenoh-buffers/src/zslice.rs @@ -114,6 +114,10 @@ impl ZSlice { } } + pub fn empty() -> Self { + unsafe { ZSlice::new_unchecked(Arc::new([]), 0, 0) } + } + /// # Safety /// This function does not verify wether the `start` and `end` indexes are within the buffer boundaries. /// If a [`ZSlice`] is built via this constructor, a later access may panic if `start` and `end` indexes are out-of-bound. diff --git a/commons/zenoh-codec/src/core/encoding.rs b/commons/zenoh-codec/src/core/encoding.rs index cfbe0084ba..c8033cdd5f 100644 --- a/commons/zenoh-codec/src/core/encoding.rs +++ b/commons/zenoh-codec/src/core/encoding.rs @@ -62,13 +62,13 @@ where fn read(self, reader: &mut R) -> Result { let zodec = Zenoh080Bounded::::new(); let id: u32 = zodec.read(&mut *reader)?; - let (id, has_suffix) = ( + let (id, has_schema) = ( (id >> 1) as EncodingId, imsg::has_flag(id as u8, flag::S as u8), ); let mut schema = None; - if has_suffix { + if has_schema { let zodec = Zenoh080Bounded::::new(); schema = Some(zodec.read(&mut *reader)?); } diff --git a/commons/zenoh-collections/src/single_or_vec.rs b/commons/zenoh-collections/src/single_or_vec.rs index ceb43e4025..ed82bf49af 100644 --- a/commons/zenoh-collections/src/single_or_vec.rs +++ b/commons/zenoh-collections/src/single_or_vec.rs @@ -182,14 +182,17 @@ impl SingleOrVec { self.vectorize().insert(at, value); } } + enum DrainInner<'a, T> { Vec(alloc::vec::Drain<'a, T>), Single(&'a mut SingleOrVecInner), Done, } + pub struct Drain<'a, T> { inner: DrainInner<'a, T>, } + impl<'a, T> Iterator for Drain<'a, T> { type Item = T; diff --git a/commons/zenoh-protocol/src/core/encoding.rs b/commons/zenoh-protocol/src/core/encoding.rs index 9b9aa5bf2f..70afdbf143 100644 --- a/commons/zenoh-protocol/src/core/encoding.rs +++ b/commons/zenoh-protocol/src/core/encoding.rs @@ -18,8 +18,8 @@ pub type EncodingId = u16; /// [`Encoding`] is a metadata that indicates how the data payload should be interpreted. /// For wire-efficiency and extensibility purposes, Zenoh defines an [`Encoding`] as -/// composed of an unsigned integer prefix and a string suffix. The actual meaning of the -/// prefix and suffix are out-of-scope of the protocol definition. Therefore, Zenoh does not +/// composed of an unsigned integer prefix and a bytes schema. The actual meaning of the +/// prefix and schema are out-of-scope of the protocol definition. Therefore, Zenoh does not /// impose any encoding mapping and users are free to use any mapping they like. /// Nevertheless, it is worth highlighting that Zenoh still provides a default mapping as part /// of the API as per user convenience. That mapping has no impact on the Zenoh protocol definition. @@ -40,7 +40,7 @@ pub struct Encoding { /// +---------------+ /// ``` pub mod flag { - pub const S: u32 = 1; // 0x01 Suffix if S==1 then suffix is present + pub const S: u32 = 1; // 0x01 Suffix if S==1 then schema is present } impl Encoding { diff --git a/examples/Cargo.toml b/examples/Cargo.toml index fc1db17fe8..4a4a4fef3e 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -27,7 +27,7 @@ readme = "README.md" publish = false [features] -shared-memory = ["zenoh/shared-memory"] +shared-memory = ["zenoh-shm","zenoh/shared-memory"] unstable = ["zenoh/unstable"] transport_unixpipe = ["zenoh/transport_unixpipe"] @@ -52,6 +52,7 @@ log = { workspace = true } zenoh = { workspace = true } zenoh-collections = { workspace = true } zenoh-ext = { workspace = true } +zenoh-shm = { workspace = true, optional = true } [dev-dependencies] rand = { workspace = true, features = ["default"] } @@ -96,6 +97,11 @@ required-features = ["shared-memory"] name = "z_sub" path = "examples/z_sub.rs" +[[example]] +name = "z_sub_shm" +path = "examples/z_sub_shm.rs" +required-features = ["shared-memory"] + [[example]] name = "z_pull" path = "examples/z_pull.rs" diff --git a/examples/examples/z_sub.rs b/examples/examples/z_sub.rs index fbce562c2e..299f0c8f49 100644 --- a/examples/examples/z_sub.rs +++ b/examples/examples/z_sub.rs @@ -32,7 +32,6 @@ async fn main() { let session = zenoh::open(config).res().await.unwrap(); println!("Declaring Subscriber on '{}'...", &key_expr); - let subscriber = session.declare_subscriber(&key_expr).res().await.unwrap(); println!("Press CTRL-C to quit..."); diff --git a/examples/examples/z_sub_shm.rs b/examples/examples/z_sub_shm.rs new file mode 100644 index 0000000000..630876f287 --- /dev/null +++ b/examples/examples/z_sub_shm.rs @@ -0,0 +1,66 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use clap::Parser; +use zenoh::config::Config; +use zenoh::prelude::r#async::*; +use zenoh_examples::CommonArgs; +use zenoh_shm::SharedMemoryBuf; + +#[tokio::main] +async fn main() { + // Initiate logging + env_logger::init(); + + let (mut config, key_expr) = parse_args(); + + // A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm` to operate + // over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the + // subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected. + config.transport.shared_memory.set_enabled(true).unwrap(); + + println!("Opening session..."); + let session = zenoh::open(config).res().await.unwrap(); + + println!("Declaring Subscriber on '{}'...", &key_expr); + let subscriber = session.declare_subscriber(&key_expr).res().await.unwrap(); + + println!("Press CTRL-C to quit..."); + while let Ok(sample) = subscriber.recv_async().await { + match sample.payload().deserialize::() { + Ok(payload) => println!( + ">> [Subscriber] Received {} ('{}': '{:02x?}')", + sample.kind(), + sample.key_expr().as_str(), + payload.as_slice() + ), + Err(e) => { + println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); + } + } + } +} + +#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] +struct SubArgs { + #[arg(short, long, default_value = "demo/example/**")] + /// The Key Expression to subscribe to. + key: KeyExpr<'static>, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> (Config, KeyExpr<'static>) { + let args = SubArgs::parse(); + (args.common.into(), args.key) +} diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index d20a4b914e..80cf8ba1bc 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -90,6 +90,7 @@ serde_yaml = { workspace = true } socket2 = { workspace = true } stop-token = { workspace = true } uhlc = { workspace = true, features = ["default"] } +unwrap-infallible = { workspace = true } uuid = { workspace = true, features = ["default"] } vec_map = { workspace = true } zenoh-buffers = { workspace = true, features = ["std"] } diff --git a/zenoh/src/payload.rs b/zenoh/src/payload.rs index ed2a58145c..eac4f58e7c 100644 --- a/zenoh/src/payload.rs +++ b/zenoh/src/payload.rs @@ -14,17 +14,40 @@ //! Payload primitives. use crate::buffers::ZBuf; +use std::str::Utf8Error; use std::{ - borrow::Cow, convert::Infallible, fmt::Debug, ops::Deref, string::FromUtf8Error, sync::Arc, + borrow::Cow, convert::Infallible, fmt::Debug, marker::PhantomData, ops::Deref, + string::FromUtf8Error, sync::Arc, }; -use zenoh_buffers::buffer::Buffer; +use unwrap_infallible::UnwrapInfallible; +use zenoh_buffers::ZBufWriter; use zenoh_buffers::{ - buffer::SplitBuffer, reader::HasReader, writer::HasWriter, ZBufReader, ZSlice, + buffer::{Buffer, SplitBuffer}, + reader::HasReader, + writer::HasWriter, + ZBufReader, ZSlice, }; -use zenoh_result::ZResult; +use zenoh_codec::{RCodec, WCodec, Zenoh080}; +use zenoh_result::{ZError, ZResult}; #[cfg(feature = "shared-memory")] use zenoh_shm::SharedMemoryBuf; +/// Trait to encode a type `T` into a [`Value`]. +pub trait Serialize { + type Output; + + /// The implementer should take care of serializing the type `T` and set the proper [`Encoding`]. + fn serialize(self, t: T) -> Self::Output; +} + +pub trait Deserialize<'a, T> { + type Error; + + /// The implementer should take care of deserializing the type `T` based on the [`Encoding`] information. + fn deserialize(self, t: &'a Payload) -> Result; +} + +/// A payload contains the serialized bytes of user data. #[repr(transparent)] #[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct Payload(ZBuf); @@ -35,7 +58,7 @@ impl Payload { Self(ZBuf::empty()) } - /// Create a [`Payload`] from any type `T` that can implements [`Into`]. + /// Create a [`Payload`] from any type `T` that implements [`Into`]. pub fn new(t: T) -> Self where T: Into, @@ -57,19 +80,35 @@ impl Payload { pub fn reader(&self) -> PayloadReader<'_> { PayloadReader(self.0.reader()) } -} -/// A reader that implements [`std::io::Read`] trait to read from a [`Payload`]. -pub struct PayloadReader<'a>(ZBufReader<'a>); + /// Build a [`Payload`] from a generic reader implementing [`std::io::Read`]. This operation copies data from the reader. + pub fn from_reader(mut reader: R) -> Result + where + R: std::io::Read, + { + let mut buf: Vec = vec![]; + reader.read_to_end(&mut buf)?; + Ok(Payload::new(buf)) + } -impl std::io::Read for PayloadReader<'_> { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - self.0.read(buf) + /// Get a [`PayloadReader`] implementing [`std::io::Read`] trait. + pub fn iter(&self) -> PayloadIterator<'_, T> + where + T: for<'b> TryFrom<&'b Payload>, + for<'b> ZSerde: Deserialize<'b, T>, + for<'b> >::Error: Debug, + { + PayloadIterator { + reader: self.0.reader(), + _t: PhantomData::, + } + } + + /// Get a [`PayloadWriter`] implementing [`std::io::Write`] trait. + pub fn writer(&mut self) -> PayloadWriter<'_> { + PayloadWriter(self.0.writer()) } -} -/// Provide some facilities specific to the Rust API to encode/decode a [`Value`] with an `Serialize`. -impl Payload { /// Encode an object of type `T` as a [`Value`] using the [`ZSerde`]. /// /// ```rust @@ -88,30 +127,108 @@ impl Payload { } /// Decode an object of type `T` from a [`Value`] using the [`ZSerde`]. - /// See [encode](Value::encode) for an example. pub fn deserialize<'a, T>(&'a self) -> ZResult where ZSerde: Deserialize<'a, T>, >::Error: Debug, { - let t: T = ZSerde.deserialize(self).map_err(|e| zerror!("{:?}", e))?; - Ok(t) + ZSerde + .deserialize(self) + .map_err(|e| zerror!("{:?}", e).into()) + } + + /// Decode an object of type `T` from a [`Value`] using the [`ZSerde`]. + pub fn into<'a, T>(&'a self) -> T + where + ZSerde: Deserialize<'a, T, Error = Infallible>, + >::Error: Debug, + { + ZSerde.deserialize(self).unwrap_infallible() } } -/// Trait to encode a type `T` into a [`Value`]. -pub trait Serialize { - type Output; +/// A reader that implements [`std::io::Read`] trait to read from a [`Payload`]. +#[repr(transparent)] +#[derive(Debug)] +pub struct PayloadReader<'a>(ZBufReader<'a>); - /// The implementer should take care of serializing the type `T` and set the proper [`Encoding`]. - fn serialize(self, t: T) -> Self::Output; +impl std::io::Read for PayloadReader<'_> { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + std::io::Read::read(&mut self.0, buf) + } } -pub trait Deserialize<'a, T> { - type Error; +impl std::io::Seek for PayloadReader<'_> { + fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result { + std::io::Seek::seek(&mut self.0, pos) + } +} - /// The implementer should take care of deserializing the type `T` based on the [`Encoding`] information. - fn deserialize(self, t: &'a Payload) -> Result; +/// A writer that implements [`std::io::Write`] trait to write into a [`Payload`]. +#[repr(transparent)] +#[derive(Debug)] +pub struct PayloadWriter<'a>(ZBufWriter<'a>); + +impl std::io::Write for PayloadWriter<'_> { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + std::io::Write::write(&mut self.0, buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +/// An iterator that implements [`std::iter::Iterator`] trait to iterate on values `T` in a [`Payload`]. +/// Note that [`Payload`] contains a serialized version of `T` and iterating over a [`Payload`] performs lazy deserialization. +#[repr(transparent)] +#[derive(Debug)] +pub struct PayloadIterator<'a, T> +where + ZSerde: Deserialize<'a, T>, +{ + reader: ZBufReader<'a>, + _t: PhantomData, +} + +impl Iterator for PayloadIterator<'_, T> +where + for<'a> ZSerde: Deserialize<'a, T>, + for<'a> >::Error: Debug, +{ + type Item = T; + + fn next(&mut self) -> Option { + let codec = Zenoh080::new(); + + let kbuf: ZBuf = codec.read(&mut self.reader).ok()?; + let kpld = Payload::new(kbuf); + + let t = ZSerde.deserialize(&kpld).ok()?; + Some(t) + } +} + +impl FromIterator for Payload +where + ZSerde: Serialize, +{ + fn from_iter>(iter: T) -> Self { + let codec = Zenoh080::new(); + let mut buffer: ZBuf = ZBuf::empty(); + let mut writer = buffer.writer(); + for t in iter { + let tpld = ZSerde.serialize(t); + // SAFETY: we are serializing slices on a ZBuf, so serialization will never + // fail unless we run out of memory. In that case, Rust memory allocator + // will panic before the serializer has any chance to fail. + unsafe { + codec.write(&mut writer, &tpld.0).unwrap_unchecked(); + } + } + + Payload::new(buffer) + } } /// The default serializer for Zenoh payload. It supports primitives types, such as: vec, int, uint, float, string, bool. @@ -122,7 +239,7 @@ pub struct ZSerde; #[derive(Debug, Clone, Copy)] pub struct ZDeserializeError; -// Bytes +// ZBuf impl Serialize for ZSerde { type Output = Payload; @@ -131,9 +248,23 @@ impl Serialize for ZSerde { } } -impl From for ZBuf { - fn from(value: Payload) -> Self { - value.0 +impl From for Payload { + fn from(t: ZBuf) -> Self { + ZSerde.serialize(t) + } +} + +impl Serialize<&ZBuf> for ZSerde { + type Output = Payload; + + fn serialize(self, t: &ZBuf) -> Self::Output { + Payload::new(t.clone()) + } +} + +impl From<&ZBuf> for Payload { + fn from(t: &ZBuf) -> Self { + ZSerde.serialize(t) } } @@ -141,16 +272,133 @@ impl Deserialize<'_, ZBuf> for ZSerde { type Error = Infallible; fn deserialize(self, v: &Payload) -> Result { - Ok(v.into()) + Ok(v.0.clone()) + } +} + +impl From for ZBuf { + fn from(value: Payload) -> Self { + value.0 } } impl From<&Payload> for ZBuf { fn from(value: &Payload) -> Self { - value.0.clone() + ZSerde.deserialize(value).unwrap_infallible() + } +} + +// ZSlice +impl Serialize for ZSerde { + type Output = Payload; + + fn serialize(self, t: ZSlice) -> Self::Output { + Payload::new(t) + } +} + +impl From for Payload { + fn from(t: ZSlice) -> Self { + ZSerde.serialize(t) } } +impl Serialize<&ZSlice> for ZSerde { + type Output = Payload; + + fn serialize(self, t: &ZSlice) -> Self::Output { + Payload::new(t.clone()) + } +} + +impl From<&ZSlice> for Payload { + fn from(t: &ZSlice) -> Self { + ZSerde.serialize(t) + } +} + +impl Deserialize<'_, ZSlice> for ZSerde { + type Error = Infallible; + + fn deserialize(self, v: &Payload) -> Result { + Ok(v.0.to_zslice()) + } +} + +impl From for ZSlice { + fn from(value: Payload) -> Self { + ZBuf::from(value).to_zslice() + } +} + +impl From<&Payload> for ZSlice { + fn from(value: &Payload) -> Self { + ZSerde.deserialize(value).unwrap_infallible() + } +} + +// [u8; N] +impl Serialize<[u8; N]> for ZSerde { + type Output = Payload; + + fn serialize(self, t: [u8; N]) -> Self::Output { + Payload::new(t) + } +} + +impl From<[u8; N]> for Payload { + fn from(t: [u8; N]) -> Self { + ZSerde.serialize(t) + } +} + +impl Serialize<&[u8; N]> for ZSerde { + type Output = Payload; + + fn serialize(self, t: &[u8; N]) -> Self::Output { + Payload::new(*t) + } +} + +impl From<&[u8; N]> for Payload { + fn from(t: &[u8; N]) -> Self { + ZSerde.serialize(t) + } +} + +impl Deserialize<'_, [u8; N]> for ZSerde { + type Error = ZDeserializeError; + + fn deserialize(self, v: &Payload) -> Result<[u8; N], Self::Error> { + use std::io::Read; + + if v.0.len() != N { + return Err(ZDeserializeError); + } + let mut dst = [0u8; N]; + let mut reader = v.reader(); + reader.read_exact(&mut dst).map_err(|_| ZDeserializeError)?; + Ok(dst) + } +} + +impl TryFrom for [u8; N] { + type Error = ZDeserializeError; + + fn try_from(value: Payload) -> Result { + ZSerde.deserialize(&value) + } +} + +impl TryFrom<&Payload> for [u8; N] { + type Error = ZDeserializeError; + + fn try_from(value: &Payload) -> Result { + ZSerde.deserialize(value) + } +} + +// Vec impl Serialize> for ZSerde { type Output = Payload; @@ -159,11 +407,23 @@ impl Serialize> for ZSerde { } } -impl Serialize<&[u8]> for ZSerde { +impl From> for Payload { + fn from(t: Vec) -> Self { + ZSerde.serialize(t) + } +} + +impl Serialize<&Vec> for ZSerde { type Output = Payload; - fn serialize(self, t: &[u8]) -> Self::Output { - Payload::new(t.to_vec()) + fn serialize(self, t: &Vec) -> Self::Output { + Payload::new(t.clone()) + } +} + +impl From<&Vec> for Payload { + fn from(t: &Vec) -> Self { + ZSerde.serialize(t) } } @@ -171,16 +431,38 @@ impl Deserialize<'_, Vec> for ZSerde { type Error = Infallible; fn deserialize(self, v: &Payload) -> Result, Self::Error> { - Ok(Vec::from(v)) + Ok(v.0.contiguous().to_vec()) + } +} + +impl From for Vec { + fn from(value: Payload) -> Self { + ZSerde.deserialize(&value).unwrap_infallible() } } impl From<&Payload> for Vec { fn from(value: &Payload) -> Self { - Cow::from(value).to_vec() + ZSerde.deserialize(value).unwrap_infallible() + } +} + +// &[u8] +impl Serialize<&[u8]> for ZSerde { + type Output = Payload; + + fn serialize(self, t: &[u8]) -> Self::Output { + Payload::new(t.to_vec()) + } +} + +impl From<&[u8]> for Payload { + fn from(t: &[u8]) -> Self { + ZSerde.serialize(t) } } +// Cow<[u8]> impl<'a> Serialize> for ZSerde { type Output = Payload; @@ -189,17 +471,37 @@ impl<'a> Serialize> for ZSerde { } } +impl From> for Payload { + fn from(t: Cow<'_, [u8]>) -> Self { + ZSerde.serialize(t) + } +} + +impl<'a> Serialize<&Cow<'a, [u8]>> for ZSerde { + type Output = Payload; + + fn serialize(self, t: &Cow<'a, [u8]>) -> Self::Output { + Payload::new(t.to_vec()) + } +} + +impl From<&Cow<'_, [u8]>> for Payload { + fn from(t: &Cow<'_, [u8]>) -> Self { + ZSerde.serialize(t) + } +} + impl<'a> Deserialize<'a, Cow<'a, [u8]>> for ZSerde { type Error = Infallible; fn deserialize(self, v: &'a Payload) -> Result, Self::Error> { - Ok(Cow::from(v)) + Ok(v.0.contiguous()) } } impl<'a> From<&'a Payload> for Cow<'a, [u8]> { fn from(value: &'a Payload) -> Self { - value.0.contiguous() + ZSerde.deserialize(value).unwrap_infallible() } } @@ -212,11 +514,23 @@ impl Serialize for ZSerde { } } -impl Serialize<&str> for ZSerde { +impl From for Payload { + fn from(t: String) -> Self { + ZSerde.serialize(t) + } +} + +impl Serialize<&String> for ZSerde { type Output = Payload; - fn serialize(self, s: &str) -> Self::Output { - Self.serialize(s.to_string()) + fn serialize(self, s: &String) -> Self::Output { + Payload::new(s.clone().into_bytes()) + } +} + +impl From<&String> for Payload { + fn from(t: &String) -> Self { + ZSerde.serialize(t) } } @@ -224,7 +538,16 @@ impl Deserialize<'_, String> for ZSerde { type Error = FromUtf8Error; fn deserialize(self, v: &Payload) -> Result { - String::from_utf8(Vec::from(v)) + let v: Vec = ZSerde.deserialize(v).unwrap_infallible(); + String::from_utf8(v) + } +} + +impl TryFrom for String { + type Error = FromUtf8Error; + + fn try_from(value: Payload) -> Result { + ZSerde.deserialize(&value) } } @@ -236,11 +559,18 @@ impl TryFrom<&Payload> for String { } } -impl TryFrom for String { - type Error = FromUtf8Error; +// &str +impl Serialize<&str> for ZSerde { + type Output = Payload; - fn try_from(value: Payload) -> Result { - ZSerde.deserialize(&value) + fn serialize(self, s: &str) -> Self::Output { + Self.serialize(s.to_string()) + } +} + +impl From<&str> for Payload { + fn from(t: &str) -> Self { + ZSerde.serialize(t) } } @@ -252,17 +582,40 @@ impl<'a> Serialize> for ZSerde { } } -impl<'a> Deserialize<'a, Cow<'a, str>> for ZSerde { - type Error = FromUtf8Error; +impl From> for Payload { + fn from(t: Cow<'_, str>) -> Self { + ZSerde.serialize(t) + } +} + +impl<'a> Serialize<&Cow<'a, str>> for ZSerde { + type Output = Payload; + + fn serialize(self, s: &Cow<'a, str>) -> Self::Output { + Self.serialize(s.to_string()) + } +} + +impl From<&Cow<'_, str>> for Payload { + fn from(t: &Cow<'_, str>) -> Self { + ZSerde.serialize(t) + } +} - fn deserialize(self, v: &Payload) -> Result, Self::Error> { - let v: String = Self.deserialize(v)?; - Ok(Cow::Owned(v)) +impl<'a> Deserialize<'a, Cow<'a, str>> for ZSerde { + type Error = Utf8Error; + + fn deserialize(self, v: &'a Payload) -> Result, Self::Error> { + let v: Cow<[u8]> = Self.deserialize(v).unwrap_infallible(); + let _ = core::str::from_utf8(v.as_ref())?; + // SAFETY: &str is &[u8] with the guarantee that every char is UTF-8 + // As implemented internally https://doc.rust-lang.org/std/str/fn.from_utf8_unchecked.html. + Ok(unsafe { core::mem::transmute(v) }) } } impl<'a> TryFrom<&'a Payload> for Cow<'a, str> { - type Error = FromUtf8Error; + type Error = Utf8Error; fn try_from(value: &'a Payload) -> Result { ZSerde.deserialize(value) @@ -271,13 +624,17 @@ impl<'a> TryFrom<&'a Payload> for Cow<'a, str> { // - Integers impl macro_rules! impl_int { - ($t:ty, $encoding:expr) => { + ($t:ty) => { impl Serialize<$t> for ZSerde { type Output = Payload; fn serialize(self, t: $t) -> Self::Output { let bs = t.to_le_bytes(); - let end = 1 + bs.iter().rposition(|b| *b != 0).unwrap_or(bs.len() - 1); + let end = if t == 0 as $t { + 0 + } else { + 1 + bs.iter().rposition(|b| *b != 0).unwrap_or(bs.len() - 1) + }; // SAFETY: // - 0 is a valid start index because bs is guaranteed to always have a length greater or equal than 1 // - end is a valid end index because is bounded between 0 and bs.len() @@ -285,6 +642,12 @@ macro_rules! impl_int { } } + impl From<$t> for Payload { + fn from(t: $t) -> Self { + ZSerde.serialize(t) + } + } + impl Serialize<&$t> for ZSerde { type Output = Payload; @@ -293,11 +656,9 @@ macro_rules! impl_int { } } - impl Serialize<&mut $t> for ZSerde { - type Output = Payload; - - fn serialize(self, t: &mut $t) -> Self::Output { - Self.serialize(*t) + impl From<&$t> for Payload { + fn from(t: &$t) -> Self { + ZSerde.serialize(t) } } @@ -319,6 +680,14 @@ macro_rules! impl_int { } } + impl TryFrom for $t { + type Error = ZDeserializeError; + + fn try_from(value: Payload) -> Result { + ZSerde.deserialize(&value) + } + } + impl TryFrom<&Payload> for $t { type Error = ZDeserializeError; @@ -330,31 +699,51 @@ macro_rules! impl_int { } // Zenoh unsigned integers -impl_int!(u8, ZSerde::ZENOH_UINT); -impl_int!(u16, ZSerde::ZENOH_UINT); -impl_int!(u32, ZSerde::ZENOH_UINT); -impl_int!(u64, ZSerde::ZENOH_UINT); -impl_int!(usize, ZSerde::ZENOH_UINT); +impl_int!(u8); +impl_int!(u16); +impl_int!(u32); +impl_int!(u64); +impl_int!(usize); // Zenoh signed integers -impl_int!(i8, ZSerde::ZENOH_INT); -impl_int!(i16, ZSerde::ZENOH_INT); -impl_int!(i32, ZSerde::ZENOH_INT); -impl_int!(i64, ZSerde::ZENOH_INT); -impl_int!(isize, ZSerde::ZENOH_INT); +impl_int!(i8); +impl_int!(i16); +impl_int!(i32); +impl_int!(i64); +impl_int!(isize); // Zenoh floats -impl_int!(f32, ZSerde::ZENOH_FLOAT); -impl_int!(f64, ZSerde::ZENOH_FLOAT); +impl_int!(f32); +impl_int!(f64); // Zenoh bool impl Serialize for ZSerde { - type Output = ZBuf; + type Output = Payload; fn serialize(self, t: bool) -> Self::Output { // SAFETY: casting a bool into an integer is well-defined behaviour. // 0 is false, 1 is true: https://doc.rust-lang.org/std/primitive.bool.html - ZBuf::from((t as u8).to_le_bytes()) + Payload::new(ZBuf::from((t as u8).to_le_bytes())) + } +} + +impl From for Payload { + fn from(t: bool) -> Self { + ZSerde.serialize(t) + } +} + +impl Serialize<&bool> for ZSerde { + type Output = Payload; + + fn serialize(self, t: &bool) -> Self::Output { + ZSerde.serialize(*t) + } +} + +impl From<&bool> for Payload { + fn from(t: &bool) -> Self { + ZSerde.serialize(t) } } @@ -371,6 +760,14 @@ impl Deserialize<'_, bool> for ZSerde { } } +impl TryFrom for bool { + type Error = ZDeserializeError; + + fn try_from(value: Payload) -> Result { + ZSerde.deserialize(&value) + } +} + impl TryFrom<&Payload> for bool { type Error = ZDeserializeError; @@ -381,21 +778,37 @@ impl TryFrom<&Payload> for bool { // - Zenoh advanced types encoders/decoders // JSON +impl Serialize for ZSerde { + type Output = Result; + + fn serialize(self, t: serde_json::Value) -> Self::Output { + ZSerde.serialize(&t) + } +} + +impl TryFrom for Payload { + type Error = serde_json::Error; + + fn try_from(value: serde_json::Value) -> Result { + ZSerde.serialize(&value) + } +} + impl Serialize<&serde_json::Value> for ZSerde { type Output = Result; fn serialize(self, t: &serde_json::Value) -> Self::Output { let mut payload = Payload::empty(); - serde_json::to_writer(payload.0.writer(), t)?; + serde_json::to_writer(payload.writer(), t)?; Ok(payload) } } -impl Serialize for ZSerde { - type Output = Result; +impl TryFrom<&serde_json::Value> for Payload { + type Error = serde_json::Error; - fn serialize(self, t: serde_json::Value) -> Self::Output { - Self.serialize(&t) + fn try_from(value: &serde_json::Value) -> Result { + ZSerde.serialize(value) } } @@ -407,15 +820,39 @@ impl Deserialize<'_, serde_json::Value> for ZSerde { } } -impl TryFrom for Payload { +impl TryFrom for serde_json::Value { type Error = serde_json::Error; - fn try_from(value: serde_json::Value) -> Result { - ZSerde.serialize(value) + fn try_from(value: Payload) -> Result { + ZSerde.deserialize(&value) + } +} + +impl TryFrom<&Payload> for serde_json::Value { + type Error = serde_json::Error; + + fn try_from(value: &Payload) -> Result { + ZSerde.deserialize(value) } } // Yaml +impl Serialize for ZSerde { + type Output = Result; + + fn serialize(self, t: serde_yaml::Value) -> Self::Output { + Self.serialize(&t) + } +} + +impl TryFrom for Payload { + type Error = serde_yaml::Error; + + fn try_from(value: serde_yaml::Value) -> Result { + ZSerde.serialize(value) + } +} + impl Serialize<&serde_yaml::Value> for ZSerde { type Output = Result; @@ -426,11 +863,11 @@ impl Serialize<&serde_yaml::Value> for ZSerde { } } -impl Serialize for ZSerde { - type Output = Result; +impl TryFrom<&serde_yaml::Value> for Payload { + type Error = serde_yaml::Error; - fn serialize(self, t: serde_yaml::Value) -> Self::Output { - Self.serialize(&t) + fn try_from(value: &serde_yaml::Value) -> Result { + ZSerde.serialize(value) } } @@ -442,15 +879,39 @@ impl Deserialize<'_, serde_yaml::Value> for ZSerde { } } -impl TryFrom for Payload { +impl TryFrom for serde_yaml::Value { type Error = serde_yaml::Error; - fn try_from(value: serde_yaml::Value) -> Result { - ZSerde.serialize(value) + fn try_from(value: Payload) -> Result { + ZSerde.deserialize(&value) + } +} + +impl TryFrom<&Payload> for serde_yaml::Value { + type Error = serde_yaml::Error; + + fn try_from(value: &Payload) -> Result { + ZSerde.deserialize(value) } } // CBOR +impl Serialize for ZSerde { + type Output = Result; + + fn serialize(self, t: serde_cbor::Value) -> Self::Output { + Self.serialize(&t) + } +} + +impl TryFrom for Payload { + type Error = serde_cbor::Error; + + fn try_from(value: serde_cbor::Value) -> Result { + ZSerde.serialize(value) + } +} + impl Serialize<&serde_cbor::Value> for ZSerde { type Output = Result; @@ -461,11 +922,11 @@ impl Serialize<&serde_cbor::Value> for ZSerde { } } -impl Serialize for ZSerde { - type Output = Result; +impl TryFrom<&serde_cbor::Value> for Payload { + type Error = serde_cbor::Error; - fn serialize(self, t: serde_cbor::Value) -> Self::Output { - Self.serialize(&t) + fn try_from(value: &serde_cbor::Value) -> Result { + ZSerde.serialize(value) } } @@ -477,15 +938,39 @@ impl Deserialize<'_, serde_cbor::Value> for ZSerde { } } -impl TryFrom for Payload { +impl TryFrom for serde_cbor::Value { type Error = serde_cbor::Error; - fn try_from(value: serde_cbor::Value) -> Result { - ZSerde.serialize(value) + fn try_from(value: Payload) -> Result { + ZSerde.deserialize(&value) + } +} + +impl TryFrom<&Payload> for serde_cbor::Value { + type Error = serde_cbor::Error; + + fn try_from(value: &Payload) -> Result { + ZSerde.deserialize(value) } } // Pickle +impl Serialize for ZSerde { + type Output = Result; + + fn serialize(self, t: serde_pickle::Value) -> Self::Output { + Self.serialize(&t) + } +} + +impl TryFrom for Payload { + type Error = serde_pickle::Error; + + fn try_from(value: serde_pickle::Value) -> Result { + ZSerde.serialize(value) + } +} + impl Serialize<&serde_pickle::Value> for ZSerde { type Output = Result; @@ -500,11 +985,11 @@ impl Serialize<&serde_pickle::Value> for ZSerde { } } -impl Serialize for ZSerde { - type Output = Result; +impl TryFrom<&serde_pickle::Value> for Payload { + type Error = serde_pickle::Error; - fn serialize(self, t: serde_pickle::Value) -> Self::Output { - Self.serialize(&t) + fn try_from(value: &serde_pickle::Value) -> Result { + ZSerde.serialize(value) } } @@ -516,11 +1001,19 @@ impl Deserialize<'_, serde_pickle::Value> for ZSerde { } } -impl TryFrom for Payload { +impl TryFrom for serde_pickle::Value { type Error = serde_pickle::Error; - fn try_from(value: serde_pickle::Value) -> Result { - ZSerde.serialize(value) + fn try_from(value: Payload) -> Result { + ZSerde.deserialize(&value) + } +} + +impl TryFrom<&Payload> for serde_pickle::Value { + type Error = serde_pickle::Error; + + fn try_from(value: &Payload) -> Result { + ZSerde.deserialize(value) } } @@ -533,6 +1026,12 @@ impl Serialize> for ZSerde { Payload::new(t) } } +#[cfg(feature = "shared-memory")] +impl From> for Payload { + fn from(t: Arc) -> Self { + ZSerde.serialize(t) + } +} #[cfg(feature = "shared-memory")] impl Serialize> for ZSerde { @@ -544,6 +1043,13 @@ impl Serialize> for ZSerde { } } +#[cfg(feature = "shared-memory")] +impl From> for Payload { + fn from(t: Box) -> Self { + ZSerde.serialize(t) + } +} + #[cfg(feature = "shared-memory")] impl Serialize for ZSerde { type Output = Payload; @@ -553,16 +1059,131 @@ impl Serialize for ZSerde { } } -impl From for Payload +#[cfg(feature = "shared-memory")] +impl From for Payload { + fn from(t: SharedMemoryBuf) -> Self { + ZSerde.serialize(t) + } +} + +#[cfg(feature = "shared-memory")] +impl Deserialize<'_, SharedMemoryBuf> for ZSerde { + type Error = ZDeserializeError; + + fn deserialize(self, v: &Payload) -> Result { + // A SharedMemoryBuf is expected to have only one slice + let mut zslices = v.0.zslices(); + if let Some(zs) = zslices.next() { + if let Some(shmb) = zs.downcast_ref::() { + return Ok(shmb.clone()); + } + } + Err(ZDeserializeError) + } +} + +#[cfg(feature = "shared-memory")] +impl TryFrom for SharedMemoryBuf { + type Error = ZDeserializeError; + + fn try_from(value: Payload) -> Result { + ZSerde.deserialize(&value) + } +} + +// Tuple +impl Serialize<(A, B)> for ZSerde where - ZSerde: Serialize, + A: Into, + B: Into, { - fn from(t: T) -> Self { - ZSerde.serialize(t) + type Output = Payload; + + fn serialize(self, t: (A, B)) -> Self::Output { + let (a, b) = t; + + let codec = Zenoh080::new(); + let mut buffer: ZBuf = ZBuf::empty(); + let mut writer = buffer.writer(); + let apld: Payload = a.into(); + let bpld: Payload = b.into(); + + // SAFETY: we are serializing slices on a ZBuf, so serialization will never + // fail unless we run out of memory. In that case, Rust memory allocator + // will panic before the serializer has any chance to fail. + unsafe { + codec.write(&mut writer, &apld.0).unwrap_unchecked(); + codec.write(&mut writer, &bpld.0).unwrap_unchecked(); + } + + Payload::new(buffer) + } +} + +impl From<(A, B)> for Payload +where + A: Into, + B: Into, +{ + fn from(value: (A, B)) -> Self { + ZSerde.serialize(value) } } -// For convenience to always convert a Value the examples +impl Deserialize<'_, (A, B)> for ZSerde +where + for<'a> A: TryFrom<&'a Payload>, + for<'a> >::Error: Debug, + for<'b> B: TryFrom<&'b Payload>, + for<'b> >::Error: Debug, +{ + type Error = ZError; + + fn deserialize(self, payload: &Payload) -> Result<(A, B), Self::Error> { + let codec = Zenoh080::new(); + let mut reader = payload.0.reader(); + + let abuf: ZBuf = codec.read(&mut reader).map_err(|e| zerror!("{:?}", e))?; + let apld = Payload::new(abuf); + + let bbuf: ZBuf = codec.read(&mut reader).map_err(|e| zerror!("{:?}", e))?; + let bpld = Payload::new(bbuf); + + let a = A::try_from(&apld).map_err(|e| zerror!("{:?}", e))?; + let b = B::try_from(&bpld).map_err(|e| zerror!("{:?}", e))?; + Ok((a, b)) + } +} + +impl TryFrom for (A, B) +where + A: for<'a> TryFrom<&'a Payload>, + for<'a> >::Error: Debug, + for<'b> B: TryFrom<&'b Payload>, + for<'b> >::Error: Debug, +{ + type Error = ZError; + + fn try_from(value: Payload) -> Result { + ZSerde.deserialize(&value) + } +} + +impl TryFrom<&Payload> for (A, B) +where + for<'a> A: TryFrom<&'a Payload>, + for<'a> >::Error: Debug, + for<'b> B: TryFrom<&'b Payload>, + for<'b> >::Error: Debug, +{ + type Error = ZError; + + fn try_from(value: &Payload) -> Result { + ZSerde.deserialize(value) + } +} + +// For convenience to always convert a Value in the examples #[derive(Debug, Clone, PartialEq, Eq)] pub enum StringOrBase64 { String(String), @@ -596,12 +1217,9 @@ impl std::fmt::Display for StringOrBase64 { impl From<&Payload> for StringOrBase64 { fn from(v: &Payload) -> Self { use base64::{engine::general_purpose::STANDARD as b64_std_engine, Engine}; - match v.deserialize::>() { - Ok(s) => StringOrBase64::String(s.into_owned()), - Err(_) => { - let cow: Cow<'_, [u8]> = Cow::from(v); - StringOrBase64::Base64(b64_std_engine.encode(cow)) - } + match v.deserialize::() { + Ok(s) => StringOrBase64::String(s), + Err(_) => StringOrBase64::Base64(b64_std_engine.encode(v.into::>())), } } } @@ -611,7 +1229,8 @@ mod tests { fn serializer() { use super::Payload; use rand::Rng; - use zenoh_buffers::ZBuf; + use std::borrow::Cow; + use zenoh_buffers::{ZBuf, ZSlice}; const NUM: usize = 1_000; @@ -619,14 +1238,18 @@ mod tests { ($t:ty, $in:expr) => { let i = $in; let t = i.clone(); + println!("Serialize:\t{:?}", t); let v = Payload::serialize(t); + println!("Deserialize:\t{:?}", v); let o: $t = v.deserialize().unwrap(); - assert_eq!(i, o) + assert_eq!(i, o); + println!(""); }; } let mut rng = rand::thread_rng(); + // unsigned integer serialize_deserialize!(u8, u8::MIN); serialize_deserialize!(u16, u16::MIN); serialize_deserialize!(u32, u32::MIN); @@ -647,6 +1270,7 @@ mod tests { serialize_deserialize!(usize, rng.gen::()); } + // signed integer serialize_deserialize!(i8, i8::MIN); serialize_deserialize!(i16, i16::MIN); serialize_deserialize!(i32, i32::MIN); @@ -667,6 +1291,7 @@ mod tests { serialize_deserialize!(isize, rng.gen::()); } + // float serialize_deserialize!(f32, f32::MIN); serialize_deserialize!(f64, f64::MIN); @@ -678,13 +1303,104 @@ mod tests { serialize_deserialize!(f64, rng.gen::()); } + // String serialize_deserialize!(String, ""); - serialize_deserialize!(String, String::from("abcdefghijklmnopqrstuvwxyz")); + serialize_deserialize!(String, String::from("abcdef")); + // Cow + serialize_deserialize!(Cow, Cow::from("")); + serialize_deserialize!(Cow, Cow::from(String::from("abcdef"))); + + // Vec serialize_deserialize!(Vec, vec![0u8; 0]); serialize_deserialize!(Vec, vec![0u8; 64]); + // Cow<[u8]> + serialize_deserialize!(Cow<[u8]>, Cow::from(vec![0u8; 0])); + serialize_deserialize!(Cow<[u8]>, Cow::from(vec![0u8; 64])); + + // ZBuf serialize_deserialize!(ZBuf, ZBuf::from(vec![0u8; 0])); serialize_deserialize!(ZBuf, ZBuf::from(vec![0u8; 64])); + + // Tuple + serialize_deserialize!((usize, usize), (0, 1)); + serialize_deserialize!((usize, String), (0, String::from("a"))); + serialize_deserialize!((String, String), (String::from("a"), String::from("b"))); + + // Iterator + let v: [usize; 5] = [0, 1, 2, 3, 4]; + println!("Serialize:\t{:?}", v); + let p = Payload::from_iter(v.iter()); + println!("Deserialize:\t{:?}\n", p); + for (i, t) in p.iter::().enumerate() { + assert_eq!(i, t); + } + + let mut v = vec![[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]; + println!("Serialize:\t{:?}", v); + let p = Payload::from_iter(v.drain(..)); + println!("Deserialize:\t{:?}\n", p); + let mut iter = p.iter::<[u8; 4]>(); + assert_eq!(iter.next().unwrap(), [0, 1, 2, 3]); + assert_eq!(iter.next().unwrap(), [4, 5, 6, 7]); + assert_eq!(iter.next().unwrap(), [8, 9, 10, 11]); + assert_eq!(iter.next().unwrap(), [12, 13, 14, 15]); + assert!(iter.next().is_none()); + + use std::collections::HashMap; + let mut hm: HashMap = HashMap::new(); + hm.insert(0, 0); + hm.insert(1, 1); + println!("Serialize:\t{:?}", hm); + let p = Payload::from_iter(hm.clone().drain()); + println!("Deserialize:\t{:?}\n", p); + let o = HashMap::from_iter(p.iter::<(usize, usize)>()); + assert_eq!(hm, o); + + let mut hm: HashMap> = HashMap::new(); + hm.insert(0, vec![0u8; 8]); + hm.insert(1, vec![1u8; 16]); + println!("Serialize:\t{:?}", hm); + let p = Payload::from_iter(hm.clone().drain()); + println!("Deserialize:\t{:?}\n", p); + let o = HashMap::from_iter(p.iter::<(usize, Vec)>()); + assert_eq!(hm, o); + + let mut hm: HashMap> = HashMap::new(); + hm.insert(0, vec![0u8; 8]); + hm.insert(1, vec![1u8; 16]); + println!("Serialize:\t{:?}", hm); + let p = Payload::from_iter(hm.clone().drain()); + println!("Deserialize:\t{:?}\n", p); + let o = HashMap::from_iter(p.iter::<(usize, Vec)>()); + assert_eq!(hm, o); + + let mut hm: HashMap = HashMap::new(); + hm.insert(0, ZSlice::from(vec![0u8; 8])); + hm.insert(1, ZSlice::from(vec![1u8; 16])); + println!("Serialize:\t{:?}", hm); + let p = Payload::from_iter(hm.clone().drain()); + println!("Deserialize:\t{:?}\n", p); + let o = HashMap::from_iter(p.iter::<(usize, ZSlice)>()); + assert_eq!(hm, o); + + let mut hm: HashMap = HashMap::new(); + hm.insert(0, ZBuf::from(vec![0u8; 8])); + hm.insert(1, ZBuf::from(vec![1u8; 16])); + println!("Serialize:\t{:?}", hm); + let p = Payload::from_iter(hm.clone().drain()); + println!("Deserialize:\t{:?}\n", p); + let o = HashMap::from_iter(p.iter::<(usize, ZBuf)>()); + assert_eq!(hm, o); + + let mut hm: HashMap> = HashMap::new(); + hm.insert(0, vec![0u8; 8]); + hm.insert(1, vec![1u8; 16]); + println!("Serialize:\t{:?}", hm); + let p = Payload::from_iter(hm.clone().iter().map(|(k, v)| (k, Cow::from(v)))); + println!("Deserialize:\t{:?}\n", p); + let o = HashMap::from_iter(p.iter::<(usize, Vec)>()); + assert_eq!(hm, o); } } diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 181976dcb0..375411d663 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -60,14 +60,7 @@ use zenoh_collections::SingleOrVec; use zenoh_config::unwrap_or_default; use zenoh_core::{zconfigurable, zread, Resolve, ResolveClosure, ResolveFuture, SyncResolve}; #[cfg(feature = "unstable")] -use zenoh_protocol::network::declare::SubscriberId; -#[cfg(feature = "unstable")] -use zenoh_protocol::network::ext; -use zenoh_protocol::network::AtomicRequestId; -use zenoh_protocol::network::RequestId; -use zenoh_protocol::zenoh::reply::ReplyBody; -use zenoh_protocol::zenoh::Del; -use zenoh_protocol::zenoh::Put; +use zenoh_protocol::network::{declare::SubscriberId, ext}; use zenoh_protocol::{ core::{ key_expr::{keyexpr, OwnedKeyExpr}, @@ -80,11 +73,12 @@ use zenoh_protocol::{ DeclareQueryable, DeclareSubscriber, UndeclareQueryable, UndeclareSubscriber, }, request::{self, ext::TargetType, Request}, - Mapping, Push, Response, ResponseFinal, + AtomicRequestId, Mapping, Push, RequestId, Response, ResponseFinal, }, zenoh::{ query::{self, ext::QueryBodyType, Consolidation}, - PushBody, RequestBody, ResponseBody, + reply::ReplyBody, + Del, PushBody, Put, RequestBody, ResponseBody, }, }; use zenoh_result::ZResult;