Skip to content

Commit

Permalink
Remove closing from hat trait (#1469)
Browse files Browse the repository at this point in the history
* Remove closing from hat trait

* Also move orchestrator session closing code to closed phase

* Remove closing from TransportPeerEventHandler
  • Loading branch information
OlivierHecart authored Sep 27, 2024
1 parent 941f699 commit af3ac7b
Show file tree
Hide file tree
Showing 29 changed files with 166 additions and 304 deletions.
4 changes: 0 additions & 4 deletions io/zenoh-transport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ impl TransportEventHandler for DummyTransportEventHandler {
/*************************************/
pub trait TransportMulticastEventHandler: Send + Sync {
fn new_peer(&self, peer: TransportPeer) -> ZResult<Arc<dyn TransportPeerEventHandler>>;
fn closing(&self);
fn closed(&self);
fn as_any(&self) -> &dyn Any;
}
Expand All @@ -95,7 +94,6 @@ impl TransportMulticastEventHandler for DummyTransportMulticastEventHandler {
fn new_peer(&self, _peer: TransportPeer) -> ZResult<Arc<dyn TransportPeerEventHandler>> {
Ok(Arc::new(DummyTransportPeerEventHandler))
}
fn closing(&self) {}
fn closed(&self) {}
fn as_any(&self) -> &dyn Any {
self
Expand All @@ -121,7 +119,6 @@ pub trait TransportPeerEventHandler: Send + Sync {
fn handle_message(&self, msg: NetworkMessage) -> ZResult<()>;
fn new_link(&self, src: Link);
fn del_link(&self, link: Link);
fn closing(&self);
fn closed(&self);
fn as_any(&self) -> &dyn Any;
}
Expand All @@ -137,7 +134,6 @@ impl TransportPeerEventHandler for DummyTransportPeerEventHandler {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
5 changes: 0 additions & 5 deletions io/zenoh-transport/src/multicast/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,7 @@ impl TransportMulticastInner {
pub(super) async fn delete(&self) -> ZResult<()> {
tracing::debug!("Closing multicast transport on {:?}", self.locator);

// Notify the callback that we are going to close the transport
let callback = zwrite!(self.callback).take();
if let Some(cb) = callback.as_ref() {
cb.closing();
}

// Delete the transport on the manager
let _ = self.manager.del_transport_multicast(&self.locator).await;
Expand Down Expand Up @@ -441,7 +437,6 @@ impl TransportMulticastInner {

// TODO(yuyuan): Unify the termination
peer.token.cancel();
peer.handler.closing();
drop(guard);
peer.handler.closed();
}
Expand Down
5 changes: 0 additions & 5 deletions io/zenoh-transport/src/unicast/lowlatency/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,7 @@ impl TransportUnicastLowlatency {
// to avoid concurrent new_transport and closing/closed notifications
let mut a_guard = self.get_alive().await;
*a_guard = false;

// Notify the callback that we are going to close the transport
let callback = zwrite!(self.callback).take();
if let Some(cb) = callback.as_ref() {
cb.closing();
}

// Delete the transport on the manager
let _ = self.manager.del_transport_unicast(&self.config.zid).await;
Expand Down
5 changes: 0 additions & 5 deletions io/zenoh-transport/src/unicast/universal/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,7 @@ impl TransportUnicastUniversal {
// to avoid concurrent new_transport and closing/closed notifications
let mut a_guard = self.get_alive().await;
*a_guard = false;

// Notify the callback that we are going to close the transport
let callback = zwrite!(self.callback).take();
if let Some(cb) = callback.as_ref() {
cb.closing();
}

// Delete the transport on the manager
let _ = self.manager.del_transport_unicast(&self.config.zid).await;
Expand Down
1 change: 0 additions & 1 deletion io/zenoh-transport/tests/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ impl TransportPeerEventHandler for SC {
}
fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
2 changes: 0 additions & 2 deletions io/zenoh-transport/tests/multicast_compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ mod tests {
count: self.count.clone(),
}))
}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand All @@ -127,7 +126,6 @@ mod tests {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
2 changes: 0 additions & 2 deletions io/zenoh-transport/tests/multicast_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ mod tests {
count: self.count.clone(),
}))
}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand All @@ -126,7 +125,6 @@ mod tests {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
1 change: 0 additions & 1 deletion io/zenoh-transport/tests/transport_whitelist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ impl TransportPeerEventHandler for SCRouter {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
1 change: 0 additions & 1 deletion io/zenoh-transport/tests/unicast_authenticator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ impl TransportPeerEventHandler for MHRouterAuthenticator {
}
fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
2 changes: 0 additions & 2 deletions io/zenoh-transport/tests/unicast_compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ mod tests {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down Expand Up @@ -150,7 +149,6 @@ mod tests {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
1 change: 0 additions & 1 deletion io/zenoh-transport/tests/unicast_concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ impl TransportPeerEventHandler for MHPeer {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
1 change: 0 additions & 1 deletion io/zenoh-transport/tests/unicast_intermittent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ impl TransportPeerEventHandler for SCClient {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
2 changes: 0 additions & 2 deletions io/zenoh-transport/tests/unicast_priorities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ impl TransportPeerEventHandler for SCRouter {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down Expand Up @@ -183,7 +182,6 @@ impl TransportPeerEventHandler for SCClient {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
1 change: 0 additions & 1 deletion io/zenoh-transport/tests/unicast_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ mod tests {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
1 change: 0 additions & 1 deletion io/zenoh-transport/tests/unicast_simultaneous.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ mod tests {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
2 changes: 0 additions & 2 deletions io/zenoh-transport/tests/unicast_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,6 @@ impl TransportPeerEventHandler for SCRouter {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down Expand Up @@ -336,7 +335,6 @@ impl TransportPeerEventHandler for SCClient {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
4 changes: 0 additions & 4 deletions zenoh/src/api/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,6 @@ impl TransportMulticastEventHandler for Handler {
}
}

fn closing(&self) {}

fn closed(&self) {}

fn as_any(&self) -> &dyn std::any::Any {
Expand Down Expand Up @@ -250,8 +248,6 @@ impl TransportPeerEventHandler for PeerHandler {
);
}

fn closing(&self) {}

fn closed(&self) {
let info = DataInfo {
kind: SampleKind::Delete,
Expand Down
17 changes: 1 addition & 16 deletions zenoh/src/net/primitives/demux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,25 +100,10 @@ impl TransportPeerEventHandler for DeMux {

fn del_link(&self, _link: Link) {}

fn closing(&self) {
fn closed(&self) {
self.face.send_close();
if let Some(transport) = self.transport.as_ref() {
let mut declares = vec![];
let ctrl_lock = zlock!(self.face.tables.ctrl_lock);
let mut tables = zwrite!(self.face.tables.tables);
let _ = ctrl_lock.closing(&mut tables, &self.face.tables, transport, &mut |p, m| {
declares.push((p.clone(), m))
});
drop(tables);
drop(ctrl_lock);
for (p, m) in declares {
p.send_declare(m);
}
}
}

fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
self
}
Expand Down
8 changes: 2 additions & 6 deletions zenoh/src/net/primitives/mux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,7 @@ impl Primitives for Mux {
}
}

fn send_close(&self) {
// self.handler.closing().await;
}
fn send_close(&self) {}
}

impl EPrimitives for Mux {
Expand Down Expand Up @@ -530,9 +528,7 @@ impl Primitives for McastMux {
}
}

fn send_close(&self) {
// self.handler.closing().await;
}
fn send_close(&self) {}
}

impl EPrimitives for McastMux {
Expand Down
28 changes: 25 additions & 3 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::{
collections::HashMap,
fmt,
sync::{Arc, Weak},
time::Duration,
};

use tokio_util::sync::CancellationToken;
Expand All @@ -37,13 +38,16 @@ use super::{
super::router::*,
interests::{declare_final, declare_interest, undeclare_interest, CurrentInterest},
resource::*,
tables::{self, TablesLock},
tables::TablesLock,
};
use crate::{
api::key_expr::KeyExpr,
net::{
primitives::{McastMux, Mux, Primitives},
routing::interceptor::{InterceptorTrait, InterceptorsChain},
routing::{
dispatcher::interests::finalize_pending_interests,
interceptor::{InterceptorTrait, InterceptorsChain},
},
},
};

Expand Down Expand Up @@ -421,7 +425,25 @@ impl Primitives for Face {
}

fn send_close(&self) {
tables::close_face(&self.tables, &Arc::downgrade(&self.state));
tracing::debug!("Close {}", self.state);
let mut state = self.state.clone();
state.task_controller.terminate_all(Duration::from_secs(10));
finalize_pending_queries(&self.tables, &mut state);
let mut declares = vec![];
let ctrl_lock = zlock!(self.tables.ctrl_lock);
finalize_pending_interests(&self.tables, &mut state, &mut |p, m| {
declares.push((p.clone(), m))
});
ctrl_lock.close_face(
&self.tables,
&self.tables.clone(),
&mut state,
&mut |p, m| declares.push((p.clone(), m)),
);
drop(ctrl_lock);
for (p, m) in declares {
p.send_declare(m);
}
}
}

Expand Down
24 changes: 1 addition & 23 deletions zenoh/src/net/routing/dispatcher/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::{
any::Any,
collections::HashMap,
sync::{Arc, Mutex, RwLock, Weak},
sync::{Arc, Mutex, RwLock},
time::Duration,
};

Expand All @@ -30,7 +30,6 @@ use zenoh_sync::get_mut_unchecked;
use super::face::FaceState;
pub use super::{pubsub::*, queries::*, resource::*};
use crate::net::routing::{
dispatcher::interests::finalize_pending_interests,
hat::{self, HatTrait},
interceptor::{interceptor_factories, InterceptorFactory},
};
Expand Down Expand Up @@ -169,27 +168,6 @@ impl Tables {
}
}

pub fn close_face(tables: &TablesLock, face: &Weak<FaceState>) {
match face.upgrade() {
Some(mut face) => {
tracing::debug!("Close {}", face);
face.task_controller.terminate_all(Duration::from_secs(10));
finalize_pending_queries(tables, &mut face);
let mut declares = vec![];
let ctrl_lock = zlock!(tables.ctrl_lock);
finalize_pending_interests(tables, &mut face, &mut |p, m| {
declares.push((p.clone(), m))
});
ctrl_lock.close_face(tables, &mut face, &mut |p, m| declares.push((p.clone(), m)));
drop(ctrl_lock);
for (p, m) in declares {
p.send_declare(m);
}
}
None => tracing::error!("Face already closed!"),
}
}

pub struct TablesLock {
pub tables: RwLock<Tables>,
pub(crate) ctrl_lock: Mutex<Box<dyn HatTrait + Send + Sync>>,
Expand Down
Loading

0 comments on commit af3ac7b

Please sign in to comment.