Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce open time #971

Merged
merged 54 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
1eb8629
Router implements interests protocol for clients
OlivierHecart Mar 11, 2024
16f7789
Send WireExpr in UndeclareSubscriber/UndeclareQueryable to clients fo…
OlivierHecart Mar 11, 2024
cf1f579
Fix WireExprExt M flag encoding/decoding
OlivierHecart Mar 12, 2024
6047d75
Fix decl_key
OlivierHecart Mar 15, 2024
5de298f
Clients send all samples and queries to routers and peers
OlivierHecart Mar 15, 2024
961bec7
Avoid self declaration loop on interest
OlivierHecart Mar 19, 2024
eb976c8
Fix query/replies copy/paste bugs
OlivierHecart Mar 21, 2024
b8f1a9c
Peers implement interests protocol for clients
OlivierHecart Mar 21, 2024
83a51e4
Merge branch 'protocol_changes' into interests
OlivierHecart Mar 21, 2024
26bbd8e
Don't send WireExpr in UndeclareSubscriber/UndeclareQueryable to clients
OlivierHecart Mar 22, 2024
cede672
Add client writer-side filtering (#863)
OlivierHecart Mar 27, 2024
76fb3ed
Merge branch 'protocol_changes' into interests
OlivierHecart Mar 27, 2024
df2ea58
Fix pubsub interest based routing after router failover
OlivierHecart Mar 27, 2024
41f59d3
Declare message can be Push/Request/RequestContinuous/Response
Mallets Apr 4, 2024
43a61c7
Address review comments
Mallets Apr 4, 2024
bce8855
Remove F: Future flag from DeclareInterest
Mallets Apr 5, 2024
3da2aed
cargo fmt --all
Mallets Apr 5, 2024
c753e82
Remove unused Interest flags field
OlivierHecart Apr 5, 2024
52ff7d0
Update doc
OlivierHecart Apr 5, 2024
8c9abc1
Remove unneeded interest_id field
OlivierHecart Apr 5, 2024
3a4161b
Merge branch 'protocol_changes' into protocol_declare
OlivierHecart Apr 5, 2024
9aa2079
Update commons/zenoh-protocol/src/network/declare.rs
Mallets Apr 5, 2024
dd2ef80
Remove unused UndeclareInterest
OlivierHecart Apr 5, 2024
b6dc311
Merge branch 'protocol_changes' into protocol_declare
OlivierHecart Apr 5, 2024
83d781f
Merge branch 'protocol_changes' into interests
OlivierHecart Apr 5, 2024
7f55917
Implement proper Declare Request/Response id correlation
OlivierHecart Apr 5, 2024
62192b9
Add new Interest network message
OlivierHecart Apr 8, 2024
3ebad65
Merge branch 'protocol_changes' into protocol_declare
OlivierHecart Apr 8, 2024
e3a8eb2
Update doc
OlivierHecart Apr 8, 2024
a80ce2b
Update codec
OlivierHecart Apr 8, 2024
bac3acb
Merge branch 'protocol_declare' into interests
OlivierHecart Apr 18, 2024
4e0ccae
Merge branch 'protocol_changes' into interests
OlivierHecart Apr 18, 2024
d8ba33c
Merge branch 'protocol_changes' into interests
OlivierHecart Apr 18, 2024
59ae98f
Fix stable build
OlivierHecart Apr 18, 2024
6a9c4f7
Fix test_acl
OlivierHecart Apr 18, 2024
9a2a539
Fix writer side filtering
OlivierHecart Apr 23, 2024
d4dcf14
Merge branch 'protocol_changes' into interests
OlivierHecart Apr 26, 2024
c6e8b53
Merge branch 'protocol_changes' into interests
OlivierHecart Apr 26, 2024
0d5df18
Add separate functions to compute matching status
OlivierHecart May 2, 2024
1141291
Merge branch 'protocol_changes' into interests
OlivierHecart May 3, 2024
b43b159
Fix unstable imports
OlivierHecart May 3, 2024
0eb4e98
Remove useless checks
OlivierHecart May 3, 2024
e03d239
Don't apply open session delay in client mode
OlivierHecart Mar 27, 2024
eebefaa
Add open_delay test
OlivierHecart Apr 18, 2024
44180c2
Peers don't apply writer side filtering until FinalInterest is received
OlivierHecart Apr 23, 2024
f794403
Don't wait for full scouting delay when peers connected all configure…
OlivierHecart Apr 23, 2024
30044a9
Increase scouting delay and decrease api open delay
OlivierHecart Apr 23, 2024
6ca1e47
Wait for gossip and related connections attempts before returning to …
OlivierHecart May 2, 2024
33b5ebe
Remove random backoff for p2p
OlivierHecart May 2, 2024
c608f18
Fix memory leak
OlivierHecart May 2, 2024
4df89b9
Remove API_OPEN_DELAY
OlivierHecart May 6, 2024
912bd0c
Merge branch 'protocol_changes' into open_delay
OlivierHecart May 14, 2024
ad0362c
Don't apply any scouting delay when multicast disabled and no configu…
OlivierHecart May 14, 2024
70b9cb7
Sleep for scouting/delay in router and linkstate peer modes
OlivierHecart May 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub const mode: WhatAmI = WhatAmI::Peer;
#[allow(dead_code)]
pub mod scouting {
pub const timeout: u64 = 3000;
pub const delay: u64 = 200;
pub const delay: u64 = 500;
pub mod multicast {
pub const enabled: bool = true;
pub const address: ([u8; 4], u16) = ([224, 0, 0, 224], 7446);
Expand Down
3 changes: 3 additions & 0 deletions zenoh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ zenoh-util = { workspace = true }
zenoh-runtime = { workspace = true }
zenoh-task = { workspace = true }

[dev-dependencies]
tokio = { workspace = true }

[build-dependencies]
rustc_version = { workspace = true }

Expand Down
3 changes: 0 additions & 3 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ zconfigurable! {
pub(crate) static ref API_QUERY_RECEPTION_CHANNEL_SIZE: usize = 256;
pub(crate) static ref API_REPLY_EMISSION_CHANNEL_SIZE: usize = 256;
pub(crate) static ref API_REPLY_RECEPTION_CHANNEL_SIZE: usize = 256;
pub(crate) static ref API_OPEN_SESSION_DELAY: u64 = 500;
}

pub(crate) struct SessionState {
Expand Down Expand Up @@ -865,8 +864,6 @@ impl Session {
.await;
session.owns_runtime = true;
runtime.start().await?;
// Workaround for the declare_and_shoot problem
tokio::time::sleep(Duration::from_millis(*API_OPEN_SESSION_DELAY)).await;
Ok(session)
})
}
Expand Down
5 changes: 5 additions & 0 deletions zenoh/src/net/routing/hat/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ impl HatBaseTrait for HatCode {
fn close_face(&self, tables: &TablesLock, face: &mut Arc<FaceState>) {
let mut wtables = zwrite!(tables.tables);
let mut face_clone = face.clone();

face_hat_mut!(face).remote_sub_interests.clear();
face_hat_mut!(face).local_subs.clear();
face_hat_mut!(face).local_qabls.clear();

let face = get_mut_unchecked(face);
for res in face.remote_mappings.values_mut() {
get_mut_unchecked(res).session_ctxs.remove(&face.id);
Expand Down
6 changes: 6 additions & 0 deletions zenoh/src/net/routing/hat/linkstate_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,12 @@ impl HatBaseTrait for HatCode {
fn close_face(&self, tables: &TablesLock, face: &mut Arc<FaceState>) {
let mut wtables = zwrite!(tables.tables);
let mut face_clone = face.clone();

face_hat_mut!(face).remote_sub_interests.clear();
face_hat_mut!(face).local_subs.clear();
face_hat_mut!(face).remote_qabl_interests.clear();
face_hat_mut!(face).local_qabls.clear();

let face = get_mut_unchecked(face);
for res in face.remote_mappings.values_mut() {
get_mut_unchecked(res).session_ctxs.remove(&face.id);
Expand Down
20 changes: 14 additions & 6 deletions zenoh/src/net/routing/hat/p2p_peer/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
use std::convert::TryInto;

use petgraph::graph::NodeIndex;
use rand::Rng;
use vec_map::VecMap;
use zenoh_buffers::{
writer::{DidntWrite, HasWriter},
Expand Down Expand Up @@ -399,6 +398,11 @@ impl Network {

if self.gossip {
if let Some(idx) = idx {
zenoh_runtime::ZRuntime::Net.block_in_place(
strong_runtime
.start_conditions()
.add_peer_connector_zid(zid),
);
if self.gossip_multihop || self.links.values().any(|link| link.zid == zid) {
self.send_on_links(
vec![(
Expand All @@ -424,19 +428,23 @@ impl Network {
.await
.is_none()
{
// random backoff
let sleep_time = std::time::Duration::from_millis(
rand::thread_rng().gen_range(0..100),
);
tokio::time::sleep(sleep_time).await;
runtime.connect_peer(&zid, &locators).await;
runtime
.start_conditions()
.terminate_peer_connector_zid(zid)
.await;
}
});
}
}
}
}
}
zenoh_runtime::ZRuntime::Net.block_in_place(
strong_runtime
.start_conditions()
.terminate_peer_connector_zid(src),
);
}

pub(super) fn add_link(&mut self, transport: TransportUnicast) -> usize {
Expand Down
42 changes: 38 additions & 4 deletions zenoh/src/net/routing/hat/p2p_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ use zenoh_config::{unwrap_or_default, ModeDependent, WhatAmI, WhatAmIMatcher};
use zenoh_protocol::{
common::ZExtBody,
network::{
declare::{queryable::ext::QueryableInfoType, QueryableId, SubscriberId},
interest::InterestId,
declare::{
ext::{NodeIdType, QoSType},
queryable::ext::QueryableInfoType,
QueryableId, SubscriberId,
},
interest::{InterestId, InterestOptions},
oam::id::OAM_LINKSTATE,
Oam,
Declare, DeclareBody, DeclareFinal, Oam,
},
};
use zenoh_result::ZResult;
Expand All @@ -53,8 +57,9 @@ use crate::net::{
codec::Zenoh080Routing,
protocol::linkstate::LinkStateList,
routing::{
dispatcher::face::Face,
dispatcher::face::{Face, InterestState},
router::{compute_data_routes, compute_query_routes, RoutesIndexes},
RoutingContext,
},
runtime::Runtime,
};
Expand Down Expand Up @@ -157,14 +162,43 @@ impl HatBaseTrait for HatCode {
net.add_link(transport.clone());
}
}
if face.state.whatami == WhatAmI::Peer {
get_mut_unchecked(&mut face.state).local_interests.insert(
0,
InterestState {
options: InterestOptions::ALL,
res: None,
finalized: false,
},
);
}

pubsub_new_face(tables, &mut face.state);
queries_new_face(tables, &mut face.state);

if face.state.whatami == WhatAmI::Peer {
face.state
.primitives
.send_declare(RoutingContext::new(Declare {
interest_id: Some(0),
ext_qos: QoSType::default(),
ext_tstamp: None,
ext_nodeid: NodeIdType::default(),
body: DeclareBody::DeclareFinal(DeclareFinal),
}));
}
Ok(())
}

fn close_face(&self, tables: &TablesLock, face: &mut Arc<FaceState>) {
let mut wtables = zwrite!(tables.tables);
let mut face_clone = face.clone();

face_hat_mut!(face).remote_sub_interests.clear();
face_hat_mut!(face).local_subs.clear();
face_hat_mut!(face).remote_qabl_interests.clear();
face_hat_mut!(face).local_qabls.clear();

let face = get_mut_unchecked(face);
for res in face.remote_mappings.values_mut() {
get_mut_unchecked(res).session_ctxs.remove(&face.id);
Expand Down
21 changes: 20 additions & 1 deletion zenoh/src/net/routing/hat/p2p_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::{
tables::{Route, RoutingExpr, Tables},
},
hat::{CurrentFutureTrait, HatPubSubTrait, Sources},
router::RoutesIndexes,
router::{update_data_routes_from, RoutesIndexes},
RoutingContext, PREFIX_LIVELINESS,
},
};
Expand Down Expand Up @@ -358,6 +358,10 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc<FaceState>) {
}
}
}
// recompute routes
// TODO: disable data routes and recompute them in parallel to avoid holding
// tables write lock for a long time on peer conenction.
update_data_routes_from(tables, &mut tables.root_res.clone());
}

impl HatPubSubTrait for HatCode {
Expand Down Expand Up @@ -565,6 +569,21 @@ impl HatPubSubTrait for HatCode {
return Arc::new(route);
}
};

for face in tables.faces.values().filter(|f| {
f.whatami == WhatAmI::Peer
&& !f
.local_interests
.get(&0)
.map(|i| i.finalized)
.unwrap_or(true)
}) {
route.entry(face.id).or_insert_with(|| {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
(face.clone(), key_expr.to_owned(), NodeId::default())
});
}

let res = Resource::get_resource(expr.prefix, expr.suffix);
let matches = res
.as_ref()
Expand Down
23 changes: 22 additions & 1 deletion zenoh/src/net/routing/hat/p2p_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use crate::net::routing::{
tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables},
},
hat::{CurrentFutureTrait, HatQueriesTrait, Sources},
router::RoutesIndexes,
router::{update_query_routes_from, RoutesIndexes},
RoutingContext, PREFIX_LIVELINESS,
};

Expand Down Expand Up @@ -332,6 +332,10 @@ pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc<FaceState>) {
}
}
}
// recompute routes
// TODO: disable query routes and recompute them in parallel to avoid holding
// tables write lock for a long time on peer conenction.
update_query_routes_from(tables, &mut tables.root_res.clone());
}

