Skip to content

Commit

Permalink
Reduce open time (#971)
Browse files Browse the repository at this point in the history
* Router implements interests protocol for clients

* Send WireExpr in UndeclareSubscriber/UndeclareQueryable to clients for pico

* Fix WireExprExt M flag encoding/decoding

* Fix decl_key

* Clients send all samples and queries to routers and peers

* Avoid self declaration loop on interest

* Fix query/replies copy/paste bugs

* Peers implement interests protocol for clients

* Don't send WireExpr in UndeclareSubscriber/UndeclareQueryable to clients

* Add client writer-side filtering (#863)

* Add client writer-side filtering

* Reimplement liveliness with interests

* Fix writer-side filtering before receiving FinalInterest

* Fix pubsub interest based routing after router failover

* Declare message can be Push/Request/RequestContinuous/Response

* Address review comments

* Remove F: Future flag from DeclareInterest

* cargo fmt --all

* Remove unused Interest flags field

* Update doc

* Remove unneeded interest_id field

* Update commons/zenoh-protocol/src/network/declare.rs

* Remove unused UndeclareInterest

* Implement proper Declare Request/Response id correlation

* Add new Interest network message

* Update doc

* Update codec

* Fix stable build

* Fix test_acl

* Fix writer side filtering

* Add separate functions to compute matching status

* Fix unstable imports

* Remove useless checks

* Don't apply open session delay in client mode

* Add open_delay test

* Peers don't apply writer side filtering until FinalInterest is received

* Don't wait for the full scouting delay when peers have connected to all configured connect endpoints

* Increase scouting delay and decrease api open delay

* Wait for gossip and related connections attempts before returning to open

* Remove random backoff for p2p

* Fix memory leak

* Remove API_OPEN_DELAY

* Don't apply any scouting delay when multicast disabled and no configured connect/endpoints

* Sleep for scouting/delay in router and linkstate peer modes

---------

Co-authored-by: Luca Cominardi <[email protected]>
  • Loading branch information
OlivierHecart and Mallets authored May 15, 2024
1 parent c40250a commit ad6a974
Show file tree
Hide file tree
Showing 13 changed files with 657 additions and 26 deletions.
2 changes: 1 addition & 1 deletion commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub const mode: WhatAmI = WhatAmI::Peer;
#[allow(dead_code)]
pub mod scouting {
pub const timeout: u64 = 3000;
pub const delay: u64 = 200;
pub const delay: u64 = 500;
pub mod multicast {
pub const enabled: bool = true;
pub const address: ([u8; 4], u16) = ([224, 0, 0, 224], 7446);
Expand Down
3 changes: 3 additions & 0 deletions zenoh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ zenoh-util = { workspace = true }
zenoh-runtime = { workspace = true }
zenoh-task = { workspace = true }

[dev-dependencies]
tokio = { workspace = true }

[build-dependencies]
rustc_version = { workspace = true }

Expand Down
3 changes: 0 additions & 3 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ zconfigurable! {
pub(crate) static ref API_QUERY_RECEPTION_CHANNEL_SIZE: usize = 256;
pub(crate) static ref API_REPLY_EMISSION_CHANNEL_SIZE: usize = 256;
pub(crate) static ref API_REPLY_RECEPTION_CHANNEL_SIZE: usize = 256;
pub(crate) static ref API_OPEN_SESSION_DELAY: u64 = 500;
}

pub(crate) struct SessionState {
Expand Down Expand Up @@ -865,8 +864,6 @@ impl Session {
.await;
session.owns_runtime = true;
runtime.start().await?;
// Workaround for the declare_and_shoot problem
tokio::time::sleep(Duration::from_millis(*API_OPEN_SESSION_DELAY)).await;
Ok(session)
})
}
Expand Down
5 changes: 5 additions & 0 deletions zenoh/src/net/routing/hat/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ impl HatBaseTrait for HatCode {
fn close_face(&self, tables: &TablesLock, face: &mut Arc<FaceState>) {
let mut wtables = zwrite!(tables.tables);
let mut face_clone = face.clone();

face_hat_mut!(face).remote_sub_interests.clear();
face_hat_mut!(face).local_subs.clear();
face_hat_mut!(face).local_qabls.clear();

let face = get_mut_unchecked(face);
for res in face.remote_mappings.values_mut() {
get_mut_unchecked(res).session_ctxs.remove(&face.id);
Expand Down
6 changes: 6 additions & 0 deletions zenoh/src/net/routing/hat/linkstate_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,12 @@ impl HatBaseTrait for HatCode {
fn close_face(&self, tables: &TablesLock, face: &mut Arc<FaceState>) {
let mut wtables = zwrite!(tables.tables);
let mut face_clone = face.clone();

face_hat_mut!(face).remote_sub_interests.clear();
face_hat_mut!(face).local_subs.clear();
face_hat_mut!(face).remote_qabl_interests.clear();
face_hat_mut!(face).local_qabls.clear();

let face = get_mut_unchecked(face);
for res in face.remote_mappings.values_mut() {
get_mut_unchecked(res).session_ctxs.remove(&face.id);
Expand Down
20 changes: 14 additions & 6 deletions zenoh/src/net/routing/hat/p2p_peer/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
use std::convert::TryInto;

use petgraph::graph::NodeIndex;
use rand::Rng;
use vec_map::VecMap;
use zenoh_buffers::{
writer::{DidntWrite, HasWriter},
Expand Down Expand Up @@ -399,6 +398,11 @@ impl Network {

if self.gossip {
if let Some(idx) = idx {
zenoh_runtime::ZRuntime::Net.block_in_place(
strong_runtime
.start_conditions()
.add_peer_connector_zid(zid),
);
if self.gossip_multihop || self.links.values().any(|link| link.zid == zid) {
self.send_on_links(
vec![(
Expand All @@ -424,19 +428,23 @@ impl Network {
.await
.is_none()
{
// random backoff
let sleep_time = std::time::Duration::from_millis(
rand::thread_rng().gen_range(0..100),
);
tokio::time::sleep(sleep_time).await;
runtime.connect_peer(&zid, &locators).await;
runtime
.start_conditions()
.terminate_peer_connector_zid(zid)
.await;
}
});
}
}
}
}
}
zenoh_runtime::ZRuntime::Net.block_in_place(
strong_runtime
.start_conditions()
.terminate_peer_connector_zid(src),
);
}

pub(super) fn add_link(&mut self, transport: TransportUnicast) -> usize {
Expand Down
42 changes: 38 additions & 4 deletions zenoh/src/net/routing/hat/p2p_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ use zenoh_config::{unwrap_or_default, ModeDependent, WhatAmI, WhatAmIMatcher};
use zenoh_protocol::{
common::ZExtBody,
network::{
declare::{queryable::ext::QueryableInfoType, QueryableId, SubscriberId},
interest::InterestId,
declare::{
ext::{NodeIdType, QoSType},
queryable::ext::QueryableInfoType,
QueryableId, SubscriberId,
},
interest::{InterestId, InterestOptions},
oam::id::OAM_LINKSTATE,
Oam,
Declare, DeclareBody, DeclareFinal, Oam,
},
};
use zenoh_result::ZResult;
Expand All @@ -53,8 +57,9 @@ use crate::net::{
codec::Zenoh080Routing,
protocol::linkstate::LinkStateList,
routing::{
dispatcher::face::Face,
dispatcher::face::{Face, InterestState},
router::{compute_data_routes, compute_query_routes, RoutesIndexes},
RoutingContext,
},
runtime::Runtime,
};
Expand Down Expand Up @@ -157,14 +162,43 @@ impl HatBaseTrait for HatCode {
net.add_link(transport.clone());
}
}
if face.state.whatami == WhatAmI::Peer {
get_mut_unchecked(&mut face.state).local_interests.insert(
0,
InterestState {
options: InterestOptions::ALL,
res: None,
finalized: false,
},
);
}

pubsub_new_face(tables, &mut face.state);
queries_new_face(tables, &mut face.state);

if face.state.whatami == WhatAmI::Peer {
face.state
.primitives
.send_declare(RoutingContext::new(Declare {
interest_id: Some(0),
ext_qos: QoSType::default(),
ext_tstamp: None,
ext_nodeid: NodeIdType::default(),
body: DeclareBody::DeclareFinal(DeclareFinal),
}));
}
Ok(())
}

fn close_face(&self, tables: &TablesLock, face: &mut Arc<FaceState>) {
let mut wtables = zwrite!(tables.tables);
let mut face_clone = face.clone();

face_hat_mut!(face).remote_sub_interests.clear();
face_hat_mut!(face).local_subs.clear();
face_hat_mut!(face).remote_qabl_interests.clear();
face_hat_mut!(face).local_qabls.clear();

let face = get_mut_unchecked(face);
for res in face.remote_mappings.values_mut() {
get_mut_unchecked(res).session_ctxs.remove(&face.id);
Expand Down
21 changes: 20 additions & 1 deletion zenoh/src/net/routing/hat/p2p_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::{
tables::{Route, RoutingExpr, Tables},
},
hat::{CurrentFutureTrait, HatPubSubTrait, Sources},
router::RoutesIndexes,
router::{update_data_routes_from, RoutesIndexes},
RoutingContext, PREFIX_LIVELINESS,
},
};
Expand Down Expand Up @@ -358,6 +358,10 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc<FaceState>) {
}
}
}
// recompute routes
// TODO: disable data routes and recompute them in parallel to avoid holding
// tables write lock for a long time on peer connection.
update_data_routes_from(tables, &mut tables.root_res.clone());
}

impl HatPubSubTrait for HatCode {
Expand Down Expand Up @@ -565,6 +569,21 @@ impl HatPubSubTrait for HatCode {
return Arc::new(route);
}
};

for face in tables.faces.values().filter(|f| {
f.whatami == WhatAmI::Peer
&& !f
.local_interests
.get(&0)
.map(|i| i.finalized)
.unwrap_or(true)
}) {
route.entry(face.id).or_insert_with(|| {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
(face.clone(), key_expr.to_owned(), NodeId::default())
});
}

let res = Resource::get_resource(expr.prefix, expr.suffix);
let matches = res
.as_ref()
Expand Down
23 changes: 22 additions & 1 deletion zenoh/src/net/routing/hat/p2p_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use crate::net::routing::{
tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables},
},
hat::{CurrentFutureTrait, HatQueriesTrait, Sources},
router::RoutesIndexes,
router::{update_query_routes_from, RoutesIndexes},
RoutingContext, PREFIX_LIVELINESS,
};

Expand Down Expand Up @@ -332,6 +332,10 @@ pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc<FaceState>) {
}
}
}
// recompute routes
// TODO: disable query routes and recompute them in parallel to avoid holding
// tables write lock for a long time on peer conenction.
update_query_routes_from(tables, &mut tables.root_res.clone());
}

