Skip to content

Commit

Permalink
refactor Query.reply() (#796)
Browse files Browse the repository at this point in the history
* refactor Query.reply() into seprate methods:reply, reply_del and reply_err

* explain #[allow(unused_mut)];
replace unwrap  on KeyxExpr.try_from with ? as it was originally for Sample in zenoh/tests/routing.rs

* mark Query.reply_sample as unstable

* format fix
  • Loading branch information
DenisBiryukov91 authored Mar 12, 2024
1 parent 41e2557 commit f12f338
Show file tree
Hide file tree
Showing 19 changed files with 340 additions and 216 deletions.
19 changes: 11 additions & 8 deletions examples/examples/z_queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,28 @@ async fn main() {
println!(">> [Queryable ] Received Query '{}' with value '{}'", query.selector(), payload);
},
}
let reply = if send_errors.swap(false, Relaxed) {
if send_errors.swap(false, Relaxed) {
println!(
">> [Queryable ] Replying (ERROR: '{}')",
value,
);
Err(value.clone().into())
query
.reply_err(value.clone())
.res()
.await
.unwrap_or_else(|e| println!(">> [Queryable ] Error sending reply: {e}"));
} else {
println!(
">> [Queryable ] Responding ('{}': '{}')",
key_expr.as_str(),
value,
);
Ok(Sample::new(key_expr.clone(), value.clone()))
query
.reply(key_expr.clone(), value.clone())
.res()
.await
.unwrap_or_else(|e| println!(">> [Queryable ] Error sending reply: {e}"));
};
query
.reply(reply)
.res()
.await
.unwrap_or_else(|e| println!(">> [Queryable ] Error sending reply: {e}"));
},

