Skip to content

Commit

Permalink
Accessors for Value (#927)
Browse files Browse the repository at this point in the history
* accessors for Value

* doctest fix

* valgrind test fix
  • Loading branch information
DenisBiryukov91 authored Apr 11, 2024
1 parent d86653e commit d6da7a8
Show file tree
Hide file tree
Showing 17 changed files with 72 additions and 82 deletions.
4 changes: 2 additions & 2 deletions ci/valgrind-check/src/queryable_get/bin/z_queryable_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async fn main() {
query
.reply(
query.selector().key_expr,
query.value().unwrap().payload.clone(),
query.value().unwrap().payload().clone(),
)
.res()
.await
Expand Down Expand Up @@ -71,7 +71,7 @@ async fn main() {
),
Err(err) => println!(
">> Received (ERROR: '{}')",
err.payload
err.payload()
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e))
),
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn main() {
}
Err(err) => {
let payload = err
.payload
.payload()
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
println!(">> Received (ERROR: '{}')", payload);
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_get_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn main() {
Ok(sample) => println!(">> Alive token ('{}')", sample.key_expr().as_str(),),
Err(err) => {
let payload = err
.payload
.payload()
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
println!(">> Received (ERROR: '{}')", payload);
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async fn main() {
None => println!(">> [Queryable ] Received Query '{}'", query.selector()),
Some(value) => {
let payload = value
.payload
.payload()
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
println!(
Expand Down
13 changes: 8 additions & 5 deletions plugins/zenoh-plugin-rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ fn result_to_json(sample: Result<Sample, Value>) -> JSONSample {
Ok(sample) => sample_to_json(&sample),
Err(err) => JSONSample {
key: "ERROR".into(),
value: payload_to_json(&err.payload, &err.encoding),
encoding: err.encoding.to_string(),
value: payload_to_json(err.payload(), err.encoding()),
encoding: err.encoding().to_string(),
time: None,
},
}
Expand Down Expand Up @@ -139,7 +139,7 @@ fn result_to_html(sample: Result<Sample, Value>) -> String {
Err(err) => {
format!(
"<dt>ERROR</dt>\n<dd>{}</dd>\n",
err.payload.deserialize::<Cow<str>>().unwrap_or_default()
err.payload().deserialize::<Cow<str>>().unwrap_or_default()
)
}
}
Expand Down Expand Up @@ -172,8 +172,11 @@ async fn to_raw_response(results: flume::Receiver<Reply>) -> Response {
),
Err(value) => response(
StatusCode::Ok,
Cow::from(&value.encoding).as_ref(),
&value.payload.deserialize::<Cow<str>>().unwrap_or_default(),
Cow::from(value.encoding()).as_ref(),
&value
.payload()
.deserialize::<Cow<str>>()
.unwrap_or_default(),
),
},
Err(_) => response(StatusCode::Ok, "", ""),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ impl AlignQueryable {
}
AlignData::Data(k, (v, ts)) => {
query
.reply(k, v.payload)
.encoding(v.encoding)
.reply(k, v.payload().clone())
.encoding(v.encoding().clone())
.timestamp(ts)
.res()
.await
Expand Down
7 changes: 2 additions & 5 deletions plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,8 @@ impl Aligner {
log::trace!("[ALIGNER] Received queried samples: {missing_data:?}");

for (key, (ts, value)) in missing_data {
let Value {
payload, encoding, ..
} = value;
let sample = SampleBuilder::put(key, payload)
.encoding(encoding)
let sample = SampleBuilder::put(key, value.payload().clone())
.encoding(value.encoding().clone())
.timestamp(ts)
.into();
log::debug!("[ALIGNER] Adding {:?} to storage", sample);
Expand Down
34 changes: 15 additions & 19 deletions plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,12 @@ impl StorageService {
{
match update.kind {
SampleKind::Put => {
SampleBuilder::put(KeyExpr::from(k.clone()), update.data.value.payload)
.encoding(update.data.value.encoding)
SampleBuilder::put(k.clone(), update.data.value.payload().clone())
.encoding(update.data.value.encoding().clone())
.timestamp(update.data.timestamp)
.into()
}
SampleKind::Delete => SampleBuilder::delete(KeyExpr::from(k.clone()))
SampleKind::Delete => SampleBuilder::delete(k.clone())
.timestamp(update.data.timestamp)
.into(),
}
Expand All @@ -329,8 +329,10 @@ impl StorageService {
storage
.put(
stripped_key,
Value::new(sample_to_store.payload().clone())
.encoding(sample_to_store.encoding().clone()),
Value::new(
sample_to_store.payload().clone(),
sample_to_store.encoding().clone(),
),
*sample_to_store.timestamp().unwrap(),
)
.await
Expand Down Expand Up @@ -514,8 +516,8 @@ impl StorageService {
Ok(stored_data) => {
for entry in stored_data {
if let Err(e) = q
.reply(key.clone(), entry.value.payload)
.encoding(entry.value.encoding)
.reply(key.clone(), entry.value.payload().clone())
.encoding(entry.value.encoding().clone())
.timestamp(entry.timestamp)
.res()
.await
Expand Down Expand Up @@ -546,8 +548,8 @@ impl StorageService {
Ok(stored_data) => {
for entry in stored_data {
if let Err(e) = q
.reply(q.key_expr().clone(), entry.value.payload)
.encoding(entry.value.encoding)
.reply(q.key_expr().clone(), entry.value.payload().clone())
.encoding(entry.value.encoding().clone())
.timestamp(entry.timestamp)
.res()
.await
Expand Down Expand Up @@ -665,20 +667,14 @@ impl StorageService {
fn serialize_update(update: &Update) -> String {
let Update {
kind,
data:
StoredData {
value: Value {
payload, encoding, ..
},
timestamp,
},
data: StoredData { value, timestamp },
} = update;
let zbuf: ZBuf = payload.into();
let zbuf: ZBuf = value.payload().into();

let result = (
kind.to_string(),
timestamp.to_string(),
encoding.to_string(),
value.encoding().to_string(),
zbuf.slices().collect::<Vec<&[u8]>>(),
);
serde_json::to_string_pretty(&result).unwrap()
Expand All @@ -690,7 +686,7 @@ fn construct_update(data: String) -> Update {
for slice in result.3 {
payload.push_zslice(slice.to_vec().into());
}
let value = Value::new(payload).encoding(result.2);
let value = Value::new(payload, result.2);
let data = StoredData {
value,
timestamp: Timestamp::from_str(&result.1).unwrap(), // @TODO: remove the unwrap()
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ where
/// while let Ok(token) = tokens.recv_async().await {
/// match token.sample {
/// Ok(sample) => println!("Alive token ('{}')", sample.key_expr().as_str()),
/// Err(err) => println!("Received (ERROR: '{:?}')", err.payload),
/// Err(err) => println!("Received (ERROR: '{:?}')", err.payload()),
/// }
/// }
/// # }
Expand Down
4 changes: 1 addition & 3 deletions zenoh/src/net/runtime/adminspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,7 @@ impl Primitives for AdminSpace {
inner: Arc::new(QueryInner {
key_expr: key_expr.clone(),
parameters,
value: query
.ext_body
.map(|b| Value::from(b.payload).encoding(b.encoding)),
value: query.ext_body.map(|b| Value::new(b.payload, b.encoding)),
qid: msg.id,
zid,
primitives,
Expand Down
16 changes: 12 additions & 4 deletions zenoh/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,21 @@ impl QoSBuilderTrait for GetBuilder<'_, '_, DefaultHandler> {

impl<Handler> ValueBuilderTrait for GetBuilder<'_, '_, Handler> {
fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
let value = Some(self.value.unwrap_or_default().encoding(encoding));
Self { value, ..self }
let mut value = self.value.unwrap_or_default();
value.encoding = encoding.into();
Self {
value: Some(value),
..self
}
}

fn payload<T: Into<Payload>>(self, payload: T) -> Self {
let value = Some(self.value.unwrap_or_default().payload(payload));
Self { value, ..self }
let mut value = self.value.unwrap_or_default();
value.payload = payload.into();
Self {
value: Some(value),
..self
}
}
fn value<T: Into<Value>>(self, value: T) -> Self {
let value: Value = value.into();
Expand Down
14 changes: 6 additions & 8 deletions zenoh/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,17 +489,15 @@ pub struct ReplyErrBuilder<'a> {

impl ValueBuilderTrait for ReplyErrBuilder<'_> {
fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
Self {
value: self.value.encoding(encoding),
..self
}
let mut value = self.value.clone();
value.encoding = encoding.into();
Self { value, ..self }
}

fn payload<T: Into<Payload>>(self, payload: T) -> Self {
Self {
value: self.value.payload(payload),
..self
}
let mut value = self.value.clone();
value.payload = payload.into();
Self { value, ..self }
}

fn value<T: Into<Value>>(self, value: T) -> Self {
Expand Down
4 changes: 2 additions & 2 deletions zenoh/src/sample/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use crate::encoding::Encoding;
use crate::payload::Payload;
use crate::prelude::{KeyExpr, Value};
use crate::sample::builder::{QoSBuilderTrait, ValueBuilderTrait};
use crate::sample::builder::QoSBuilderTrait;
use crate::time::Timestamp;
use crate::Priority;
#[cfg(feature = "unstable")]
Expand Down Expand Up @@ -378,7 +378,7 @@ impl Sample {

impl From<Sample> for Value {
fn from(sample: Sample) -> Self {
Value::new(sample.payload).encoding(sample.encoding)
Value::new(sample.payload, sample.encoding)
}
}

Expand Down
38 changes: 14 additions & 24 deletions zenoh/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,26 @@
//

//! Value primitives.
use crate::{encoding::Encoding, payload::Payload, sample::builder::ValueBuilderTrait};
use crate::{encoding::Encoding, payload::Payload};

/// A zenoh [`Value`] contains a `payload` and an [`Encoding`] that indicates how the [`Payload`] should be interpreted.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Value {
/// The binary [`Payload`] of this [`Value`].
pub payload: Payload,
/// The [`Encoding`] of this [`Value`].
pub encoding: Encoding,
pub(crate) payload: Payload,
pub(crate) encoding: Encoding,
}

impl Value {
/// Creates a new [`Value`] with default [`Encoding`].
pub fn new<T>(payload: T) -> Self
/// Creates a new [`Value`] with specified [`Payload`] and [`Encoding`].
pub fn new<T, E>(payload: T, encoding: E) -> Self
where
T: Into<Payload>,
E: Into<Encoding>,
{
Value {
payload: payload.into(),
encoding: Encoding::default(),
encoding: encoding.into(),
}
}
/// Creates an empty [`Value`].
Expand All @@ -48,24 +47,15 @@ impl Value {
pub fn is_empty(&self) -> bool {
self.payload.is_empty() && self.encoding == Encoding::default()
}
}

impl ValueBuilderTrait for Value {
fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
Self {
encoding: encoding.into(),
..self
}
/// Gets binary [`Payload`] of this [`Value`].
pub fn payload(&self) -> &Payload {
&self.payload
}
fn payload<T: Into<Payload>>(self, payload: T) -> Self {
Self {
payload: payload.into(),
..self
}
}
fn value<T: Into<Value>>(self, value: T) -> Self {
let Value { payload, encoding } = value.into();
Self { payload, encoding }

/// Gets [`Encoding`] of this [`Value`].
pub fn encoding(&self) -> &Encoding {
&self.encoding
}
}

Expand Down
4 changes: 2 additions & 2 deletions zenoh/tests/attachments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ fn attachment_queries() {
.callback(|query| {
let s = query
.value()
.map(|q| q.payload.deserialize::<String>().unwrap())
.map(|q| q.payload().deserialize::<String>().unwrap())
.unwrap_or_default();
println!("Query value: {}", s);

Expand All @@ -82,7 +82,7 @@ fn attachment_queries() {
query
.reply(
query.key_expr().clone(),
query.value().unwrap().payload.clone(),
query.value().unwrap().payload().clone(),
)
.attachment(Attachment::from_iter(
attachment
Expand Down
2 changes: 1 addition & 1 deletion zenoh/tests/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ fn query_with_ringbuffer() {
query
.value()
.unwrap()
.payload
.payload()
.deserialize::<String>()
.unwrap(),
"query2"
Expand Down
2 changes: 1 addition & 1 deletion zenoh/tests/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ async fn test_session_qryrep(peer01: &Session, peer02: &Session, reliability: Re
let rs = ztimeout!(peer02.get(selector).res_async()).unwrap();
while let Ok(s) = ztimeout!(rs.recv_async()) {
let e = s.sample.unwrap_err();
assert_eq!(e.payload.len(), size);
assert_eq!(e.payload().len(), size);
cnt += 1;
}
}
Expand Down

0 comments on commit d6da7a8

Please sign in to comment.