From 2d93268341d3d3e626ef669f9e54eb7e322a63b8 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Wed, 21 Feb 2024 15:28:14 +0100 Subject: [PATCH] Provide default encoder --- examples/examples/z_get.rs | 3 +- examples/examples/z_get_liveliness.rs | 3 +- .../zenoh-plugin-rest/examples/z_serve_sse.rs | 6 +- plugins/zenoh-plugin-rest/src/lib.rs | 17 +- .../src/replica/storage.rs | 3 +- zenoh/src/admin.rs | 6 +- zenoh/src/encoding.rs | 451 +++++++----- zenoh/src/net/runtime/adminspace.rs | 32 +- zenoh/src/prelude.rs | 5 +- zenoh/src/publication.rs | 2 +- zenoh/src/queryable.rs | 4 +- zenoh/src/session.rs | 17 +- zenoh/src/value.rs | 650 +----------------- 13 files changed, 341 insertions(+), 858 deletions(-) diff --git a/examples/examples/z_get.rs b/examples/examples/z_get.rs index 57c36c2e62..08e76f350d 100644 --- a/examples/examples/z_get.rs +++ b/examples/examples/z_get.rs @@ -12,7 +12,6 @@ // ZettaScale Zenoh Team, // use clap::Parser; -use std::convert::TryFrom; use std::time::Duration; use zenoh::config::Config; use zenoh::prelude::r#async::*; @@ -45,7 +44,7 @@ async fn main() { sample.key_expr.as_str(), sample.value, ), - Err(err) => println!(">> Received (ERROR: '{}')", String::try_from(&err).unwrap()), + Err(err) => println!(">> Received (ERROR: '{}')", err.decode::().unwrap()), } } } diff --git a/examples/examples/z_get_liveliness.rs b/examples/examples/z_get_liveliness.rs index e0aaf8cd23..25904a50d6 100644 --- a/examples/examples/z_get_liveliness.rs +++ b/examples/examples/z_get_liveliness.rs @@ -12,7 +12,6 @@ // ZettaScale Zenoh Team, // use clap::Parser; -use std::convert::TryFrom; use std::time::Duration; use zenoh::config::Config; use zenoh::prelude::r#async::*; @@ -39,7 +38,7 @@ async fn main() { while let Ok(reply) = replies.recv_async().await { match reply.sample { Ok(sample) => println!(">> Alive token ('{}')", sample.key_expr.as_str(),), - Err(err) => println!(">> Received (ERROR: '{}')", String::try_from(&err).unwrap()), + Err(err) => println!(">> Received (ERROR: '{}')", err.decode::().unwrap()), } } } diff --git a/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs b/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs index aa64ca6bd1..3a55c532da 100644 --- a/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs +++ b/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs @@ -75,11 +75,7 @@ async fn main() { println!("Data updates are accessible through HTML5 SSE at http://:8000/{key}"); loop { - publisher - .put(Value::from(value).encoding(Encoding::TEXT_PLAIN)) - .res() - .await - .unwrap(); + publisher.put(Value::from(value)).res().await.unwrap(); async_std::task::sleep(Duration::from_secs(1)).await; } } diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index 0edcbf4cb7..94f1b64ca4 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -27,6 +27,7 @@ use std::sync::Arc; use tide::http::Mime; use tide::sse::Sender; use tide::{Request, Response, Server, StatusCode}; +use zenoh::encoding::EncodingMapping; use zenoh::plugins::{RunningPluginTrait, ZenohPlugin}; use zenoh::prelude::r#async::*; use zenoh::properties::Properties; @@ -49,19 +50,19 @@ const RAW_KEY: &str = "_raw"; fn value_to_json(value: Value) -> String { // @TODO: transcode to JSON when implemented in Value match &value.encoding { - p if p.starts_with(Encoding::TEXT_PLAIN) - || p.starts_with(Encoding::APP_XWWW_FORM_URLENCODED) => + p if p.starts_with(DefaultEncoder::TEXT_PLAIN) + || p.starts_with(DefaultEncoder::APP_XWWW_FORM_URLENCODED) => { // convert to Json string for special characters escaping serde_json::json!(value.to_string()).to_string() } - p if p.starts_with(Encoding::APP_PROPERTIES) => { + p if p.starts_with(DefaultEncoder::APP_PROPERTIES) => { // convert to Json string for special characters escaping serde_json::json!(*Properties::from(value.to_string())).to_string() } - p if p.starts_with(Encoding::APP_JSON) - || p.starts_with(Encoding::APP_INTEGER) - || p.starts_with(Encoding::APP_FLOAT) => + p if p.starts_with(DefaultEncoder::APP_JSON) + || p.starts_with(DefaultEncoder::APP_INTEGER) + || p.starts_with(DefaultEncoder::APP_FLOAT) => { value.to_string() } @@ -403,7 +404,7 @@ async fn query(mut req: Request<(Arc, String)>) -> tide::Result match Encoding::try_from(m.to_string()) { + Some(m) => match DefaultEncoder::parse(m.to_string().as_str()) { Ok(e) => e, Err(e) => { return Ok(response( @@ -452,7 +453,7 @@ async fn write(mut req: Request<(Arc, String)>) -> tide::Result match Encoding::try_from(m.to_string()) { + Some(m) => match DefaultEncoder::parse(m.to_string().as_str()) { Ok(e) => e, Err(e) => { return Ok(response( diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs index 6a05a79bd2..f196bd6e50 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs @@ -22,6 +22,7 @@ use std::collections::{HashMap, HashSet}; use std::str::{self, FromStr}; use std::time::{SystemTime, UNIX_EPOCH}; use zenoh::buffers::ZBuf; +use zenoh::encoding::EncodingMapping; use zenoh::prelude::r#async::*; use zenoh::query::ConsolidationMode; use zenoh::time::{Timestamp, NTP64}; @@ -693,7 +694,7 @@ fn construct_update(data: String) -> Update { for slice in result.3 { payload.push_zslice(slice.to_vec().into()); } - let value = Value::new(payload).encoding(Encoding::try_from(result.2).unwrap()); // @TODO: remove the unwrap() + let value = Value::new(payload).encoding(DefaultEncoder::parse(result.2.as_str()).unwrap()); // @TODO: remove the unwrap() let data = StoredData { value, timestamp: Timestamp::from_str(&result.1).unwrap(), // @TODO: remove the unwrap() diff --git a/zenoh/src/admin.rs b/zenoh/src/admin.rs index 9ca0873555..ff300b1a5e 100644 --- a/zenoh/src/admin.rs +++ b/zenoh/src/admin.rs @@ -11,8 +11,8 @@ // Contributors: // ZettaScale Zenoh Team, // -use crate::prelude::Encoding; use crate::{ + encoding::DefaultEncoder, keyexpr, prelude::sync::{KeyExpr, Locality, SampleKind}, queryable::Query, @@ -143,7 +143,7 @@ impl TransportMulticastEventHandler for Handler { let expr = WireExpr::from(&(*KE_PREFIX / own_zid / *KE_TRANSPORT_UNICAST / zid)) .to_owned(); let info = DataInfo { - encoding: Some(Encoding::APP_JSON), + encoding: Some(DefaultEncoder::APP_JSON), ..Default::default() }; self.session.handle_data( @@ -189,7 +189,7 @@ impl TransportPeerEventHandler for PeerHandler { let mut s = DefaultHasher::new(); link.hash(&mut s); let info = DataInfo { - encoding: Some(Encoding::APP_JSON), + encoding: Some(DefaultEncoder::APP_JSON), ..Default::default() }; self.session.handle_data( diff --git a/zenoh/src/encoding.rs b/zenoh/src/encoding.rs index e131392709..8281ce252a 100644 --- a/zenoh/src/encoding.rs +++ b/zenoh/src/encoding.rs @@ -11,133 +11,60 @@ // Contributors: // ZettaScale Zenoh Team, // -use std::{ - fmt, - ops::{Deref, DerefMut}, - str::FromStr, -}; -use zenoh_protocol::core::{Encoding as WireEncoding, EncodingPrefix}; +use crate::{value::Value, Sample}; +use phf::phf_ordered_map; +use std::borrow::Cow; +#[cfg(feature = "shared-memory")] +use std::sync::Arc; +use zenoh_buffers::{buffer::SplitBuffer, ZBuf}; +use zenoh_collections::Properties; +use zenoh_protocol::core::{Encoding, EncodingPrefix}; use zenoh_result::ZResult; +#[cfg(feature = "shared-memory")] +use zenoh_shm::SharedMemoryBuf; pub trait EncodingMapping { - fn prefix_to_str(&self, e: zenoh_protocol::core::EncodingPrefix) -> &str; - fn str_to_prefix(&self, s: &str) -> zenoh_protocol::core::EncodingPrefix; - fn parse(s: &str) -> ZResult; -} - -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct Encoding -where - T: EncodingMapping, -{ - encoding: WireEncoding, - mapping: T, -} - -impl fmt::Display for Encoding -where - T: EncodingMapping, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(self.mapping.prefix_to_str(self.encoding.prefix()))?; - f.write_str(self.encoding.suffix()) - } -} - -impl Encoding -where - T: EncodingMapping, -{ - const fn exact(prefix: EncodingPrefix, mapping: T) -> Self { - Self { - encoding: WireEncoding::new(prefix), - mapping, - } - } - - pub fn mapping(&self) -> &T { - &self.mapping - } - - pub fn mapping_mut(&mut self) -> &mut T { - &mut self.mapping - } - - pub fn with_mapping(self, mapping: U) -> Encoding - where - U: EncodingMapping, - { - let Self { encoding, .. } = self; - Encoding { encoding, mapping } - } - - pub fn resolve<'a>(encoding: &'a Encoding, mapping: &'a T) -> String { - format!( - "{}{}", - mapping.prefix_to_str(encoding.prefix()), - encoding.suffix() - ) - } + fn prefix_to_str(&self, e: EncodingPrefix) -> &str; + fn str_to_prefix(&self, s: &str) -> EncodingPrefix; + fn parse(s: &str) -> ZResult; } -impl Deref for Encoding { - type Target = WireEncoding; - - fn deref(&self) -> &Self::Target { - &self.encoding - } -} - -impl DerefMut for Encoding { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.encoding - } -} - -impl From for Encoding { - fn from(encoding: WireEncoding) -> Self { - Self { - encoding, - mapping: DefaultEncodingMapping, - } - } +pub trait Encoder { + fn encode(t: T) -> Value; } -impl From for WireEncoding { - fn from(encoding: Encoding) -> Self { - encoding.encoding - } +pub trait Decoder { + fn decode(t: &Value) -> ZResult; } -// Default encoding provided with Zenoh +/// Default encoder provided with Zenoh to facilitate the encoding and decoding +/// of Values in the Rust API. Please note that Zenoh does not impose any +/// encoding and users are free to use any encoder they like. #[derive(Clone, Debug, Default, PartialEq, Eq)] -pub struct DefaultEncodingMapping; +pub struct DefaultEncoder; -use phf::phf_ordered_map; -impl DefaultEncodingMapping { - /// Prefixes from 0 to 63 are reserved by Zenoh - /// Users are free to use any prefix from 64 to 255 - pub const EMPTY: EncodingPrefix = 0; - pub const APP_OCTET_STREAM: EncodingPrefix = 1; - pub const APP_CUSTOM: EncodingPrefix = 2; - pub const TEXT_PLAIN: EncodingPrefix = 3; - pub const APP_PROPERTIES: EncodingPrefix = 4; - pub const APP_JSON: EncodingPrefix = 5; - pub const APP_SQL: EncodingPrefix = 6; - pub const APP_INTEGER: EncodingPrefix = 7; - pub const APP_FLOAT: EncodingPrefix = 8; - pub const APP_XML: EncodingPrefix = 9; - pub const APP_XHTML_XML: EncodingPrefix = 10; - pub const APP_XWWW_FORM_URLENCODED: EncodingPrefix = 11; - pub const TEXT_JSON: EncodingPrefix = 12; - pub const TEXT_HTML: EncodingPrefix = 13; - pub const TEXT_XML: EncodingPrefix = 14; - pub const TEXT_CSS: EncodingPrefix = 15; - pub const TEXT_CSV: EncodingPrefix = 16; - pub const TEXT_JAVASCRIPT: EncodingPrefix = 17; - pub const IMAGE_JPEG: EncodingPrefix = 18; - pub const IMAGE_PNG: EncodingPrefix = 19; - pub const IMAGE_GIF: EncodingPrefix = 20; +impl DefaultEncoder { + pub const EMPTY: Encoding = Encoding::new(0); + pub const APP_OCTET_STREAM: Encoding = Encoding::new(1); + pub const APP_CUSTOM: Encoding = Encoding::new(2); + pub const TEXT_PLAIN: Encoding = Encoding::new(3); + pub const APP_PROPERTIES: Encoding = Encoding::new(4); + pub const APP_JSON: Encoding = Encoding::new(5); + pub const APP_SQL: Encoding = Encoding::new(6); + pub const APP_INTEGER: Encoding = Encoding::new(7); + pub const APP_FLOAT: Encoding = Encoding::new(8); + pub const APP_XML: Encoding = Encoding::new(9); + pub const APP_XHTML_XML: Encoding = Encoding::new(10); + pub const APP_XWWW_FORM_URLENCODED: Encoding = Encoding::new(11); + pub const TEXT_JSON: Encoding = Encoding::new(12); + pub const TEXT_HTML: Encoding = Encoding::new(13); + pub const TEXT_XML: Encoding = Encoding::new(14); + pub const TEXT_CSS: Encoding = Encoding::new(15); + pub const TEXT_CSV: Encoding = Encoding::new(16); + pub const TEXT_JAVASCRIPT: Encoding = Encoding::new(17); + pub const IMAGE_JPEG: Encoding = Encoding::new(18); + pub const IMAGE_PNG: Encoding = Encoding::new(19); + pub const IMAGE_GIF: Encoding = Encoding::new(20); // A perfect hashmap for fast lookup of prefixes pub(super) const KNOWN_PREFIX: phf::OrderedMap = phf_ordered_map! { @@ -190,7 +117,7 @@ impl DefaultEncodingMapping { }; } -impl EncodingMapping for DefaultEncodingMapping { +impl EncodingMapping for DefaultEncoder { // Given a numeric prefix ID returns its string representation fn prefix_to_str(&self, p: zenoh_protocol::core::EncodingPrefix) -> &'static str { match Self::KNOWN_PREFIX.get(&p) { @@ -204,94 +131,256 @@ impl EncodingMapping for DefaultEncodingMapping { fn str_to_prefix(&self, s: &str) -> zenoh_protocol::core::EncodingPrefix { match Self::KNOWN_STRING.get(s) { Some(p) => *p, - None => Self::EMPTY, + None => Self::EMPTY.prefix(), } } - // Parse a string into a valid WireEncoding. This functions performs the necessary + // Parse a string into a valid Encoding. This functions performs the necessary // prefix mapping and suffix substring when parsing the input. - fn parse(t: &str) -> ZResult { + fn parse(t: &str) -> ZResult { for (s, p) in Self::KNOWN_STRING.entries() { if let Some((_, b)) = t.split_once(s) { - let a = b.to_string(); - return WireEncoding::new(*p).with_suffix(a); + return Encoding::new(*p).with_suffix(b.to_string()); } } - // WireEncoding::empty().with_suffix(t) - Ok(WireEncoding::empty()) + Encoding::empty().with_suffix(t.to_string()) + } +} + +// -- impls + +// Bytes conversion +impl Encoder for DefaultEncoder { + fn encode(t: ZBuf) -> Value { + Value { + payload: t, + encoding: DefaultEncoder::APP_OCTET_STREAM, + } + } +} + +impl Decoder for DefaultEncoder { + fn decode(v: &Value) -> ZResult { + if v.encoding == DefaultEncoder::APP_OCTET_STREAM { + Ok(v.payload.clone()) + } else { + Err(zerror!("{:?} can not be converted into String", v).into()) + } + } +} + +impl Encoder> for DefaultEncoder { + fn encode(t: Vec) -> Value { + Self::encode(ZBuf::from(t)) } } -impl Encoding { - /// The encoding is empty. It is equivalent to not being defined. - pub const EMPTY: Self = Self::new(DefaultEncodingMapping::EMPTY); - pub const APP_OCTET_STREAM: Self = Self::new(DefaultEncodingMapping::APP_OCTET_STREAM); - pub const APP_CUSTOM: Self = Self::new(DefaultEncodingMapping::APP_CUSTOM); - pub const TEXT_PLAIN: Self = Self::new(DefaultEncodingMapping::TEXT_PLAIN); - pub const APP_PROPERTIES: Self = Self::new(DefaultEncodingMapping::APP_PROPERTIES); - pub const APP_JSON: Self = Self::new(DefaultEncodingMapping::APP_JSON); - pub const APP_SQL: Self = Self::new(DefaultEncodingMapping::APP_SQL); - pub const APP_INTEGER: Self = Self::new(DefaultEncodingMapping::APP_INTEGER); - pub const APP_FLOAT: Self = Self::new(DefaultEncodingMapping::APP_FLOAT); - pub const APP_XML: Self = Self::new(DefaultEncodingMapping::APP_XML); - pub const APP_XHTML_XML: Self = Self::new(DefaultEncodingMapping::APP_XHTML_XML); - pub const APP_XWWW_FORM_URLENCODED: Self = - Self::new(DefaultEncodingMapping::APP_XWWW_FORM_URLENCODED); - pub const TEXT_JSON: Self = Self::new(DefaultEncodingMapping::TEXT_JSON); - pub const TEXT_HTML: Self = Self::new(DefaultEncodingMapping::TEXT_HTML); - pub const TEXT_XML: Self = Self::new(DefaultEncodingMapping::TEXT_XML); - pub const TEXT_CSS: Self = Self::new(DefaultEncodingMapping::TEXT_CSS); - pub const TEXT_CSV: Self = Self::new(DefaultEncodingMapping::TEXT_CSV); - pub const TEXT_JAVASCRIPT: Self = Self::new(DefaultEncodingMapping::TEXT_JAVASCRIPT); - pub const IMAGE_JPEG: Self = Self::new(DefaultEncodingMapping::IMAGE_JPEG); - pub const IMAGE_PNG: Self = Self::new(DefaultEncodingMapping::IMAGE_PNG); - pub const IMAGE_GIF: Self = Self::new(DefaultEncodingMapping::IMAGE_GIF); - - /// Returns a new [`Encoding`] object. - pub const fn new(prefix: EncodingPrefix) -> Self { - Self::exact(prefix, DefaultEncodingMapping) +impl Encoder<&[u8]> for DefaultEncoder { + fn encode(t: &[u8]) -> Value { + Self::encode(t.to_vec()) } } -impl Default for Encoding { - fn default() -> Self { - Self::EMPTY +impl Decoder> for DefaultEncoder { + fn decode(v: &Value) -> ZResult> { + let v: ZBuf = Self::decode(v)?; + Ok(v.contiguous().to_vec()) } } -impl FromStr for Encoding -where - T: EncodingMapping + Default, -{ - type Err = zenoh_result::Error; - - fn from_str(s: &str) -> Result { - let encoding: WireEncoding = T::parse(s)?; - Ok(Self { - encoding, - mapping: T::default(), - }) +impl<'a> Encoder> for DefaultEncoder { + fn encode(t: Cow<'a, [u8]>) -> Value { + Self::encode(ZBuf::from(t.to_vec())) } } -impl TryFrom<&str> for Encoding -where - T: EncodingMapping + Default, -{ - type Error = zenoh_result::Error; +impl<'a> Decoder> for DefaultEncoder { + fn decode(v: &Value) -> ZResult> { + let v: Vec = Self::decode(v)?; + Ok(Cow::Owned(v)) + } +} - fn try_from(s: &str) -> Result { - s.parse() +// String +impl Encoder for DefaultEncoder { + fn encode(s: String) -> Value { + Value { + payload: ZBuf::from(s.into_bytes()), + encoding: DefaultEncoder::TEXT_PLAIN, + } } } -impl TryFrom for Encoding -where - T: EncodingMapping + Default, -{ - type Error = zenoh_result::Error; +impl Encoder<&str> for DefaultEncoder { + fn encode(s: &str) -> Value { + Self::encode(s.to_string()) + } +} - fn try_from(s: String) -> Result { - s.parse() +impl Decoder for DefaultEncoder { + fn decode(v: &Value) -> ZResult { + if v.encoding == DefaultEncoder::TEXT_PLAIN { + String::from_utf8(v.payload.contiguous().to_vec()).map_err(|e| zerror!("{}", e).into()) + } else { + Err(zerror!("{:?} can not be converted into String", v).into()) + } + } +} + +impl<'a> Encoder> for DefaultEncoder { + fn encode(s: Cow<'a, str>) -> Value { + Self::encode(s.to_string()) + } +} + +impl<'a> Decoder> for DefaultEncoder { + fn decode(v: &Value) -> ZResult> { + let v: String = Self::decode(v)?; + Ok(Cow::Owned(v)) + } +} + +// Sample +impl Encoder for DefaultEncoder { + fn encode(t: Sample) -> Value { + t.value + } +} + +// Integers +macro_rules! impl_int { + ($t:ty, $encoding:expr) => { + impl Encoder<$t> for DefaultEncoder { + fn encode(t: $t) -> Value { + Value { + payload: ZBuf::from(t.to_string().into_bytes()), + encoding: $encoding, + } + } + } + + impl Encoder<&$t> for DefaultEncoder { + fn encode(t: &$t) -> Value { + Self::encode(*t) + } + } + + impl Decoder<$t> for DefaultEncoder { + fn decode(v: &Value) -> ZResult<$t> { + if v.encoding == $encoding { + let v: $t = std::str::from_utf8(&v.payload.contiguous()) + .map_err(|e| zerror!("{}", e))? + .parse() + .map_err(|e| zerror!("{}", e))?; + Ok(v) + } else { + Err(zerror!("{:?} can not be converted into String", v).into()) + } + } + } + }; +} + +impl_int!(u8, DefaultEncoder::APP_INTEGER); +impl_int!(u16, DefaultEncoder::APP_INTEGER); +impl_int!(u32, DefaultEncoder::APP_INTEGER); +impl_int!(u64, DefaultEncoder::APP_INTEGER); +impl_int!(usize, DefaultEncoder::APP_INTEGER); + +impl_int!(i8, DefaultEncoder::APP_INTEGER); +impl_int!(i16, DefaultEncoder::APP_INTEGER); +impl_int!(i32, DefaultEncoder::APP_INTEGER); +impl_int!(i64, DefaultEncoder::APP_INTEGER); +impl_int!(isize, DefaultEncoder::APP_INTEGER); + +// Floats +impl_int!(f32, DefaultEncoder::APP_FLOAT); +impl_int!(f64, DefaultEncoder::APP_FLOAT); + +// JSON +impl Encoder<&serde_json::Value> for DefaultEncoder { + fn encode(t: &serde_json::Value) -> Value { + Value { + payload: ZBuf::from(t.to_string().into_bytes()), + encoding: DefaultEncoder::APP_JSON, + } + } +} + +impl Encoder for DefaultEncoder { + fn encode(t: serde_json::Value) -> Value { + Self::encode(&t) + } +} + +impl Decoder for DefaultEncoder { + fn decode(v: &Value) -> ZResult { + if v.encoding == DefaultEncoder::APP_JSON || v.encoding == DefaultEncoder::TEXT_JSON { + let r: serde_json::Value = serde::Deserialize::deserialize( + &mut serde_json::Deserializer::from_slice(&v.payload.contiguous()), + ) + .map_err(|e| zerror!("{}", e))?; + Ok(r) + } else { + Err(zerror!("{:?} can not be converted into JSON", v.encoding).into()) + } + } +} + +// Properties +impl Encoder<&Properties> for DefaultEncoder { + fn encode(t: &Properties) -> Value { + Value { + payload: ZBuf::from(t.to_string().into_bytes()), + encoding: DefaultEncoder::APP_PROPERTIES, + } + } +} + +impl Encoder for DefaultEncoder { + fn encode(t: Properties) -> Value { + Self::encode(&t) + } +} + +impl Decoder for DefaultEncoder { + fn decode(v: &Value) -> ZResult { + if v.encoding == DefaultEncoder::APP_PROPERTIES { + let ps = Properties::from( + std::str::from_utf8(&v.payload.contiguous()).map_err(|e| zerror!("{}", e))?, + ); + Ok(ps) + } else { + Err(zerror!("{:?} can not be converted into Properties", v.encoding).into()) + } + } +} + +// Shared memory conversion +#[cfg(feature = "shared-memory")] +impl Encoder> for DefaultEncoder { + fn encode(t: Arc) -> Value { + Value { + payload: t.into(), + encoding: DefaultEncoder::APP_OCTET_STREAM, + } + } +} + +#[cfg(feature = "shared-memory")] +impl Encoder> for DefaultEncoder { + fn encode(t: Box) -> Value { + let smb: Arc = t.into(); + Self::encode(smb) + } +} + +#[cfg(feature = "shared-memory")] +impl Encoder for DefaultEncoder { + fn encode(t: SharedMemoryBuf) -> Value { + Value { + payload: t.into(), + encoding: DefaultEncoder::APP_OCTET_STREAM, + } } } diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index db17694092..704c5354c4 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -22,6 +22,7 @@ use crate::prelude::{ use crate::queryable::Query; use crate::queryable::QueryInner; use crate::value::Value; +use crate::DefaultEncoder; use async_std::task; use log::{error, trace}; use serde_json::json; @@ -425,7 +426,7 @@ impl Primitives for AdminSpace { parameters, value: query .ext_body - .map(|b| Value::from(b.payload).encoding(b.encoding.into())), + .map(|b| Value::from(b.payload).encoding(b.encoding)), qid: msg.id, zid, primitives, @@ -565,7 +566,7 @@ fn router_data(context: &AdminContext, query: Query) { if let Err(e) = query .reply(Ok(Sample::new( reply_key, - Value::from(json.to_string().as_bytes().to_vec()).encoding(Encoding::APP_JSON), + Value::from(json.to_string().as_bytes().to_vec()).encoding(Encoding::new(42)), ))) .res() { @@ -599,7 +600,7 @@ zenoh_build{{version="{}"}} 1 if let Err(e) = query .reply(Ok(Sample::new( reply_key, - Value::from(metrics.as_bytes().to_vec()).encoding(Encoding::TEXT_PLAIN), + Value::encode(metrics.as_bytes().to_vec()), ))) .res() { @@ -617,14 +618,7 @@ fn routers_linkstate_data(context: &AdminContext, query: Query) { if let Err(e) = query .reply(Ok(Sample::new( reply_key, - Value::from( - tables - .hat_code - .info(&tables, WhatAmI::Router) - .as_bytes() - .to_vec(), - ) - .encoding(Encoding::TEXT_PLAIN), + Value::encode(tables.hat_code.info(&tables, WhatAmI::Router)), ))) .res() { @@ -642,14 +636,7 @@ fn peers_linkstate_data(context: &AdminContext, query: Query) { if let Err(e) = query .reply(Ok(Sample::new( reply_key, - Value::from( - tables - .hat_code - .info(&tables, WhatAmI::Peer) - .as_bytes() - .to_vec(), - ) - .encoding(Encoding::TEXT_PLAIN), + Value::encode(tables.hat_code.info(&tables, WhatAmI::Peer)), ))) .res() { @@ -721,10 +708,7 @@ fn plugins_status(context: &AdminContext, query: Query) { if let Ok(key_expr) = KeyExpr::try_from(plugin_path_key.clone()) { if query.key_expr().intersects(&key_expr) { if let Err(e) = query - .reply(Ok(Sample::new( - key_expr, - Value::from(plugin.path()).encoding(Encoding::TEXT_PLAIN), - ))) + .reply(Ok(Sample::new(key_expr, Value::encode(plugin.path())))) .res() { log::error!("Error sending AdminSpace reply: {:?}", e); @@ -750,7 +734,7 @@ fn plugins_status(context: &AdminContext, query: Query) { if let Ok(key_expr) = KeyExpr::try_from(response.key) { if let Err(e) = query.reply(Ok(Sample::new( key_expr, - Value::from(response.value).encoding(Encoding::TEXT_PLAIN), + Value::from(response.value).encoding(DefaultEncoder::TEXT_PLAIN), ))) .res() { diff --git a/zenoh/src/prelude.rs b/zenoh/src/prelude.rs index 7bc54b7ec4..26857e0d9b 100644 --- a/zenoh/src/prelude.rs +++ b/zenoh/src/prelude.rs @@ -41,9 +41,10 @@ pub(crate) mod common { pub use crate::query::{QueryConsolidation, QueryTarget}; - /// The encoding of a zenoh `Value`. - pub use crate::encoding::Encoding; + pub use crate::encoding::DefaultEncoder; pub use crate::value::Value; + /// The encoding of a zenoh `Value`. + pub use zenoh_protocol::core::{Encoding, EncodingPrefix}; pub use crate::query::ConsolidationMode; #[zenoh_macros::unstable] diff --git a/zenoh/src/publication.rs b/zenoh/src/publication.rs index 87e3f9261f..daa720819a 100644 --- a/zenoh/src/publication.rs +++ b/zenoh/src/publication.rs @@ -824,7 +824,7 @@ fn resolve_put( } PushBody::Put(Put { timestamp, - encoding: value.encoding.clone().into(), + encoding: value.encoding.clone(), ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, diff --git a/zenoh/src/queryable.rs b/zenoh/src/queryable.rs index 297f33e973..9ee73d1641 100644 --- a/zenoh/src/queryable.rs +++ b/zenoh/src/queryable.rs @@ -225,7 +225,7 @@ impl SyncResolve for ReplyBuilder<'_> { }, payload: ResponseBody::Reply(zenoh::Reply { timestamp: data_info.timestamp, - encoding: data_info.encoding.unwrap_or_default().into(), + encoding: data_info.encoding.unwrap_or_default(), ext_sinfo: if data_info.source_id.is_some() || data_info.source_sn.is_some() { Some(zenoh::reply::ext::SourceInfoType { @@ -269,7 +269,7 @@ impl SyncResolve for ReplyBuilder<'_> { #[cfg(feature = "shared-memory")] ext_shm: None, payload: payload.payload, - encoding: payload.encoding.into(), + encoding: payload.encoding, }), code: 0, // TODO }), diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 3aba44a5c0..d5740904b6 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -11,10 +11,10 @@ // Contributors: // ZettaScale Zenoh Team, // - use crate::admin; use crate::config::Config; use crate::config::Notifier; +use crate::encoding::DefaultEncoder; use crate::handlers::{Callback, DefaultHandler}; use crate::info::*; use crate::key_expr::KeyExprInner; @@ -33,7 +33,6 @@ use crate::sample::Attachment; use crate::sample::DataInfo; use crate::selector::TIME_RANGE_KEY; use crate::subscriber::*; -use crate::Encoding; use crate::Id; use crate::Priority; use crate::Sample; @@ -1815,7 +1814,7 @@ impl Session { ext_body: value.as_ref().map(|v| query::ext::QueryBodyType { #[cfg(feature = "shared-memory")] ext_shm: None, - encoding: v.encoding.clone().into(), + encoding: v.encoding.clone(), payload: v.payload.clone(), }), ext_attachment, @@ -1834,7 +1833,7 @@ impl Session { value.as_ref().map(|v| query::ext::QueryBodyType { #[cfg(feature = "shared-memory")] ext_shm: None, - encoding: v.encoding.clone().into(), + encoding: v.encoding.clone(), payload: v.payload.clone(), }), #[cfg(feature = "unstable")] @@ -1906,7 +1905,7 @@ impl Session { parameters, value: body.map(|b| Value { payload: b.payload, - encoding: b.encoding.into(), + encoding: b.encoding, }), qid, zid, @@ -2191,7 +2190,7 @@ impl Primitives for Session { PushBody::Put(m) => { let info = DataInfo { kind: SampleKind::Put, - encoding: Some(m.encoding.into()), + encoding: Some(m.encoding), timestamp: m.timestamp, source_id: m.ext_sinfo.as_ref().map(|i| i.zid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), @@ -2267,11 +2266,11 @@ impl Primitives for Session { let value = match e.ext_body { Some(body) => Value { payload: body.payload, - encoding: body.encoding.into(), + encoding: body.encoding, }, None => Value { payload: ZBuf::empty(), - encoding: Encoding::EMPTY, + encoding: DefaultEncoder::EMPTY, }, }; let replier_id = match e.ext_sinfo { @@ -2344,7 +2343,7 @@ impl Primitives for Session { }; let info = DataInfo { kind: SampleKind::Put, - encoding: Some(m.encoding.into()), + encoding: Some(m.encoding), timestamp: m.timestamp, source_id: m.ext_sinfo.as_ref().map(|i| i.zid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), diff --git a/zenoh/src/value.rs b/zenoh/src/value.rs index 9c0afa28c7..1a5620900d 100644 --- a/zenoh/src/value.rs +++ b/zenoh/src/value.rs @@ -14,19 +14,11 @@ //! Value primitives. use base64::{engine::general_purpose::STANDARD as b64_std_engine, Engine}; -use std::borrow::Cow; -use std::convert::TryFrom; -#[cfg(feature = "shared-memory")] -use std::sync::Arc; - -use zenoh_collections::Properties; -use zenoh_result::ZError; +use zenoh_result::ZResult; use crate::buffers::ZBuf; -use crate::encoding::DefaultEncodingMapping; -use crate::prelude::{Encoding, Sample, SplitBuffer}; -#[cfg(feature = "shared-memory")] -use zenoh_shm::SharedMemoryBuf; +use crate::encoding::{Decoder, DefaultEncoder, Encoder}; +use crate::prelude::{Encoding, SplitBuffer}; /// A zenoh Value. #[non_exhaustive] @@ -43,7 +35,7 @@ impl Value { pub fn new(payload: ZBuf) -> Self { Value { payload, - encoding: Encoding::APP_OCTET_STREAM, + encoding: Encoding::empty(), } } @@ -51,7 +43,7 @@ impl Value { pub fn empty() -> Self { Value { payload: ZBuf::empty(), - encoding: Encoding::EMPTY, + encoding: Encoding::empty(), } } @@ -87,619 +79,41 @@ impl std::fmt::Display for Value { impl std::error::Error for Value {} -// Shared memory conversion -#[cfg(feature = "shared-memory")] -impl From> for Value { - fn from(smb: Arc) -> Self { - Value { - payload: smb.into(), - encoding: Encoding::APP_OCTET_STREAM, - } - } -} - -#[cfg(feature = "shared-memory")] -impl From> for Value { - fn from(smb: Box) -> Self { - let smb: Arc = smb.into(); - Self::from(smb) - } -} - -#[cfg(feature = "shared-memory")] -impl From for Value { - fn from(smb: SharedMemoryBuf) -> Self { - Value { - payload: smb.into(), - encoding: Encoding::APP_OCTET_STREAM, - } - } -} - -// Bytes conversion -impl From for Value { - fn from(buf: ZBuf) -> Self { - Value { - payload: buf, - encoding: Encoding::APP_OCTET_STREAM, - } - } -} - -impl TryFrom<&Value> for ZBuf { - type Error = ZError; - - fn try_from(v: &Value) -> Result { - match v.encoding.prefix() { - DefaultEncodingMapping::APP_OCTET_STREAM => Ok(v.payload.clone()), - unexpected => Err(zerror!( - "{:?} can not be converted into Cow<'a, [u8]>", - unexpected - )), - } - } -} - -impl TryFrom for ZBuf { - type Error = ZError; - - fn try_from(v: Value) -> Result { - Self::try_from(&v) - } -} - -impl From<&[u8]> for Value { - fn from(buf: &[u8]) -> Self { - Value::from(ZBuf::from(buf.to_vec())) - } -} - -impl<'a> TryFrom<&'a Value> for Cow<'a, [u8]> { - type Error = ZError; - - fn try_from(v: &'a Value) -> Result { - match v.encoding.prefix() { - DefaultEncodingMapping::APP_OCTET_STREAM => Ok(v.payload.contiguous()), - unexpected => Err(zerror!( - "{:?} can not be converted into Cow<'a, [u8]>", - unexpected - )), - } - } -} - -impl From> for Value { - fn from(buf: Vec) -> Self { - Value::from(ZBuf::from(buf)) - } -} - -impl TryFrom<&Value> for Vec { - type Error = ZError; - - fn try_from(v: &Value) -> Result { - match v.encoding.prefix() { - DefaultEncodingMapping::APP_OCTET_STREAM => Ok(v.payload.contiguous().to_vec()), - unexpected => Err(zerror!( - "{:?} can not be converted into Vec", - unexpected - )), - } - } -} - -impl TryFrom for Vec { - type Error = ZError; - - fn try_from(v: Value) -> Result { - Self::try_from(&v) - } -} - -// String conversion -impl From for Value { - fn from(s: String) -> Self { - Value { - payload: ZBuf::from(s.into_bytes()), - encoding: Encoding::TEXT_PLAIN, - } - } -} - -impl From<&str> for Value { - fn from(s: &str) -> Self { - Value { - payload: ZBuf::from(Vec::::from(s)), - encoding: Encoding::TEXT_PLAIN, - } - } -} - -impl TryFrom<&Value> for String { - type Error = ZError; - - fn try_from(v: &Value) -> Result { - match v.encoding.prefix() { - DefaultEncodingMapping::TEXT_PLAIN => { - String::from_utf8(v.payload.contiguous().to_vec()).map_err(|e| zerror!("{}", e)) - } - unexpected => Err(zerror!("{:?} can not be converted into String", unexpected)), - } - } -} - -impl TryFrom for String { - type Error = ZError; - - fn try_from(v: Value) -> Result { - Self::try_from(&v) - } -} - -// Sample conversion -impl From for Value { - fn from(s: Sample) -> Self { - s.value - } -} - -// i64 conversion -impl From for Value { - fn from(i: i64) -> Self { - Value { - payload: ZBuf::from(Vec::::from(i.to_string())), - encoding: Encoding::APP_INTEGER, - } - } -} - -impl TryFrom<&Value> for i64 { - type Error = ZError; - - fn try_from(v: &Value) -> Result { - match v.encoding.prefix() { - DefaultEncodingMapping::APP_INTEGER => std::str::from_utf8(&v.payload.contiguous()) - .map_err(|e| zerror!("{}", e))? - .parse() - .map_err(|e| zerror!("{}", e)), - unexpected => Err(zerror!("{:?} can not be converted into i64", unexpected)), - } - } -} - -impl TryFrom for i64 { - type Error = ZError; - - fn try_from(v: Value) -> Result { - Self::try_from(&v) - } -} - -// i32 conversion -impl From for Value { - fn from(i: i32) -> Self { - Value { - payload: ZBuf::from(Vec::::from(i.to_string())), - encoding: Encoding::APP_INTEGER, - } - } -} - -impl TryFrom<&Value> for i32 { - type Error = ZError; - - fn try_from(v: &Value) -> Result { - match v.encoding.prefix() { - DefaultEncodingMapping::APP_INTEGER => std::str::from_utf8(&v.payload.contiguous()) - .map_err(|e| zerror!("{}", e))? - .parse() - .map_err(|e| zerror!("{}", e)), - unexpected => Err(zerror!("{:?} can not be converted into i32", unexpected)), - } - } -} - -impl TryFrom for i32 { - type Error = ZError; - - fn try_from(v: Value) -> Result { - Self::try_from(&v) - } -} - -// i16 conversion -impl From for Value { - fn from(i: i16) -> Self { - Value { - payload: ZBuf::from(Vec::::from(i.to_string())), - encoding: Encoding::APP_INTEGER, - } - } -} - -impl TryFrom<&Value> for i16 { - type Error = ZError; - - fn try_from(v: &Value) -> Result { - match v.encoding.prefix() { - DefaultEncodingMapping::APP_INTEGER => std::str::from_utf8(&v.payload.contiguous()) - .map_err(|e| zerror!("{}", e))? - .parse() - .map_err(|e| zerror!("{}", e)), - unexpected => Err(zerror!("{:?} can not be converted into i16", unexpected)), - } - } -} - -impl TryFrom for i16 { - type Error = ZError; - - fn try_from(v: Value) -> Result { - Self::try_from(&v) - } -} - -// i8 conversion -impl From for Value { - fn from(i: i8) -> Self { - Value { - payload: ZBuf::from(Vec::::from(i.to_string())), - encoding: Encoding::APP_INTEGER, - } - } -} - -impl TryFrom<&Value> for i8 { - type Error = ZError; - - fn try_from(v: &Value) -> Result { - match v.encoding.prefix() { - DefaultEncodingMapping::APP_INTEGER => std::str::from_utf8(&v.payload.contiguous()) - .map_err(|e| zerror!("{}", e))? - .parse() - .map_err(|e| zerror!("{}", e)), - unexpected => Err(zerror!("{:?} can not be converted into i8", unexpected)), - } - } -} - -impl TryFrom for i8 { - type Error = ZError; - - fn try_from(v: Value) -> Result { - Self::try_from(&v) - } -} - -// isize conversion -impl From for Value { - fn from(i: isize) -> Self { - Value { - payload: ZBuf::from(Vec::::from(i.to_string())), - encoding: Encoding::APP_INTEGER, - } - } -} - -impl TryFrom<&Value> for isize { - type Error = ZError; - - fn try_from(v: &Value) -> Result { - match v.encoding.prefix() { - DefaultEncodingMapping::APP_INTEGER => std::str::from_utf8(&v.payload.contiguous()) - .map_err(|e| zerror!("{}", e))? - .parse() - .map_err(|e| zerror!("{}", e)), - unexpected => Err(zerror!("{:?} can not be converted into isize", unexpected)), - } - } -} - -impl TryFrom for isize { - type Error = ZError; - - fn try_from(v: Value) -> Result { - Self::try_from(&v) - } -} - -// u64 conversion -impl From for Value { - fn from(i: u64) -> Self { - Value { - payload: ZBuf::from(Vec::::from(i.to_string())), - encoding: Encoding::APP_INTEGER, - } - } -} - -impl TryFrom<&Value> for u64 { - type Error = ZError; - - fn try_from(v: &Value) -> Result { - match v.encoding.prefix() { - DefaultEncodingMapping::APP_INTEGER => std::str::from_utf8(&v.payload.contiguous()) - .map_err(|e| zerror!("{}", e))? - .parse() - .map_err(|e| zerror!("{}", e)), - unexpected => Err(zerror!("{:?} can not be converted into u64", unexpected)), - } - } -} - -impl TryFrom for u64 { - type Error = ZError; - - fn try_from(v: Value) -> Result { - Self::try_from(&v) - } -} - -// u32 conversion -impl From for Value { - fn from(i: u32) -> Self { - Value { - payload: ZBuf::from(Vec::::from(i.to_string())), - encoding: Encoding::APP_INTEGER, - } - } -} - -impl TryFrom<&Value> for u32 { - type Error = ZError; - - fn try_from(v: &Value) -> Result { - match v.encoding.prefix() { - DefaultEncodingMapping::APP_INTEGER => std::str::from_utf8(&v.payload.contiguous()) - .map_err(|e| zerror!("{}", e))? - .parse() - .map_err(|e| zerror!("{}", e)), - unexpected => Err(zerror!("{:?} can not be converted into u32", unexpected)), - } - } -} - -impl TryFrom for u32 { - type Error = ZError; - - fn try_from(v: Value) -> Result { - Self::try_from(&v) - } -} - -// u16 conversion -impl From for Value { - fn from(i: u16) -> Self { - Value { - payload: ZBuf::from(Vec::::from(i.to_string())), - encoding: Encoding::APP_INTEGER, - } - } -} - -impl TryFrom<&Value> for u16 { - type Error = ZError; - - fn try_from(v: &Value) -> Result { - match v.encoding.prefix() { - DefaultEncodingMapping::APP_INTEGER => std::str::from_utf8(&v.payload.contiguous()) - .map_err(|e| zerror!("{}", e))? - .parse() - .map_err(|e| zerror!("{}", e)), - unexpected => Err(zerror!("{:?} can not be converted into u16", unexpected)), - } - } -} - -impl TryFrom for u16 { - type Error = ZError; - - fn try_from(v: Value) -> Result { - Self::try_from(&v) - } -} - -// u8 conversion -impl From for Value { - fn from(i: u8) -> Self { - Value { - payload: ZBuf::from(Vec::::from(i.to_string())), - encoding: Encoding::APP_INTEGER, - } - } -} - -impl TryFrom<&Value> for u8 { - type Error = ZError; - - fn try_from(v: &Value) -> Result { - match v.encoding.prefix() { - DefaultEncodingMapping::APP_INTEGER => std::str::from_utf8(&v.payload.contiguous()) - .map_err(|e| zerror!("{}", e))? - .parse() - .map_err(|e| zerror!("{}", e)), - unexpected => Err(zerror!("{:?} can not be converted into u8", unexpected)), - } - } -} - -impl TryFrom for u8 { - type Error = ZError; - - fn try_from(v: Value) -> Result { - Self::try_from(&v) - } -} - -// usize conversion -impl From for Value { - fn from(i: usize) -> Self { - Value { - payload: ZBuf::from(Vec::::from(i.to_string())), - encoding: Encoding::APP_INTEGER, - } - } -} - -impl TryFrom<&Value> for usize { - type Error = ZError; - - fn try_from(v: &Value) -> Result { - match v.encoding.prefix() { - DefaultEncodingMapping::APP_INTEGER => std::str::from_utf8(&v.payload.contiguous()) - .map_err(|e| zerror!("{}", e))? - .parse() - .map_err(|e| zerror!("{}", e)), - unexpected => Err(zerror!("{:?} can not be converted into usize", unexpected)), - } - } -} - -impl TryFrom for usize { - type Error = ZError; - - fn try_from(v: Value) -> Result { - Self::try_from(&v) - } -} - -// f64 conversion -impl From for Value { - fn from(f: f64) -> Self { - Value { - payload: ZBuf::from(Vec::::from(f.to_string())), - encoding: Encoding::APP_FLOAT, - } - } -} - -impl TryFrom<&Value> for f64 { - type Error = ZError; - - fn try_from(v: &Value) -> Result { - match v.encoding.prefix() { - DefaultEncodingMapping::APP_FLOAT => std::str::from_utf8(&v.payload.contiguous()) - .map_err(|e| zerror!("{}", e))? - .parse() - .map_err(|e| zerror!("{}", e)), - unexpected => Err(zerror!("{:?} can not be converted into f64", unexpected)), - } - } -} - -impl TryFrom for f64 { - type Error = ZError; - - fn try_from(v: Value) -> Result { - Self::try_from(&v) - } -} - -// f32 conversion -impl From for Value { - fn from(f: f32) -> Self { - Value { - payload: ZBuf::from(Vec::::from(f.to_string())), - encoding: Encoding::APP_FLOAT, - } - } -} - -impl TryFrom<&Value> for f32 { - type Error = ZError; - - fn try_from(v: &Value) -> Result { - match v.encoding.prefix() { - DefaultEncodingMapping::APP_FLOAT => std::str::from_utf8(&v.payload.contiguous()) - .map_err(|e| zerror!("{}", e))? - .parse() - .map_err(|e| zerror!("{}", e)), - unexpected => Err(zerror!("{:?} can not be converted into f32", unexpected)), - } - } -} - -impl TryFrom for f32 { - type Error = ZError; - - fn try_from(v: Value) -> Result { - Self::try_from(&v) - } -} - -// JSON conversion -impl From<&serde_json::Value> for Value { - fn from(json: &serde_json::Value) -> Self { - Value { - payload: ZBuf::from(Vec::::from(json.to_string())), - encoding: Encoding::APP_JSON, - } - } -} - -impl From for Value { - fn from(json: serde_json::Value) -> Self { - Value::from(&json) - } -} - -impl TryFrom<&Value> for serde_json::Value { - type Error = ZError; - - fn try_from(v: &Value) -> Result { - match v.encoding.prefix() { - DefaultEncodingMapping::APP_JSON | DefaultEncodingMapping::TEXT_JSON => { - let r = serde::Deserialize::deserialize(&mut serde_json::Deserializer::from_slice( - &v.payload.contiguous(), - )); - r.map_err(|e| zerror!("{}", e)) - } - unexpected => Err(zerror!( - "{:?} can not be converted into Properties", - unexpected - )), - } +impl Value { + pub fn decode(&self) -> ZResult + where + DefaultEncoder: Decoder, + { + DefaultEncoder::decode(self) } -} -impl TryFrom for serde_json::Value { - type Error = ZError; - - fn try_from(v: Value) -> Result { - Self::try_from(&v) + pub fn decode_with(&self) -> ZResult + where + M: Decoder, + { + M::decode(self) } -} -// Properties conversion -impl From for Value { - fn from(p: Properties) -> Self { - Value { - payload: ZBuf::from(Vec::::from(p.to_string())), - encoding: Encoding::APP_PROPERTIES, - } + pub fn encode(t: T) -> Self + where + DefaultEncoder: Encoder, + { + DefaultEncoder::encode(t) } -} - -impl TryFrom<&Value> for Properties { - type Error = ZError; - fn try_from(v: &Value) -> Result { - match v.encoding.prefix() { - DefaultEncodingMapping::APP_PROPERTIES => Ok(Properties::from( - std::str::from_utf8(&v.payload.contiguous()).map_err(|e| zerror!("{}", e))?, - )), - unexpected => Err(zerror!( - "{:?} can not be converted into Properties", - unexpected - )), - } + pub fn encode_with(t: T) -> Self + where + M: Encoder, + { + M::encode(t) } } -impl TryFrom for Properties { - type Error = ZError; - - fn try_from(v: Value) -> Result { - Self::try_from(&v) +impl From for Value +where + DefaultEncoder: Encoder, +{ + fn from(t: T) -> Self { + Value::encode(t) } }