lazy_static::lazy_static! {
Expand Down Expand Up @@ -549,6 +553,23 @@ impl HatQueriesTrait for HatCode {
return EMPTY_ROUTE.clone();
}
};

for face in tables.faces.values().filter(|f| {
f.whatami == WhatAmI::Peer
&& !f
.local_interests
.get(&0)
.map(|i| i.finalized)
.unwrap_or(true)
}) {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.push(QueryTargetQabl {
direction: (face.clone(), key_expr.to_owned(), NodeId::default()),
complete: 0,
distance: 0.5,
});
}

let res = Resource::get_resource(expr.prefix, expr.suffix);
let matches = res
.as_ref()
Expand Down
6 changes: 6 additions & 0 deletions zenoh/src/net/routing/hat/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,12 @@ impl HatBaseTrait for HatCode {
fn close_face(&self, tables: &TablesLock, face: &mut Arc<FaceState>) {
let mut wtables = zwrite!(tables.tables);
let mut face_clone = face.clone();

face_hat_mut!(face).remote_sub_interests.clear();
face_hat_mut!(face).local_subs.clear();
face_hat_mut!(face).remote_qabl_interests.clear();
face_hat_mut!(face).local_qabls.clear();

let face = get_mut_unchecked(face);
for res in face.remote_mappings.values_mut() {
get_mut_unchecked(res).session_ctxs.remove(&face.id);
Expand Down
7 changes: 7 additions & 0 deletions zenoh/src/net/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use zenoh_transport::{
TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler,
};

use self::orchestrator::StartConditions;
use super::{primitives::DeMux, routing, routing::router::Router};
#[cfg(all(feature = "unstable", feature = "plugins"))]
use crate::api::loader::{load_plugins, start_plugins};
Expand All @@ -78,6 +79,7 @@ pub(crate) struct RuntimeState {
task_controller: TaskController,
#[cfg(all(feature = "unstable", feature = "plugins"))]
plugins_manager: Mutex<PluginsManager>,
start_conditions: Arc<StartConditions>,
}

pub struct WeakRuntime {
Expand Down Expand Up @@ -186,6 +188,7 @@ impl RuntimeBuilder {
task_controller: TaskController::default(),
#[cfg(all(feature = "unstable", feature = "plugins"))]
plugins_manager: Mutex::new(plugins_manager),
start_conditions: Arc::new(StartConditions::default()),
}),
};
*handler.runtime.write().unwrap() = Runtime::downgrade(&runtime);
Expand Down Expand Up @@ -354,6 +357,10 @@ impl Runtime {
/// Returns the cancellation token obtained from this runtime's task controller.
pub fn get_cancellation_token(&self) -> CancellationToken {
self.state.task_controller.get_cancellation_token()
}

/// Returns a reference to the shared [`StartConditions`] handle stored in this
/// runtime's state.
pub(crate) fn start_conditions(&self) -> &Arc<StartConditions> {
&self.state.start_conditions
}
}

struct RuntimeTransportEventHandler {
Expand Down
Loading

0 comments on commit ad6a974

Please sign in to comment.