Skip to content

Commit

Permalink
set value api
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Mar 28, 2024
1 parent ab96aab commit 82c1c99
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 43 deletions.
21 changes: 12 additions & 9 deletions examples/examples/z_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@ async fn main() {
let session = zenoh::open(config).res().await.unwrap();

println!("Sending Query '{selector}'...");
let replies = match value {
Some(value) => session.get(&selector).with_value(value),
None => session.get(&selector),
}
.target(target)
.timeout(timeout)
.res()
.await
.unwrap();
// let replies = match value {
// Some(value) => session.get(&selector).payload(value),
// None => session.get(&selector),
// }
let replies = session
.get(&selector)
.value(value.map(Value::from))
.target(target)
.timeout(timeout)
.res()
.await
.unwrap();
while let Ok(reply) = replies.recv_async().await {
match reply.sample {
Ok(sample) => {
Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-plugin-rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ async fn query(mut req: Request<(Arc<Session>, String)>) -> tide::Result<Respons
.content_type()
.map(|m| Encoding::from(m.to_string()))
.unwrap_or_default();
query = query.with_value(Value::from(body).encoding(encoding));
query = query.payload(body).encoding(encoding);
}
match query.res().await {
Ok(receiver) => {
Expand Down
17 changes: 17 additions & 0 deletions zenoh/src/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,14 @@ impl ValueBuilderTrait for PutBuilder<'_, '_> {
..self
}
}
fn value<T: Into<Value>>(self, value: T) -> Self {
let Value { payload, encoding } = value.into();
Self {
payload,
encoding,
..self
}
}
}

impl PutBuilder<'_, '_> {
Expand Down Expand Up @@ -798,6 +806,15 @@ impl ValueBuilderTrait for PutPublication<'_> {
..self
}
}

fn value<T: Into<Value>>(self, value: T) -> Self {
let Value { payload, encoding } = value.into();
Self {
payload,
encoding,
..self
}
}
}

impl TimestampBuilderTrait for DeletePublication<'_> {
Expand Down
49 changes: 21 additions & 28 deletions zenoh/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,13 @@ impl<Handler> ValueBuilderTrait for GetBuilder<'_, '_, Handler> {
let value = Some(self.value.unwrap_or_default().payload(payload));
Self { value, ..self }
}
fn value<T: Into<Value>>(self, value: T) -> Self {
let value: Value = value.into();
Self {
value: if value.is_empty() { None } else { Some(value) },
..self
}
}
}

impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> {
Expand Down Expand Up @@ -328,48 +335,34 @@ impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> {
impl<'a, 'b, Handler> GetBuilder<'a, 'b, Handler> {
/// Change the target of the query.
#[inline]
pub fn target(mut self, target: QueryTarget) -> Self {
self.target = target;
self
pub fn target(self, target: QueryTarget) -> Self {
Self { target, ..self }
}

/// Change the consolidation mode of the query.
#[inline]
pub fn consolidation<QC: Into<QueryConsolidation>>(mut self, consolidation: QC) -> Self {
self.consolidation = consolidation.into();
self
pub fn consolidation<QC: Into<QueryConsolidation>>(self, consolidation: QC) -> Self {
Self {
consolidation: consolidation.into(),
..self
}
}

/// Restrict the matching queryables that will receive the query
/// to the ones that have the given [`Locality`](crate::prelude::Locality).
#[zenoh_macros::unstable]
#[inline]
pub fn allowed_destination(mut self, destination: Locality) -> Self {
self.destination = destination;
self
pub fn allowed_destination(self, destination: Locality) -> Self {
Self {
destination,
..self
}
}

/// Set query timeout.
#[inline]
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}

/// Set query value.
#[inline]
pub fn with_value<IntoValue>(mut self, value: IntoValue) -> Self
where
IntoValue: Into<Value>,
{
self.value = Some(value.into());
self
}

#[zenoh_macros::unstable]
pub fn with_attachment(mut self, attachment: Attachment) -> Self {
self.attachment = Some(attachment);
self
pub fn timeout(self, timeout: Duration) -> Self {
Self { timeout, ..self }
}

/// By default, `get` guarantees that it will only receive replies whose key expressions intersect
Expand Down
7 changes: 7 additions & 0 deletions zenoh/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,13 @@ impl ValueBuilderTrait for ReplyBuilder<'_> {
..self
}
}
fn value<T: Into<Value>>(self, value: T) -> Self {
let Value { payload, encoding } = value.into();
Self {
sample_builder: self.sample_builder.payload(payload).encoding(encoding),
..self
}
}
}

/// A builder returned by [`Query::reply_del()`](Query::reply)
Expand Down
12 changes: 12 additions & 0 deletions zenoh/src/sample/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::Payload;
use crate::Priority;
use crate::Sample;
use crate::SampleKind;
use crate::Value;
use uhlc::Timestamp;
use zenoh_core::zresult;
use zenoh_protocol::core::CongestionControl;
Expand Down Expand Up @@ -56,6 +57,9 @@ pub trait ValueBuilderTrait {
fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self;
/// Sets the payload
fn payload<T: Into<Payload>>(self, payload: T) -> Self;
/// Sets both payload and encoding at once.
/// This is convenient for passing user type which supports `Into<Value>` when both payload and encoding depends on user type
fn value<T: Into<Value>>(self, value: T) -> Self;
}

#[derive(Debug)]
Expand Down Expand Up @@ -221,6 +225,14 @@ impl ValueBuilderTrait for PutSampleBuilder {
..self.0 .0
}))
}
fn value<T: Into<Value>>(self, value: T) -> Self {
let Value { payload, encoding } = value.into();
Self(SampleBuilder(Sample {
payload,
encoding,
..self.0 .0
}))
}
}

#[derive(Debug)]
Expand Down
18 changes: 18 additions & 0 deletions zenoh/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ impl Value {
encoding: Encoding::default(),
}
}
/// Checks if the [`Value`] is empty.
/// Value is considered empty if its payload is empty and encoding is default.
pub fn is_empty(&self) -> bool {
self.payload.is_empty() && self.encoding == Encoding::default()
}
}

impl ValueBuilderTrait for Value {
Expand All @@ -58,6 +63,10 @@ impl ValueBuilderTrait for Value {
..self
}
}
fn value<T: Into<Value>>(self, value: T) -> Self {
let Value { payload, encoding } = value.into();
Self { payload, encoding }
}
}

impl<T> From<T> for Value
Expand All @@ -72,6 +81,15 @@ where
}
}

impl<T> From<Option<T>> for Value
where
T: Into<Value>,
{
fn from(t: Option<T>) -> Self {
t.map_or_else(Value::empty, Into::into)
}
}

impl Default for Value {
fn default() -> Self {
Value::empty()
Expand Down
6 changes: 3 additions & 3 deletions zenoh/tests/attachments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ fn queries() {
}
let get = zenoh
.get("test/attachment")
.with_value("query")
.with_attachment(
.payload("query")
.attachment(Some(
backer
.iter()
.map(|b| (b.0.as_slice(), b.1.as_slice()))
.collect(),
)
))
.res()
.unwrap();
while let Ok(reply) = get.recv() {
Expand Down
4 changes: 2 additions & 2 deletions zenoh/tests/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ fn query_with_ringbuffer() {

let _reply1 = zenoh
.get("test/ringbuffer_query")
.with_value("query1")
.payload("query1")
.res()
.unwrap();
let _reply2 = zenoh
.get("test/ringbuffer_query")
.with_value("query2")
.payload("query2")
.res()
.unwrap();

Expand Down

0 comments on commit 82c1c99

Please sign in to comment.