Skip to content

Commit

Permalink
Enable queries timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Mar 21, 2024
1 parent 7fc04e7 commit dc8989b
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 27 deletions.
2 changes: 1 addition & 1 deletion io/zenoh-transport/tests/unicast_authenticator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ async fn run_with_lowlatency_transport(endpoint: &EndPoint) {
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn authenticator_tcp() {
let _ = env_logger::try_init();
let endpoint: EndPoint = format!("tcp/127.0.0.1:{}", 8000).parse().unwrap();
let endpoint: EndPoint = format!("tcp/127.0.0.1:{}", 8001).parse().unwrap();
run_with_universal_transport(&endpoint).await;
}

Expand Down
40 changes: 21 additions & 19 deletions zenoh/src/net/routing/dispatcher/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::net::routing::RoutingContext;
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::time::Instant;
use zenoh_config::WhatAmI;
use zenoh_protocol::core::key_expr::keyexpr;
use zenoh_protocol::network::declare::queryable::ext::QueryableInfo;
Expand All @@ -33,7 +34,7 @@ use zenoh_protocol::{
zenoh::{reply::ext::ConsolidationType, Reply, RequestBody, ResponseBody},
};
use zenoh_sync::get_mut_unchecked;
use zenoh_util::Timed;
use zenoh_util::{Timed, TimedEvent};

pub(crate) struct Query {
src_face: Arc<FaceState>,
Expand Down Expand Up @@ -532,6 +533,9 @@ pub fn route_query(
.compute_local_replies(&rtables, &prefix, expr.suffix, face);
let zid = rtables.zid;

let timer = rtables.timer.clone();
let timeout = rtables.queries_default_timeout;

drop(queries_lock);
drop(rtables);

Expand Down Expand Up @@ -589,19 +593,17 @@ pub fn route_query(
expr.full_expr().to_string(),
));
} else {
// let timer = tables.timer.clone();
// let timeout = tables.queries_default_timeout;
#[cfg(feature = "complete_n")]
{
for ((outface, key_expr, context), qid, t) in route.values() {
// timer.add(TimedEvent::once(
// Instant::now() + timeout,
// QueryCleanup {
// tables: tables_ref.clone(),
// face: Arc::downgrade(&outface),
// *qid,
// },
// ));
timer.add(TimedEvent::once(
Instant::now() + timeout,
QueryCleanup {
tables: tables_ref.clone(),
face: Arc::downgrade(outface),
qid: *qid,
},
));
#[cfg(feature = "stats")]
if !admin {
inc_req_stats!(outface, tx, user, body)
Expand Down Expand Up @@ -630,14 +632,14 @@ pub fn route_query(
#[cfg(not(feature = "complete_n"))]
{
for ((outface, key_expr, context), qid) in route.values() {
// timer.add(TimedEvent::once(
// Instant::now() + timeout,
// QueryCleanup {
// tables: tables_ref.clone(),
// face: Arc::downgrade(&outface),
// *qid,
// },
// ));
timer.add(TimedEvent::once(
Instant::now() + timeout,
QueryCleanup {
tables: tables_ref.clone(),
face: Arc::downgrade(outface),
qid: *qid,
},
));
#[cfg(feature = "stats")]
if !admin {
inc_req_stats!(outface, tx, user, body)
Expand Down
15 changes: 8 additions & 7 deletions zenoh/src/net/routing/dispatcher/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::sync::{Mutex, RwLock};
use std::time::Duration;
use uhlc::HLC;
use zenoh_config::unwrap_or_default;
use zenoh_config::Config;
use zenoh_protocol::core::{ExprId, WhatAmI, ZenohId};
use zenoh_protocol::network::Mapping;
use zenoh_result::ZResult;
// use zenoh_collections::Timer;
use zenoh_sync::get_mut_unchecked;
use zenoh_util::Timer;

pub(crate) struct RoutingExpr<'a> {
pub(crate) prefix: &'a Arc<Resource>,
Expand Down Expand Up @@ -64,8 +65,8 @@ pub struct Tables {
#[allow(dead_code)]
pub(crate) hlc: Option<Arc<HLC>>,
pub(crate) drop_future_timestamp: bool,
// pub(crate) timer: Timer,
// pub(crate) queries_default_timeout: Duration,
pub(crate) timer: Timer,
pub(crate) queries_default_timeout: Duration,
pub(crate) root_res: Arc<Resource>,
pub(crate) faces: HashMap<usize, Arc<FaceState>>,
pub(crate) mcast_groups: Vec<Arc<FaceState>>,
Expand All @@ -87,17 +88,17 @@ impl Tables {
unwrap_or_default!(config.timestamping().drop_future_timestamp());
let router_peers_failover_brokering =
unwrap_or_default!(config.routing().router().peers_failover_brokering());
// let queries_default_timeout =
// Duration::from_millis(unwrap_or_default!(config.queries_default_timeout()));
let queries_default_timeout =
Duration::from_millis(unwrap_or_default!(config.queries_default_timeout()));
let hat_code = hat::new_hat(whatami, config);
Ok(Tables {
zid,
whatami,
face_counter: 0,
hlc,
drop_future_timestamp,
// timer: Timer::new(true),
// queries_default_timeout,
timer: Timer::new(true),
queries_default_timeout,
root_res: Resource::root(),
faces: HashMap::new(),
mcast_groups: vec![],
Expand Down

0 comments on commit dc8989b

Please sign in to comment.