From 3ac57cc901d7c912ab48a2cd711e4176fc3e40d7 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Tue, 30 Jul 2024 17:35:44 +0200 Subject: [PATCH 1/3] Undeclare subscribers at the end of liveliness tests --- zenoh-ext/tests/liveliness.rs | 48 +++++++++++++++++++++++++++-------- zenoh/tests/liveliness.rs | 44 ++++++++++++++++++++++++++------ 2 files changed, 74 insertions(+), 18 deletions(-) diff --git a/zenoh-ext/tests/liveliness.rs b/zenoh-ext/tests/liveliness.rs index 97dc817394..183edf143c 100644 --- a/zenoh-ext/tests/liveliness.rs +++ b/zenoh-ext/tests/liveliness.rs @@ -70,7 +70,7 @@ async fn test_liveliness_querying_subscriber_clique() { .unwrap(); tokio::time::sleep(SLEEP).await; - let _token2 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); + let token2 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); @@ -81,12 +81,18 @@ async fn test_liveliness_querying_subscriber_clique() { assert_eq!(sample.kind(), SampleKind::Put); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2); - drop(token1); + assert!(token1.undeclare().await.is_ok()); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert_eq!(sample.kind(), SampleKind::Delete); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1); + + assert!(token2.undeclare().await.is_ok()); + assert!(sub.close().await.is_ok()); + + assert!(peer1.close().await.is_ok()); + assert!(peer2.close().await.is_ok()); } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -106,7 +112,7 @@ async fn test_liveliness_querying_subscriber_brokered() { zenoh_util::try_init_log_from_env(); - let _router = { + let router = { let mut c = config::default(); c.listen .endpoints @@ -168,7 +174,7 @@ async fn test_liveliness_querying_subscriber_brokered() { .unwrap(); tokio::time::sleep(SLEEP).await; - let _token2 = ztimeout!(client3.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); + let token2 = ztimeout!(client3.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); @@ -179,12 +185,20 @@ async fn test_liveliness_querying_subscriber_brokered() { assert_eq!(sample.kind(), SampleKind::Put); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2); - drop(token1); + assert!(token1.undeclare().await.is_ok()); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert_eq!(sample.kind(), SampleKind::Delete); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1); + + assert!(token2.undeclare().await.is_ok()); + assert!(sub.close().await.is_ok()); + + assert!(router.close().await.is_ok()); + assert!(client1.close().await.is_ok()); + assert!(client2.close().await.is_ok()); + assert!(client3.close().await.is_ok()); } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -244,7 +258,7 @@ async fn test_liveliness_fetching_subscriber_clique() { .unwrap(); tokio::time::sleep(SLEEP).await; - let _token2 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); + let token2 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); @@ -255,12 +269,18 @@ async fn test_liveliness_fetching_subscriber_clique() { assert_eq!(sample.kind(), SampleKind::Put); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2); - drop(token1); + assert!(token1.undeclare().await.is_ok()); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert_eq!(sample.kind(), SampleKind::Delete); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1); + + assert!(token2.undeclare().await.is_ok()); + assert!(sub.close().await.is_ok()); + + assert!(peer1.close().await.is_ok()); + assert!(peer2.close().await.is_ok()); } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -280,7 +300,7 @@ async fn test_liveliness_fetching_subscriber_brokered() { zenoh_util::try_init_log_from_env(); - let _router = { + let router = { let mut c = config::default(); c.listen .endpoints @@ -346,7 +366,7 @@ async fn test_liveliness_fetching_subscriber_brokered() { .unwrap(); tokio::time::sleep(SLEEP).await; - let _token2 = ztimeout!(client3.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); + let token2 = ztimeout!(client3.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); @@ -357,10 +377,18 @@ async fn test_liveliness_fetching_subscriber_brokered() { assert_eq!(sample.kind(), SampleKind::Put); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2); - drop(token1); + assert!(token1.undeclare().await.is_ok()); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert_eq!(sample.kind(), SampleKind::Delete); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1); + + assert!(token2.undeclare().await.is_ok()); + assert!(sub.close().await.is_ok()); + + assert!(router.close().await.is_ok()); + assert!(client1.close().await.is_ok()); + assert!(client2.close().await.is_ok()); + assert!(client3.close().await.is_ok()); } diff --git a/zenoh/tests/liveliness.rs b/zenoh/tests/liveliness.rs index 72dab9bd29..efc1ab1592 100644 --- a/zenoh/tests/liveliness.rs +++ b/zenoh/tests/liveliness.rs @@ -65,12 +65,17 @@ async fn test_liveliness_subscriber_clique() { assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - drop(token); + assert!(token.undeclare().await.is_ok()); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + + assert!(sub.undeclare().await.is_ok()); + + assert!(peer1.close().await.is_ok()); + assert!(peer2.close().await.is_ok()); } #[cfg(feature = "unstable")] @@ -114,7 +119,7 @@ async fn test_liveliness_query_clique() { s }; - let _token = ztimeout!(peer1.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); + let token = ztimeout!(peer1.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; let get = ztimeout!(peer2.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); @@ -123,6 +128,11 @@ async fn test_liveliness_query_clique() { let sample = ztimeout!(get.recv_async()).unwrap().into_result().unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + + assert!(token.undeclare().await.is_ok()); + + assert!(peer1.close().await.is_ok()); + assert!(peer2.close().await.is_ok()); } #[cfg(feature = "unstable")] @@ -141,7 +151,7 @@ async fn test_liveliness_subscriber_brokered() { zenoh_util::try_init_log_from_env(); - let _router = { + let router = { let mut c = config::default(); c.listen .endpoints @@ -190,12 +200,18 @@ async fn test_liveliness_subscriber_brokered() { assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - drop(token); + assert!(token.undeclare().await.is_ok()); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + + assert!(sub.undeclare().await.is_ok()); + + assert!(router.close().await.is_ok()); + assert!(client1.close().await.is_ok()); + assert!(client2.close().await.is_ok()); } #[cfg(feature = "unstable")] @@ -213,7 +229,7 @@ async fn test_liveliness_query_brokered() { zenoh_util::try_init_log_from_env(); - let _router = { + let router = { let mut c = config::default(); c.listen .endpoints @@ -252,7 +268,7 @@ async fn test_liveliness_query_brokered() { s }; - let _token = ztimeout!(client1.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); + let token = ztimeout!(client1.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; let get = ztimeout!(client2.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); @@ -261,6 +277,12 @@ async fn test_liveliness_query_brokered() { let sample = ztimeout!(get.recv_async()).unwrap().into_result().unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + + assert!(token.undeclare().await.is_ok()); + + assert!(router.close().await.is_ok()); + assert!(client1.close().await.is_ok()); + assert!(client2.close().await.is_ok()); } #[cfg(feature = "unstable")] @@ -295,12 +317,15 @@ async fn test_liveliness_subscriber_local() { assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - drop(token); + assert!(token.undeclare().await.is_ok()); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + + assert!(sub.undeclare().await.is_ok()); + assert!(peer.close().await.is_ok()); } #[cfg(feature = "unstable")] @@ -325,7 +350,7 @@ async fn test_liveliness_query_local() { s }; - let _token = ztimeout!(peer.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); + let token = ztimeout!(peer.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); tokio::time::sleep(SLEEP).await; let get = ztimeout!(peer.liveliness().get(LIVELINESS_KEYEXPR)).unwrap(); @@ -334,4 +359,7 @@ async fn test_liveliness_query_local() { let sample = ztimeout!(get.recv_async()).unwrap().into_result().unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + + assert!(token.undeclare().await.is_ok()); + assert!(peer.close().await.is_ok()); } From b4f4f6856104450e602678e4d446df209073a91f Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Tue, 30 Jul 2024 17:48:11 +0200 Subject: [PATCH 2/3] Use `kind` in `Seesion::undeclare_subscriber_inner` --- zenoh/src/api/session.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 4ca924e023..06f44b8bf5 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1238,10 +1238,7 @@ impl Session { pub(crate) fn undeclare_subscriber_inner(&self, sid: Id, kind: SubscriberKind) -> ZResult<()> { let mut state = zwrite!(self.state); - if let Some(sub_state) = state - .subscribers_mut(SubscriberKind::Subscriber) - .remove(&sid) - { + if let Some(sub_state) = state.subscribers_mut(kind).remove(&sid) { trace!("undeclare_subscriber({:?})", sub_state); for res in state .local_resources From 80e931f0217e15c2e3aabc849cc6819752b359b1 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Wed, 31 Jul 2024 10:22:43 +0200 Subject: [PATCH 3/3] Address review comments --- zenoh-ext/tests/liveliness.rs | 48 +++++++++++++++++------------------ zenoh/tests/liveliness.rs | 42 +++++++++++++++--------------- 2 files changed, 45 insertions(+), 45 deletions(-) diff --git a/zenoh-ext/tests/liveliness.rs b/zenoh-ext/tests/liveliness.rs index 183edf143c..68d4b1b798 100644 --- a/zenoh-ext/tests/liveliness.rs +++ b/zenoh-ext/tests/liveliness.rs @@ -81,18 +81,18 @@ async fn test_liveliness_querying_subscriber_clique() { assert_eq!(sample.kind(), SampleKind::Put); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2); - assert!(token1.undeclare().await.is_ok()); + token1.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert_eq!(sample.kind(), SampleKind::Delete); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1); - assert!(token2.undeclare().await.is_ok()); - assert!(sub.close().await.is_ok()); + token2.undeclare().await.unwrap(); + sub.close().await.unwrap(); - assert!(peer1.close().await.is_ok()); - assert!(peer2.close().await.is_ok()); + peer1.close().await.unwrap(); + peer2.close().await.unwrap(); } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -185,20 +185,20 @@ async fn test_liveliness_querying_subscriber_brokered() { assert_eq!(sample.kind(), SampleKind::Put); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2); - assert!(token1.undeclare().await.is_ok()); + token1.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert_eq!(sample.kind(), SampleKind::Delete); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1); - assert!(token2.undeclare().await.is_ok()); - assert!(sub.close().await.is_ok()); + token2.undeclare().await.unwrap(); + sub.close().await.unwrap(); - assert!(router.close().await.is_ok()); - assert!(client1.close().await.is_ok()); - assert!(client2.close().await.is_ok()); - assert!(client3.close().await.is_ok()); + router.close().await.unwrap(); + client1.close().await.unwrap(); + client2.close().await.unwrap(); + client3.close().await.unwrap(); } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -269,18 +269,18 @@ async fn test_liveliness_fetching_subscriber_clique() { assert_eq!(sample.kind(), SampleKind::Put); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2); - assert!(token1.undeclare().await.is_ok()); + token1.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert_eq!(sample.kind(), SampleKind::Delete); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1); - assert!(token2.undeclare().await.is_ok()); - assert!(sub.close().await.is_ok()); + token2.undeclare().await.unwrap(); + sub.close().await.unwrap(); - assert!(peer1.close().await.is_ok()); - assert!(peer2.close().await.is_ok()); + peer1.close().await.unwrap(); + peer2.close().await.unwrap(); } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -377,18 +377,18 @@ async fn test_liveliness_fetching_subscriber_brokered() { assert_eq!(sample.kind(), SampleKind::Put); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2); - assert!(token1.undeclare().await.is_ok()); + token1.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert_eq!(sample.kind(), SampleKind::Delete); assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1); - assert!(token2.undeclare().await.is_ok()); - assert!(sub.close().await.is_ok()); + token2.undeclare().await.unwrap(); + sub.close().await.unwrap(); - assert!(router.close().await.is_ok()); - assert!(client1.close().await.is_ok()); - assert!(client2.close().await.is_ok()); - assert!(client3.close().await.is_ok()); + router.close().await.unwrap(); + client1.close().await.unwrap(); + client2.close().await.unwrap(); + client3.close().await.unwrap(); } diff --git a/zenoh/tests/liveliness.rs b/zenoh/tests/liveliness.rs index efc1ab1592..4d964cc1cf 100644 --- a/zenoh/tests/liveliness.rs +++ b/zenoh/tests/liveliness.rs @@ -65,17 +65,17 @@ async fn test_liveliness_subscriber_clique() { assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(token.undeclare().await.is_ok()); + token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.undeclare().await.is_ok()); + sub.undeclare().await.unwrap(); - assert!(peer1.close().await.is_ok()); - assert!(peer2.close().await.is_ok()); + peer1.close().await.unwrap(); + peer2.close().await.unwrap(); } #[cfg(feature = "unstable")] @@ -129,10 +129,10 @@ async fn test_liveliness_query_clique() { assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(token.undeclare().await.is_ok()); + token.undeclare().await.unwrap(); - assert!(peer1.close().await.is_ok()); - assert!(peer2.close().await.is_ok()); + peer1.close().await.unwrap(); + peer2.close().await.unwrap(); } #[cfg(feature = "unstable")] @@ -200,18 +200,18 @@ async fn test_liveliness_subscriber_brokered() { assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(token.undeclare().await.is_ok()); + token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.undeclare().await.is_ok()); + sub.undeclare().await.unwrap(); - assert!(router.close().await.is_ok()); - assert!(client1.close().await.is_ok()); - assert!(client2.close().await.is_ok()); + router.close().await.unwrap(); + client1.close().await.unwrap(); + client2.close().await.unwrap(); } #[cfg(feature = "unstable")] @@ -278,11 +278,11 @@ async fn test_liveliness_query_brokered() { assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(token.undeclare().await.is_ok()); + token.undeclare().await.unwrap(); - assert!(router.close().await.is_ok()); - assert!(client1.close().await.is_ok()); - assert!(client2.close().await.is_ok()); + router.close().await.unwrap(); + client1.close().await.unwrap(); + client2.close().await.unwrap(); } #[cfg(feature = "unstable")] @@ -317,15 +317,15 @@ async fn test_liveliness_subscriber_local() { assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(token.undeclare().await.is_ok()); + token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; let sample = ztimeout!(sub.recv_async()).unwrap(); assert!(sample.kind() == SampleKind::Delete); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub.undeclare().await.is_ok()); - assert!(peer.close().await.is_ok()); + sub.undeclare().await.unwrap(); + peer.close().await.unwrap(); } #[cfg(feature = "unstable")] @@ -360,6 +360,6 @@ async fn test_liveliness_query_local() { assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(token.undeclare().await.is_ok()); - assert!(peer.close().await.is_ok()); + token.undeclare().await.unwrap(); + peer.close().await.unwrap(); }