_ = stdin.read_exact(&mut input).fuse() => {
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async fn main() {
println!(">> [Queryable ] Received Query '{}'", query.selector());
for (stored_name, sample) in stored.iter() {
if query.selector().key_expr.intersects(unsafe {keyexpr::from_str_unchecked(stored_name)}) {
query.reply(Ok(sample.clone())).res().await.unwrap();
query.reply(sample.key_expr.clone(), sample.payload.clone()).res().await.unwrap();
}
}
},
Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-backend-traits/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async-std = { workspace = true, features = ["default"] }
async-trait = { workspace = true }
derive_more = { workspace = true }
serde_json = { workspace = true }
zenoh = { workspace = true }
zenoh = { workspace = true, features = ["unstable"] }
zenoh-result = { workspace = true }
zenoh-util = { workspace = true }
schemars = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-backend-traits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,6 @@ impl Query {
sample
};
// Send reply
self.q.reply(Ok(sample))
self.q.reply_sample(sample)
}
}
2 changes: 1 addition & 1 deletion plugins/zenoh-plugin-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ async fn run(runtime: Runtime, selector: KeyExpr<'_>, flag: Arc<AtomicBool>) {
info!("Handling query '{}'", query.selector());
for (key_expr, sample) in stored.iter() {
if query.selector().key_expr.intersects(unsafe{keyexpr::from_str_unchecked(key_expr)}) {
query.reply(Ok(sample.clone())).res().await.unwrap();
query.reply_sample(sample.clone()).res().await.unwrap();
}
}
}
Expand Down
6 changes: 1 addition & 5 deletions plugins/zenoh-plugin-rest/examples/z_serve_sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,7 @@ async fn main() {
let receiver = queryable.receiver.clone();
async move {
while let Ok(request) = receiver.recv_async().await {
request
.reply(Ok(Sample::new(key, HTML)))
.res()
.await
.unwrap();
request.reply(key, HTML).res().await.unwrap();
}
}
});
Expand Down
53 changes: 31 additions & 22 deletions plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,34 +95,43 @@ impl AlignQueryable {
for value in values {
match value {
AlignData::Interval(i, c) => {
let sample = Sample::new(
query.key_expr().clone(),
serde_json::to_string(&(i, c)).unwrap(),
);
query.reply(Ok(sample)).res().await.unwrap();
query
.reply(
query.key_expr().clone(),
serde_json::to_string(&(i, c)).unwrap(),
)
.res()
.await
.unwrap();
}
AlignData::Subinterval(i, c) => {
let sample = Sample::new(
query.key_expr().clone(),
serde_json::to_string(&(i, c)).unwrap(),
);
query.reply(Ok(sample)).res().await.unwrap();
query
.reply(
query.key_expr().clone(),
serde_json::to_string(&(i, c)).unwrap(),
)
.res()
.await
.unwrap();
}
AlignData::Content(i, c) => {
let sample = Sample::new(
query.key_expr().clone(),
serde_json::to_string(&(i, c)).unwrap(),
);
query.reply(Ok(sample)).res().await.unwrap();
query
.reply(
query.key_expr().clone(),
serde_json::to_string(&(i, c)).unwrap(),
)
.res()
.await
.unwrap();
}
AlignData::Data(k, (v, ts)) => {
let Value {
payload, encoding, ..
} = v;
let sample = Sample::new(k, payload)
.with_encoding(encoding)
.with_timestamp(ts);
query.reply(Ok(sample)).res().await.unwrap();
query
.reply(k, v.payload)
.with_encoding(v.encoding)
.with_timestamp(ts)
.res()
.await
.unwrap();
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ impl StorageService {
} else {
sample
};
if let Err(e) = q.reply(Ok(sample)).res().await {
if let Err(e) = q.reply_sample(sample).res().await {
log::warn!(
"Storage '{}' raised an error replying a query: {}",
self.name,
Expand Down Expand Up @@ -570,7 +570,7 @@ impl StorageService {
} else {
sample
};
if let Err(e) = q.reply(Ok(sample)).res().await {
if let Err(e) = q.reply_sample(sample).res().await {
log::warn!(
"Storage '{}' raised an error replying a query: {}",
self.name,
Expand All @@ -583,7 +583,7 @@ impl StorageService {
let err_message =
format!("Storage '{}' raised an error on query: {}", self.name, e);
log::warn!("{}", err_message);
if let Err(e) = q.reply(Err(err_message.into())).res().await {
if let Err(e) = q.reply_err(err_message).res().await {
log::warn!(
"Storage '{}' raised an error replying a query: {}",
self.name,
Expand Down
6 changes: 1 addition & 5 deletions zenoh-ext/src/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,7 @@ async fn query_handler(z: Arc<Session>, state: Arc<GroupState>) {

while let Ok(query) = queryable.recv_async().await {
log::trace!("Serving query for: {}", &qres);
query
.reply(Ok(Sample::new(qres.clone(), buf.clone())))
.res()
.await
.unwrap();
query.reply(qres.clone(), buf.clone()).res().await.unwrap();
}
}

Expand Down
6 changes: 3 additions & 3 deletions zenoh-ext/src/publication_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl<'a> PublicationCache<'a> {
}
},

// on query, reply with cach content
// on query, reply with cache content
query = quer_recv.recv_async() => {
if let Ok(query) = query {
if !query.selector().key_expr.as_str().contains('*') {
Expand All @@ -212,7 +212,7 @@ impl<'a> PublicationCache<'a> {
continue;
}
}
if let Err(e) = query.reply(Ok(sample.clone())).res_async().await {
if let Err(e) = query.reply_sample(sample.clone()).res_async().await {
log::warn!("Error replying to query: {}", e);
}
}
Expand All @@ -226,7 +226,7 @@ impl<'a> PublicationCache<'a> {
continue;
}
}
if let Err(e) = query.reply(Ok(sample.clone())).res_async().await {
if let Err(e) = query.reply_sample(sample.clone()).res_async().await {
log::warn!("Error replying to query: {}", e);
}
}
Expand Down
6 changes: 3 additions & 3 deletions zenoh/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
prelude::sync::{KeyExpr, Locality, SampleKind},
queryable::Query,
sample::DataInfo,
Payload, Sample, Session, ZResult,
Payload, Session, ZResult,
};
use async_std::task;
use std::{
Expand Down Expand Up @@ -71,7 +71,7 @@ pub(crate) fn on_admin_query(session: &Session, query: Query) {
if let Ok(value) = serde_json::value::to_value(peer.clone()) {
match Payload::try_from(value) {
Ok(zbuf) => {
let _ = query.reply(Ok(Sample::new(key_expr, zbuf))).res_sync();
let _ = query.reply(key_expr, zbuf).res_sync();
}
Err(e) => log::debug!("Admin query error: {}", e),
}
Expand All @@ -88,7 +88,7 @@ pub(crate) fn on_admin_query(session: &Session, query: Query) {
if let Ok(value) = serde_json::value::to_value(link) {
match Payload::try_from(value) {
Ok(zbuf) => {
let _ = query.reply(Ok(Sample::new(key_expr, zbuf))).res_sync();
let _ = query.reply(key_expr, zbuf).res_sync();
}
Err(e) => log::debug!("Admin query error: {}", e),
}
Expand Down
30 changes: 11 additions & 19 deletions zenoh/src/net/runtime/adminspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::key_expr::KeyExpr;
use crate::net::primitives::Primitives;
use crate::payload::Payload;
use crate::plugins::sealed::{self as plugins};
use crate::prelude::sync::{Sample, SyncResolve};
use crate::prelude::sync::SyncResolve;
use crate::queryable::Query;
use crate::queryable::QueryInner;
use crate::value::Value;
Expand Down Expand Up @@ -577,9 +577,8 @@ fn router_data(context: &AdminContext, query: Query) {
}
};
if let Err(e) = query
.reply(Ok(
Sample::new(reply_key, payload).with_encoding(Encoding::APPLICATION_JSON)
))
.reply(reply_key, payload)
.with_encoding(Encoding::APPLICATION_JSON)
.res_sync()
{
log::error!("Error sending AdminSpace reply: {:?}", e);
Expand Down Expand Up @@ -609,7 +608,7 @@ zenoh_build{{version="{}"}} 1
.openmetrics_text(),
);

if let Err(e) = query.reply(Ok(Sample::new(reply_key, metrics))).res() {
if let Err(e) = query.reply(reply_key, metrics).res() {
log::error!("Error sending AdminSpace reply: {:?}", e);
}
}
Expand All @@ -622,10 +621,7 @@ fn routers_linkstate_data(context: &AdminContext, query: Query) {
let tables = zread!(context.runtime.state.router.tables.tables);

if let Err(e) = query
.reply(Ok(Sample::new(
reply_key,
tables.hat_code.info(&tables, WhatAmI::Router),
)))
.reply(reply_key, tables.hat_code.info(&tables, WhatAmI::Router))
.res()
{
log::error!("Error sending AdminSpace reply: {:?}", e);
Expand All @@ -640,10 +636,7 @@ fn peers_linkstate_data(context: &AdminContext, query: Query) {
let tables = zread!(context.runtime.state.router.tables.tables);

if let Err(e) = query
.reply(Ok(Sample::new(
reply_key,
tables.hat_code.info(&tables, WhatAmI::Peer),
)))
.reply(reply_key, tables.hat_code.info(&tables, WhatAmI::Peer))
.res()
{
log::error!("Error sending AdminSpace reply: {:?}", e);
Expand All @@ -660,7 +653,7 @@ fn subscribers_data(context: &AdminContext, query: Query) {
))
.unwrap();
if query.key_expr().intersects(&key) {
if let Err(e) = query.reply(Ok(Sample::new(key, Payload::empty()))).res() {
if let Err(e) = query.reply(key, Payload::empty()).res() {
log::error!("Error sending AdminSpace reply: {:?}", e);
}
}
Expand All @@ -677,7 +670,7 @@ fn queryables_data(context: &AdminContext, query: Query) {
))
.unwrap();
if query.key_expr().intersects(&key) {
if let Err(e) = query.reply(Ok(Sample::new(key, Payload::empty()))).res() {
if let Err(e) = query.reply(key, Payload::empty()).res() {
log::error!("Error sending AdminSpace reply: {:?}", e);
}
}
Expand All @@ -697,7 +690,7 @@ fn plugins_data(context: &AdminContext, query: Query) {
let status = serde_json::to_value(status).unwrap();
match Payload::try_from(status) {
Ok(zbuf) => {
if let Err(e) = query.reply(Ok(Sample::new(key, zbuf))).res_sync() {
if let Err(e) = query.reply(key, zbuf).res_sync() {
log::error!("Error sending AdminSpace reply: {:?}", e);
}
}
Expand All @@ -718,8 +711,7 @@ fn plugins_status(context: &AdminContext, query: Query) {
with_extended_string(plugin_key, &["/__path__"], |plugin_path_key| {
if let Ok(key_expr) = KeyExpr::try_from(plugin_path_key.clone()) {
if query.key_expr().intersects(&key_expr) {
if let Err(e) = query.reply(Ok(Sample::new(key_expr, plugin.path()))).res()
{
if let Err(e) = query.reply(key_expr, plugin.path()).res() {
log::error!("Error sending AdminSpace reply: {:?}", e);
}
}
Expand All @@ -743,7 +735,7 @@ fn plugins_status(context: &AdminContext, query: Query) {
if let Ok(key_expr) = KeyExpr::try_from(response.key) {
match Payload::try_from(response.value) {
Ok(zbuf) => {
if let Err(e) = query.reply(Ok(Sample::new(key_expr, zbuf))).res_sync() {
if let Err(e) = query.reply(key_expr, zbuf).res_sync() {
log::error!("Error sending AdminSpace reply: {:?}", e);
}
},
Expand Down
Loading

0 comments on commit f12f338

Please sign in to comment.