lazy_static::lazy_static! {
Expand Down Expand Up @@ -549,6 +553,23 @@ impl HatQueriesTrait for HatCode {
return EMPTY_ROUTE.clone();
}
};

for face in tables.faces.values().filter(|f| {
f.whatami == WhatAmI::Peer
&& !f
.local_interests
.get(&0)
.map(|i| i.finalized)
.unwrap_or(true)
}) {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.push(QueryTargetQabl {
direction: (face.clone(), key_expr.to_owned(), NodeId::default()),
complete: 0,
distance: 0.5,
});
}

let res = Resource::get_resource(expr.prefix, expr.suffix);
let matches = res
.as_ref()
Expand Down
6 changes: 6 additions & 0 deletions zenoh/src/net/routing/hat/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,12 @@ impl HatBaseTrait for HatCode {
fn close_face(&self, tables: &TablesLock, face: &mut Arc<FaceState>) {
let mut wtables = zwrite!(tables.tables);
let mut face_clone = face.clone();

face_hat_mut!(face).remote_sub_interests.clear();
face_hat_mut!(face).local_subs.clear();
face_hat_mut!(face).remote_qabl_interests.clear();
face_hat_mut!(face).local_qabls.clear();

let face = get_mut_unchecked(face);
for res in face.remote_mappings.values_mut() {
get_mut_unchecked(res).session_ctxs.remove(&face.id);
Expand Down
7 changes: 7 additions & 0 deletions zenoh/src/net/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use zenoh_transport::{
TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler,
};

use self::orchestrator::StartConditions;
use super::{primitives::DeMux, routing, routing::router::Router};
#[cfg(all(feature = "unstable", feature = "plugins"))]
use crate::api::loader::{load_plugins, start_plugins};
Expand All @@ -78,6 +79,7 @@ pub(crate) struct RuntimeState {
task_controller: TaskController,
#[cfg(all(feature = "unstable", feature = "plugins"))]
plugins_manager: Mutex<PluginsManager>,
start_conditions: Arc<StartConditions>,
}

pub struct WeakRuntime {
Expand Down Expand Up @@ -186,6 +188,7 @@ impl RuntimeBuilder {
task_controller: TaskController::default(),
#[cfg(all(feature = "unstable", feature = "plugins"))]
plugins_manager: Mutex::new(plugins_manager),
start_conditions: Arc::new(StartConditions::default()),
}),
};
*handler.runtime.write().unwrap() = Runtime::downgrade(&runtime);
Expand Down Expand Up @@ -354,6 +357,10 @@ impl Runtime {
pub fn get_cancellation_token(&self) -> CancellationToken {
self.state.task_controller.get_cancellation_token()
}

pub(crate) fn start_conditions(&self) -> &Arc<StartConditions> {
&self.state.start_conditions
}
}

struct RuntimeTransportEventHandler {
Expand Down
Loading
Loading