Skip to content

Commit

Permalink
Query.reply and reply_del, now accept TryIntoKeyExpr instead of IntoK…
Browse files Browse the repository at this point in the history
…eyExpr (#878)
  • Loading branch information
DenisBiryukov91 authored Mar 29, 2024
1 parent 21fb083 commit 312c03a
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 24 deletions.
41 changes: 22 additions & 19 deletions zenoh/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl Query {
#[inline(always)]
#[cfg(feature = "unstable")]
#[doc(hidden)]
pub fn reply_sample(&self, sample: Sample) -> ReplyBuilder<'_> {
pub fn reply_sample(&self, sample: Sample) -> ReplyBuilder<'_, 'static> {
let Sample {
key_expr,
payload,
Expand All @@ -126,7 +126,7 @@ impl Query {
} = sample;
ReplyBuilder {
query: self,
key_expr,
key_expr: Ok(key_expr),
payload,
kind,
encoding,
Expand All @@ -145,18 +145,19 @@ impl Query {
/// Unless the query has enabled disjoint replies (you can check this through [`Query::accepts_replies`]),
/// replying on a disjoint key expression will result in an error when resolving the reply.
#[inline(always)]
pub fn reply<IntoKeyExpr, IntoPayload>(
pub fn reply<'b, TryIntoKeyExpr, IntoPayload>(
&self,
key_expr: IntoKeyExpr,
key_expr: TryIntoKeyExpr,
payload: IntoPayload,
) -> ReplyBuilder<'_>
) -> ReplyBuilder<'_, 'b>
where
IntoKeyExpr: Into<KeyExpr<'static>>,
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
IntoPayload: Into<Payload>,
{
ReplyBuilder {
query: self,
key_expr: key_expr.into(),
key_expr: key_expr.try_into().map_err(Into::into),
payload: payload.into(),
kind: SampleKind::Put,
timestamp: None,
Expand Down Expand Up @@ -187,13 +188,14 @@ impl Query {
/// Unless the query has enabled disjoint replies (you can check this through [`Query::accepts_replies`]),
/// replying on a disjoint key expression will result in an error when resolving the reply.
#[inline(always)]
pub fn reply_del<IntoKeyExpr>(&self, key_expr: IntoKeyExpr) -> ReplyBuilder<'_>
pub fn reply_del<'b, TryIntoKeyExpr>(&self, key_expr: TryIntoKeyExpr) -> ReplyBuilder<'_, 'b>
where
IntoKeyExpr: Into<KeyExpr<'static>>,
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
{
ReplyBuilder {
query: self,
key_expr: key_expr.into(),
key_expr: key_expr.try_into().map_err(Into::into),
payload: Payload::empty(),
kind: SampleKind::Delete,
timestamp: None,
Expand Down Expand Up @@ -248,9 +250,9 @@ impl fmt::Display for Query {
/// A builder returned by [`Query::reply()`](Query::reply) or [`Query::reply()`](Query::reply).
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
#[derive(Debug)]
pub struct ReplyBuilder<'a> {
pub struct ReplyBuilder<'a, 'b> {
query: &'a Query,
key_expr: KeyExpr<'static>,
key_expr: ZResult<KeyExpr<'b>>,
payload: Payload,
kind: SampleKind,
encoding: Encoding,
Expand All @@ -270,7 +272,7 @@ pub struct ReplyErrBuilder<'a> {
value: Value,
}

impl<'a> ReplyBuilder<'a> {
impl<'a, 'b> ReplyBuilder<'a, 'b> {
#[zenoh_macros::unstable]
pub fn with_attachment(mut self, attachment: Attachment) -> Self {
self.attachment = Some(attachment);
Expand All @@ -292,16 +294,17 @@ impl<'a> ReplyBuilder<'a> {
}
}

impl<'a> Resolvable for ReplyBuilder<'a> {
impl<'a, 'b> Resolvable for ReplyBuilder<'a, 'b> {
type To = ZResult<()>;
}

impl SyncResolve for ReplyBuilder<'_> {
impl<'a, 'b> SyncResolve for ReplyBuilder<'a, 'b> {
fn res_sync(self) -> <Self as Resolvable>::To {
let key_expr = self.key_expr?;
if !self.query._accepts_any_replies().unwrap_or(false)
&& !self.query.key_expr().intersects(&self.key_expr)
&& !self.query.key_expr().intersects(&key_expr)
{
bail!("Attempted to reply on `{}`, which does not intersect with query `{}`, despite query only allowing replies on matching key expressions", self.key_expr, self.query.key_expr())
bail!("Attempted to reply on `{}`, which does not intersect with query `{}`, despite query only allowing replies on matching key expressions", &key_expr, self.query.key_expr())
}
#[allow(unused_mut)] // will be unused if feature = "unstable" is not enabled
let mut ext_sinfo = None;
Expand All @@ -318,7 +321,7 @@ impl SyncResolve for ReplyBuilder<'_> {
rid: self.query.inner.qid,
wire_expr: WireExpr {
scope: 0,
suffix: std::borrow::Cow::Owned(self.key_expr.into()),
suffix: std::borrow::Cow::Owned(key_expr.into()),
mapping: Mapping::Sender,
},
payload: ResponseBody::Reply(zenoh::Reply {
Expand Down Expand Up @@ -360,7 +363,7 @@ impl SyncResolve for ReplyBuilder<'_> {
}
}

impl<'a> AsyncResolve for ReplyBuilder<'a> {
impl<'a, 'b> AsyncResolve for ReplyBuilder<'a, 'b> {
type Future = Ready<Self::To>;

fn res_async(self) -> Self::Future {
Expand Down
2 changes: 1 addition & 1 deletion zenoh/tests/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl Task {
tokio::select! {
_ = token.cancelled() => break,
query = queryable.recv_async() => {
query?.reply(KeyExpr::try_from(ke.to_owned())?, payload.clone()).res_async().await?;
query?.reply(ke.to_owned(), payload.clone()).res_async().await?;
},
}
}
Expand Down
5 changes: 1 addition & 4 deletions zenoh/tests/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,7 @@ async fn test_session_qryrep(peer01: &Session, peer02: &Session, reliability: Re
"ok_del" => {
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
ztimeout!(query
.reply_del(KeyExpr::try_from(key_expr).unwrap())
.res_async())
.unwrap()
ztimeout!(query.reply_del(key_expr).res_async()).unwrap()
})
});
}
Expand Down

0 comments on commit 312c03a

Please sign in to comment.