diff --git a/zenoh-ext/tests/liveliness.rs b/zenoh-ext/tests/liveliness.rs index 97dc817394..68d4b1b798 100644 --- a/zenoh-ext/tests/liveliness.rs +++ b/zenoh-ext/tests/liveliness.rs @@ -70,7 +70,7 @@ async fn test_liveliness_querying_subscriber_clique() { .unwrap(); tokio::time::sleep(SLEEP).await; - let _token2 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); + let token2 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); @@ -81,12 +81,18 @@ async fn test_liveliness_querying_subscriber_clique() { assert_eq!(sample.kind(), SampleKind::Put); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2); - drop(token1); + token1.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert_eq!(sample.kind(), SampleKind::Delete); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1); + + token2.undeclare().await.unwrap(); + sub.close().await.unwrap(); + + peer1.close().await.unwrap(); + peer2.close().await.unwrap(); } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -106,7 +112,7 @@ async fn test_liveliness_querying_subscriber_brokered() { zenoh_util::try_init_log_from_env(); - let _router = { + let router = { let mut c = config::default(); c.listen .endpoints @@ -168,7 +174,7 @@ async fn test_liveliness_querying_subscriber_brokered() { .unwrap(); tokio::time::sleep(SLEEP).await; - let _token2 = ztimeout!(client3.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); + let token2 = ztimeout!(client3.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); @@ -179,12 +185,20 @@ async fn test_liveliness_querying_subscriber_brokered() { assert_eq!(sample.kind(), SampleKind::Put); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2); - drop(token1); + token1.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert_eq!(sample.kind(), SampleKind::Delete); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1); + + token2.undeclare().await.unwrap(); + sub.close().await.unwrap(); + + router.close().await.unwrap(); + client1.close().await.unwrap(); + client2.close().await.unwrap(); + client3.close().await.unwrap(); } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -244,7 +258,7 @@ async fn test_liveliness_fetching_subscriber_clique() { .unwrap(); tokio::time::sleep(SLEEP).await; - let _token2 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); + let token2 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); @@ -255,12 +269,18 @@ async fn test_liveliness_fetching_subscriber_clique() { assert_eq!(sample.kind(), SampleKind::Put); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2); - drop(token1); + token1.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert_eq!(sample.kind(), SampleKind::Delete); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1); + + token2.undeclare().await.unwrap(); + sub.close().await.unwrap(); + + peer1.close().await.unwrap(); + peer2.close().await.unwrap(); } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -280,7 +300,7 @@ async fn test_liveliness_fetching_subscriber_brokered() { zenoh_util::try_init_log_from_env(); - let _router = { + let router = { let mut c = config::default(); c.listen .endpoints @@ -346,7 +366,7 @@ async fn test_liveliness_fetching_subscriber_brokered() { .unwrap(); tokio::time::sleep(SLEEP).await; - let _token2 = ztimeout!(client3.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); + let token2 = ztimeout!(client3.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); @@ -357,10 +377,18 @@ async fn test_liveliness_fetching_subscriber_brokered() { assert_eq!(sample.kind(), SampleKind::Put); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2); - drop(token1); + token1.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert_eq!(sample.kind(), SampleKind::Delete); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1); + + token2.undeclare().await.unwrap(); + sub.close().await.unwrap(); + + router.close().await.unwrap(); + client1.close().await.unwrap(); + client2.close().await.unwrap(); + client3.close().await.unwrap(); } diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 4ca924e023..06f44b8bf5 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1238,10 +1238,7 @@ impl Session { pub(crate) fn undeclare_subscriber_inner(&self, sid: Id, kind: SubscriberKind) -> ZResult<()> { let mut state = zwrite!(self.state); - if let Some(sub_state) = state - .subscribers_mut(SubscriberKind::Subscriber) - .remove(&sid) - { + if let Some(sub_state) = state.subscribers_mut(kind).remove(&sid) { trace!("undeclare_subscriber({:?})", sub_state); for res in state .local_resources diff --git a/zenoh/tests/liveliness.rs b/zenoh/tests/liveliness.rs index 72dab9bd29..4d964cc1cf 100644 --- a/zenoh/tests/liveliness.rs +++ b/zenoh/tests/liveliness.rs @@ -65,12 +65,17 @@ async fn test_liveliness_subscriber_clique() { assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - drop(token); + token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + + sub.undeclare().await.unwrap(); + + peer1.close().await.unwrap(); + peer2.close().await.unwrap(); } #[cfg(feature = "unstable")] @@ -114,7 +119,7 @@ async fn test_liveliness_query_clique() { s }; - let _token = ztimeout!(peer1.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); + let token = ztimeout!(peer1.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; let get = ztimeout!(peer2.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); @@ -123,6 +128,11 @@ async fn test_liveliness_query_clique() { let sample = ztimeout!(get.recv_async()).unwrap().into_result().unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + + token.undeclare().await.unwrap(); + + peer1.close().await.unwrap(); + peer2.close().await.unwrap(); } #[cfg(feature = "unstable")] @@ -141,7 +151,7 @@ async fn test_liveliness_subscriber_brokered() { zenoh_util::try_init_log_from_env(); - let _router = { + let router = { let mut c = config::default(); c.listen .endpoints @@ -190,12 +200,18 @@ async fn test_liveliness_subscriber_brokered() { assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - drop(token); + token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + + sub.undeclare().await.unwrap(); + + router.close().await.unwrap(); + client1.close().await.unwrap(); + client2.close().await.unwrap(); } #[cfg(feature = "unstable")] @@ -213,7 +229,7 @@ async fn test_liveliness_query_brokered() { zenoh_util::try_init_log_from_env(); - let _router = { + let router = { let mut c = config::default(); c.listen .endpoints @@ -252,7 +268,7 @@ async fn test_liveliness_query_brokered() { s }; - let _token = ztimeout!(client1.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); + let token = ztimeout!(client1.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; let get = ztimeout!(client2.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); @@ -261,6 +277,12 @@ async fn test_liveliness_query_brokered() { let sample = ztimeout!(get.recv_async()).unwrap().into_result().unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + + token.undeclare().await.unwrap(); + + router.close().await.unwrap(); + client1.close().await.unwrap(); + client2.close().await.unwrap(); } #[cfg(feature = "unstable")] @@ -295,12 +317,15 @@ async fn test_liveliness_subscriber_local() { assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - drop(token); + token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + + sub.undeclare().await.unwrap(); + peer.close().await.unwrap(); } #[cfg(feature = "unstable")] @@ -325,7 +350,7 @@ async fn test_liveliness_query_local() { s }; - let _token = ztimeout!(peer.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); + let token = ztimeout!(peer.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; let get = ztimeout!(peer.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); @@ -334,4 +359,7 @@ async fn test_liveliness_query_local() { let sample = ztimeout!(get.recv_async()).unwrap().into_result().unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + + token.undeclare().await.unwrap(); + peer.close().await.unwrap(); }