From 08870c0f4a89e14f7402b9df7298f33a2074e406 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Fri, 13 Sep 2024 10:22:54 +0300 Subject: [PATCH 01/12] SHM cleanup workaround --- Cargo.lock | 15 ++++++++++++++ Cargo.toml | 2 ++ commons/zenoh-shm/Cargo.toml | 3 +++ commons/zenoh-shm/src/api/cleanup.rs | 29 ++++++++++++++++++++++++++++ commons/zenoh-shm/src/api/mod.rs | 1 + commons/zenoh-shm/src/cleanup.rs | 27 +++++++++++++++++++++----- 6 files changed, 72 insertions(+), 5 deletions(-) create mode 100644 commons/zenoh-shm/src/api/cleanup.rs diff --git a/Cargo.lock b/Cargo.lock index 7c140c1c31..5241621547 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3885,6 +3885,18 @@ dependencies = [ "libc", ] +[[package]] +name = "signal-hook-tokio" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213241f76fb1e37e27de3b6aa1b068a2c333233b59cca6634f634b80a27ecf1e" +dependencies = [ + "futures-core", + "libc", + "signal-hook", + "tokio", +] + [[package]] name = "signature" version = "2.1.0" @@ -5852,12 +5864,15 @@ version = "1.0.0-dev" dependencies = [ "async-trait", "crc", + "futures", "libc", "lockfree", "num-traits", "num_cpus", "rand 0.8.5", "shared_memory", + "signal-hook", + "signal-hook-tokio", "stabby", "static_init", "thread-priority", diff --git a/Cargo.toml b/Cargo.toml index cb836bb14a..9444c4d9f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -162,6 +162,8 @@ stabby = "36.1.1" sha3 = "0.10.8" shared_memory = "0.12.4" shellexpand = "3.1.0" +signal-hook = "0.3.17" +signal-hook-tokio = "0.3.1" socket2 = { version = "0.5.7", features = ["all"] } stop-token = "0.7.0" syn = "2.0" diff --git a/commons/zenoh-shm/Cargo.toml b/commons/zenoh-shm/Cargo.toml index 0b79fd2549..130f8496ad 100644 --- a/commons/zenoh-shm/Cargo.toml +++ b/commons/zenoh-shm/Cargo.toml @@ -48,6 +48,9 @@ num_cpus = { workspace = true, optional = true } thread-priority = { workspace = true } lockfree = { workspace = true } stabby = { workspace = true } +signal-hook = { workspace = true } +signal-hook-tokio = { workspace = true, features = ["futures-v0_3"] } +futures = { workspace = true } [dev-dependencies] libc = { workspace = true } diff --git a/commons/zenoh-shm/src/api/cleanup.rs b/commons/zenoh-shm/src/api/cleanup.rs new file mode 100644 index 0000000000..aacca22582 --- /dev/null +++ b/commons/zenoh-shm/src/api/cleanup.rs @@ -0,0 +1,29 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::cleanup::CLEANUP; + +/// Make forced cleanup +/// NOTE: this is a part of ugly on-exit-cleanup workaround and will be removed +/// In order to properly cleanup some SHM internals upon process exit, Zenoh installs exit handlers (see atexit() API). +/// The bad thing is that atexit handler is executed only on process exit(), the terminating signal handlers (like SIGINT) +/// bypass it and terminate the process without cleanup. To eliminate this effect, Zenoh overrides SIGHUP, SIGTERM, SIGINT +/// and SIGQUIT handlers and calls exit() inside to make graceful shutdown. If user is going to override these Zenoh's handlers, +/// the workaround will break, and there are two ways to keep this workaround working: +/// - execute overriden Zenoh handlers in overriding handler code +/// - call forced_cleanup() anywhere at any time before terminating the process +#[zenoh_macros::unstable_doc] +pub fn forced_cleanup() { + CLEANUP.read().cleanup(); +} diff --git a/commons/zenoh-shm/src/api/mod.rs b/commons/zenoh-shm/src/api/mod.rs index a87188da29..8802b1eb58 100644 --- a/commons/zenoh-shm/src/api/mod.rs +++ b/commons/zenoh-shm/src/api/mod.rs @@ -13,6 +13,7 @@ // pub mod buffer; +pub mod cleanup; pub mod client; pub mod client_storage; pub mod common; diff --git a/commons/zenoh-shm/src/cleanup.rs b/commons/zenoh-shm/src/cleanup.rs index 5649732bf6..7887923967 100644 --- a/commons/zenoh-shm/src/cleanup.rs +++ b/commons/zenoh-shm/src/cleanup.rs @@ -12,6 +12,9 @@ // ZettaScale Zenoh Team, // +use futures::stream::StreamExt; +use signal_hook::consts::signal::*; +use signal_hook_tokio::Signals; use static_init::dynamic; /// A global cleanup, that is guaranteed to be dropped at normal program exit and that will @@ -26,11 +29,29 @@ pub(crate) struct Cleanup { impl Cleanup { fn new() -> Self { + // todo: this is a workaround to make sure Cleanup will be executed even if process terminates via signal handlers + // that execute std::terminate instead of exit + tokio::task::spawn(async { + let signals = Signals::new([SIGHUP, SIGTERM, SIGINT, SIGQUIT]).unwrap(); + let mut signals = signals.fuse(); + if let Some(_signal) = signals.next().await { + std::process::exit(0); + } + }); + Self { cleanups: Default::default(), } } + pub(crate) fn cleanup(&self) { + while let Some(cleanup) = self.cleanups.pop() { + if let Some(f) = cleanup { + f(); + } + } + } + pub(crate) fn register_cleanup(&self, cleanup_fn: Box) { self.cleanups.push(Some(cleanup_fn)); } @@ -38,10 +59,6 @@ impl Cleanup { impl Drop for Cleanup { fn drop(&mut self) { - while let Some(cleanup) = self.cleanups.pop() { - if let Some(f) = cleanup { - f(); - } - } + self.cleanup(); } } From 1d341479fc43fce6e52e58ac89b2cb03199555cd Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Fri, 13 Sep 2024 10:52:29 +0300 Subject: [PATCH 02/12] fix CI --- Cargo.toml | 4 ++-- commons/zenoh-shm/src/api/cleanup.rs | 2 +- commons/zenoh-shm/tests/header.rs | 1 + 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9444c4d9f1..bf98508af7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -162,8 +162,8 @@ stabby = "36.1.1" sha3 = "0.10.8" shared_memory = "0.12.4" shellexpand = "3.1.0" -signal-hook = "0.3.17" -signal-hook-tokio = "0.3.1" +signal-hook = { version = "0.3.17", default-features = false } +signal-hook-tokio = { version = "0.3.1", default-features = false } socket2 = { version = "0.5.7", features = ["all"] } stop-token = "0.7.0" syn = "2.0" diff --git a/commons/zenoh-shm/src/api/cleanup.rs b/commons/zenoh-shm/src/api/cleanup.rs index aacca22582..df4e49dd6e 100644 --- a/commons/zenoh-shm/src/api/cleanup.rs +++ b/commons/zenoh-shm/src/api/cleanup.rs @@ -21,7 +21,7 @@ use crate::cleanup::CLEANUP; /// bypass it and terminate the process without cleanup. To eliminate this effect, Zenoh overrides SIGHUP, SIGTERM, SIGINT /// and SIGQUIT handlers and calls exit() inside to make graceful shutdown. If user is going to override these Zenoh's handlers, /// the workaround will break, and there are two ways to keep this workaround working: -/// - execute overriden Zenoh handlers in overriding handler code +/// - execute overridden Zenoh handlers in overriding handler code /// - call forced_cleanup() anywhere at any time before terminating the process #[zenoh_macros::unstable_doc] pub fn forced_cleanup() { diff --git a/commons/zenoh-shm/tests/header.rs b/commons/zenoh-shm/tests/header.rs index 747757a3b2..64428897d6 100644 --- a/commons/zenoh-shm/tests/header.rs +++ b/commons/zenoh-shm/tests/header.rs @@ -32,6 +32,7 @@ fn header_alloc_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sy } #[test] +#[tokio::test(flavor = "single_thread")] fn header_alloc() { execute_concurrent(1, 1000, header_alloc_fn()); } From ef14b916ff4d22ea04493d86ea5b4ef339702511 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Fri, 13 Sep 2024 10:57:39 +0300 Subject: [PATCH 03/12] fix SHM tests --- commons/zenoh-shm/tests/header.rs | 23 ++++++---- commons/zenoh-shm/tests/posix_array.rs | 18 +++++--- commons/zenoh-shm/tests/posix_segment.rs | 36 ++++++++++------ commons/zenoh-shm/tests/posix_shm_provider.rs | 12 ++++-- commons/zenoh-shm/tests/watchdog.rs | 42 ++++++++++++------- 5 files changed, 87 insertions(+), 44 deletions(-) diff --git a/commons/zenoh-shm/tests/header.rs b/commons/zenoh-shm/tests/header.rs index 64428897d6..edfa700033 100644 --- a/commons/zenoh-shm/tests/header.rs +++ b/commons/zenoh-shm/tests/header.rs @@ -33,12 +33,13 @@ fn header_alloc_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sy #[test] #[tokio::test(flavor = "single_thread")] -fn header_alloc() { +async fn header_alloc() { execute_concurrent(1, 1000, header_alloc_fn()); } #[test] -fn header_alloc_concurrent() { +#[tokio::test(flavor = "single_thread")] +async fn header_alloc_concurrent() { execute_concurrent(100, 1000, header_alloc_fn()); } @@ -52,12 +53,14 @@ fn header_link_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Syn } #[test] -fn header_link() { +#[tokio::test(flavor = "single_thread")] +async fn header_link() { execute_concurrent(1, 1000, header_link_fn()); } #[test] -fn header_link_concurrent() { +#[tokio::test(flavor = "single_thread")] +async fn header_link_concurrent() { execute_concurrent(100, 1000, header_link_fn()); } @@ -80,12 +83,14 @@ fn header_link_failure_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Se } #[test] -fn header_link_failure() { +#[tokio::test(flavor = "single_thread")] +async fn header_link_failure() { execute_concurrent(1, 1000, header_link_failure_fn()); } #[test] -fn header_link_failure_concurrent() { +#[tokio::test(flavor = "single_thread")] +async fn header_link_failure_concurrent() { execute_concurrent(100, 1000, header_link_failure_fn()); } @@ -121,11 +126,13 @@ fn header_check_memory_fn(parallel_tasks: usize, iterations: usize) { } #[test] -fn header_check_memory() { +#[tokio::test(flavor = "single_thread")] +async fn header_check_memory() { header_check_memory_fn(1, 1000); } #[test] -fn header_check_memory_concurrent() { +#[tokio::test(flavor = "single_thread")] +async fn header_check_memory_concurrent() { header_check_memory_fn(100, 100); } diff --git a/commons/zenoh-shm/tests/posix_array.rs b/commons/zenoh-shm/tests/posix_array.rs index 83fdad88fb..44c6bd45de 100644 --- a/commons/zenoh-shm/tests/posix_array.rs +++ b/commons/zenoh-shm/tests/posix_array.rs @@ -111,17 +111,20 @@ where /// MEMORY CHECKS /// #[test] -fn arr_u8_index_memory_test() { +#[tokio::test(flavor = "single_thread")] +async fn arr_u8_index_memory_test() { test_array::(); } #[test] -fn arr_u16_index_memory_test() { +#[tokio::test(flavor = "single_thread")] +async fn arr_u16_index_memory_test() { test_array::(); } #[test] -fn arr_u32_index_memory_test() { +#[tokio::test(flavor = "single_thread")] +async fn arr_u32_index_memory_test() { test_array::(); } @@ -146,16 +149,19 @@ where } #[test] -fn arr_u8_index_invalid_elem_count() { +#[tokio::test(flavor = "single_thread")] +async fn arr_u8_index_invalid_elem_count() { test_invalid_elem_index::(); } #[test] -fn arr_u16_index_invalid_elem_count() { +#[tokio::test(flavor = "single_thread")] +async fn arr_u16_index_invalid_elem_count() { test_invalid_elem_index::(); } #[test] -fn arr_u32_index_invalid_elem_count() { +#[tokio::test(flavor = "single_thread")] +async fn arr_u32_index_invalid_elem_count() { test_invalid_elem_index::(); } diff --git a/commons/zenoh-shm/tests/posix_segment.rs b/commons/zenoh-shm/tests/posix_segment.rs index 879fccf298..90e41d36e4 100644 --- a/commons/zenoh-shm/tests/posix_segment.rs +++ b/commons/zenoh-shm/tests/posix_segment.rs @@ -67,61 +67,72 @@ where /// UNSIGNED /// #[test] -fn segment_u8_id() { +#[tokio::test(flavor = "single_thread")] +async fn segment_u8_id() { test_segment::() } #[test] -fn segment_u16_id() { +#[tokio::test(flavor = "single_thread")] +async fn segment_u16_id() { test_segment::() } #[test] -fn segment_u32_id() { +#[tokio::test(flavor = "single_thread")] +async fn segment_u32_id() { test_segment::() } #[test] -fn segment_u64_id() { +#[tokio::test(flavor = "single_thread")] +async fn segment_u64_id() { test_segment::() } #[test] -fn segment_u128_id() { +#[tokio::test(flavor = "single_thread")] +async fn segment_u128_id() { test_segment::() } /// SIGNED /// #[test] -fn segment_i8_id() { +#[tokio::test(flavor = "single_thread")] +async fn segment_i8_id() { test_segment::() } #[test] -fn segment_i16_id() { +#[tokio::test(flavor = "single_thread")] +async fn segment_i16_id() { test_segment::() } #[test] -fn segment_i32_id() { +#[tokio::test(flavor = "single_thread")] +async fn segment_i32_id() { test_segment::() } #[test] -fn segment_i64_id() { +#[tokio::test(flavor = "single_thread")] +async fn segment_i64_id() { test_segment::() } #[test] -fn segment_i128_id() { +#[tokio::test(flavor = "single_thread")] +async fn segment_i128_id() { test_segment::() } /// Behaviour checks /// #[test] -fn segment_open() { +#[tokio::test(flavor = "single_thread")] +async fn segment_open() { let created_segment: Segment = Segment::create(900, TEST_SEGMENT_PREFIX).expect("error creating new segment"); @@ -130,7 +141,8 @@ fn segment_open() { } #[test] -fn segment_open_error() { +#[tokio::test(flavor = "single_thread")] +async fn segment_open_error() { let id = { let created_segment: Segment = Segment::create(900, TEST_SEGMENT_PREFIX).expect("error creating new segment"); diff --git a/commons/zenoh-shm/tests/posix_shm_provider.rs b/commons/zenoh-shm/tests/posix_shm_provider.rs index f0f7417340..df43b7582f 100644 --- a/commons/zenoh-shm/tests/posix_shm_provider.rs +++ b/commons/zenoh-shm/tests/posix_shm_provider.rs @@ -28,7 +28,8 @@ static BUFFER_NUM: usize = 100; static BUFFER_SIZE: usize = 1024; #[test] -fn posix_shm_provider_create() { +#[tokio::test(flavor = "single_thread")] +async fn posix_shm_provider_create() { let _backend = PosixShmProviderBackend::builder() .with_size(1024) .expect("Error creating Layout!") @@ -37,7 +38,8 @@ fn posix_shm_provider_create() { } #[test] -fn posix_shm_provider_alloc() { +#[tokio::test(flavor = "single_thread")] +async fn posix_shm_provider_alloc() { let backend = PosixShmProviderBackend::builder() .with_size(1024) .expect("Error creating Layout!") @@ -52,7 +54,8 @@ fn posix_shm_provider_alloc() { } #[test] -fn posix_shm_provider_open() { +#[tokio::test(flavor = "single_thread")] +async fn posix_shm_provider_open() { let backend = PosixShmProviderBackend::builder() .with_size(1024) .expect("Error creating Layout!") @@ -73,7 +76,8 @@ fn posix_shm_provider_open() { } #[test] -fn posix_shm_provider_allocator() { +#[tokio::test(flavor = "single_thread")] +async fn posix_shm_provider_allocator() { let backend = PosixShmProviderBackend::builder() .with_size(BUFFER_SIZE * BUFFER_NUM) .expect("Error creating Layout!") diff --git a/commons/zenoh-shm/tests/watchdog.rs b/commons/zenoh-shm/tests/watchdog.rs index bc4a75dfa9..7506cfc4f2 100644 --- a/commons/zenoh-shm/tests/watchdog.rs +++ b/commons/zenoh-shm/tests/watchdog.rs @@ -36,12 +36,14 @@ fn watchdog_alloc_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + } #[test] -fn watchdog_alloc() { +#[tokio::test(flavor = "single_thread")] +async fn watchdog_alloc() { execute_concurrent(1, 10000, watchdog_alloc_fn()); } #[test] -fn watchdog_alloc_concurrent() { +#[tokio::test(flavor = "single_thread")] +async fn watchdog_alloc_concurrent() { execute_concurrent(1000, 10000, watchdog_alloc_fn()); } @@ -64,13 +66,15 @@ fn watchdog_confirmed_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Sen #[test] #[ignore] -fn watchdog_confirmed() { +#[tokio::test(flavor = "single_thread")] +async fn watchdog_confirmed() { execute_concurrent(1, 10, watchdog_confirmed_fn()); } #[test] #[ignore] -fn watchdog_confirmed_concurrent() { +#[tokio::test(flavor = "single_thread")] +async fn watchdog_confirmed_concurrent() { execute_concurrent(1000, 10, watchdog_confirmed_fn()); } @@ -79,7 +83,8 @@ fn watchdog_confirmed_concurrent() { // so we cannot run dangling test in parallel with anything else #[test] #[ignore] -fn watchdog_confirmed_dangling() { +#[tokio::test(flavor = "single_thread")] +async fn watchdog_confirmed_dangling() { let allocated = GLOBAL_STORAGE .read() .allocate_watchdog() @@ -138,13 +143,15 @@ fn watchdog_validated_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Sen #[test] #[ignore] -fn watchdog_validated() { +#[tokio::test(flavor = "single_thread")] +async fn watchdog_validated() { execute_concurrent(1, 10, watchdog_validated_fn()); } #[test] #[ignore] -fn watchdog_validated_concurrent() { +#[tokio::test(flavor = "single_thread")] +async fn watchdog_validated_concurrent() { execute_concurrent(1000, 10, watchdog_validated_fn()); } @@ -178,13 +185,15 @@ fn watchdog_validated_invalid_without_confirmator_fn( #[test] #[ignore] -fn watchdog_validated_invalid_without_confirmator() { +#[tokio::test(flavor = "single_thread")] +async fn watchdog_validated_invalid_without_confirmator() { execute_concurrent(1, 10, watchdog_validated_invalid_without_confirmator_fn()); } #[test] #[ignore] -fn watchdog_validated_invalid_without_confirmator_concurrent() { +#[tokio::test(flavor = "single_thread")] +async fn watchdog_validated_invalid_without_confirmator_concurrent() { execute_concurrent( 1000, 10, @@ -243,13 +252,15 @@ fn watchdog_validated_additional_confirmation_fn( #[test] #[ignore] -fn watchdog_validated_additional_confirmation() { +#[tokio::test(flavor = "single_thread")] +async fn watchdog_validated_additional_confirmation() { execute_concurrent(1, 10, watchdog_validated_additional_confirmation_fn()); } #[test] #[ignore] -fn watchdog_validated_additional_confirmation_concurrent() { +#[tokio::test(flavor = "single_thread")] +async fn watchdog_validated_additional_confirmation_concurrent() { execute_concurrent(1000, 10, watchdog_validated_additional_confirmation_fn()); } @@ -298,21 +309,24 @@ fn watchdog_validated_overloaded_system_fn( #[test] #[ignore] -fn watchdog_validated_low_load() { +#[tokio::test(flavor = "single_thread")] +async fn watchdog_validated_low_load() { let _load = CpuLoad::low(); execute_concurrent(1000, 10, watchdog_validated_overloaded_system_fn()); } #[test] #[ignore] -fn watchdog_validated_high_load() { +#[tokio::test(flavor = "single_thread")] +async fn watchdog_validated_high_load() { let _load = CpuLoad::optimal_high(); execute_concurrent(1000, 10, watchdog_validated_overloaded_system_fn()); } #[test] #[ignore] -fn watchdog_validated_overloaded_system() { +#[tokio::test(flavor = "single_thread")] +async fn watchdog_validated_overloaded_system() { let _load = CpuLoad::excessive(); execute_concurrent(1000, 10, watchdog_validated_overloaded_system_fn()); } From e3d351153e73c47535d5572228fa5728987e60d9 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Fri, 13 Sep 2024 11:00:43 +0300 Subject: [PATCH 04/12] fix CI --- commons/zenoh-shm/tests/header.rs | 16 +++++------ commons/zenoh-shm/tests/posix_array.rs | 12 ++++---- commons/zenoh-shm/tests/posix_segment.rs | 24 ++++++++-------- commons/zenoh-shm/tests/posix_shm_provider.rs | 8 +++--- commons/zenoh-shm/tests/watchdog.rs | 28 +++++++++---------- 5 files changed, 44 insertions(+), 44 deletions(-) diff --git a/commons/zenoh-shm/tests/header.rs b/commons/zenoh-shm/tests/header.rs index edfa700033..9685a7614f 100644 --- a/commons/zenoh-shm/tests/header.rs +++ b/commons/zenoh-shm/tests/header.rs @@ -32,13 +32,13 @@ fn header_alloc_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sy } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn header_alloc() { execute_concurrent(1, 1000, header_alloc_fn()); } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn header_alloc_concurrent() { execute_concurrent(100, 1000, header_alloc_fn()); } @@ -53,13 +53,13 @@ fn header_link_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Syn } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn header_link() { execute_concurrent(1, 1000, header_link_fn()); } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn header_link_concurrent() { execute_concurrent(100, 1000, header_link_fn()); } @@ -83,13 +83,13 @@ fn header_link_failure_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Se } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn header_link_failure() { execute_concurrent(1, 1000, header_link_failure_fn()); } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn header_link_failure_concurrent() { execute_concurrent(100, 1000, header_link_failure_fn()); } @@ -126,13 +126,13 @@ fn header_check_memory_fn(parallel_tasks: usize, iterations: usize) { } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn header_check_memory() { header_check_memory_fn(1, 1000); } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn header_check_memory_concurrent() { header_check_memory_fn(100, 100); } diff --git a/commons/zenoh-shm/tests/posix_array.rs b/commons/zenoh-shm/tests/posix_array.rs index 44c6bd45de..20a90b38f1 100644 --- a/commons/zenoh-shm/tests/posix_array.rs +++ b/commons/zenoh-shm/tests/posix_array.rs @@ -111,19 +111,19 @@ where /// MEMORY CHECKS /// #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn arr_u8_index_memory_test() { test_array::(); } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn arr_u16_index_memory_test() { test_array::(); } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn arr_u32_index_memory_test() { test_array::(); } @@ -149,19 +149,19 @@ where } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn arr_u8_index_invalid_elem_count() { test_invalid_elem_index::(); } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn arr_u16_index_invalid_elem_count() { test_invalid_elem_index::(); } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn arr_u32_index_invalid_elem_count() { test_invalid_elem_index::(); } diff --git a/commons/zenoh-shm/tests/posix_segment.rs b/commons/zenoh-shm/tests/posix_segment.rs index 90e41d36e4..45f0cce762 100644 --- a/commons/zenoh-shm/tests/posix_segment.rs +++ b/commons/zenoh-shm/tests/posix_segment.rs @@ -67,31 +67,31 @@ where /// UNSIGNED /// #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn segment_u8_id() { test_segment::() } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn segment_u16_id() { test_segment::() } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn segment_u32_id() { test_segment::() } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn segment_u64_id() { test_segment::() } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn segment_u128_id() { test_segment::() } @@ -99,31 +99,31 @@ async fn segment_u128_id() { /// SIGNED /// #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn segment_i8_id() { test_segment::() } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn segment_i16_id() { test_segment::() } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn segment_i32_id() { test_segment::() } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn segment_i64_id() { test_segment::() } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn segment_i128_id() { test_segment::() } @@ -131,7 +131,7 @@ async fn segment_i128_id() { /// Behaviour checks /// #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn segment_open() { let created_segment: Segment = Segment::create(900, TEST_SEGMENT_PREFIX).expect("error creating new segment"); @@ -141,7 +141,7 @@ async fn segment_open() { } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn segment_open_error() { let id = { let created_segment: Segment = diff --git a/commons/zenoh-shm/tests/posix_shm_provider.rs b/commons/zenoh-shm/tests/posix_shm_provider.rs index df43b7582f..7fe00a5ba8 100644 --- a/commons/zenoh-shm/tests/posix_shm_provider.rs +++ b/commons/zenoh-shm/tests/posix_shm_provider.rs @@ -28,7 +28,7 @@ static BUFFER_NUM: usize = 100; static BUFFER_SIZE: usize = 1024; #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn posix_shm_provider_create() { let _backend = PosixShmProviderBackend::builder() .with_size(1024) @@ -38,7 +38,7 @@ async fn posix_shm_provider_create() { } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn posix_shm_provider_alloc() { let backend = PosixShmProviderBackend::builder() .with_size(1024) @@ -54,7 +54,7 @@ async fn posix_shm_provider_alloc() { } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn posix_shm_provider_open() { let backend = PosixShmProviderBackend::builder() .with_size(1024) @@ -76,7 +76,7 @@ async fn posix_shm_provider_open() { } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn posix_shm_provider_allocator() { let backend = PosixShmProviderBackend::builder() .with_size(BUFFER_SIZE * BUFFER_NUM) diff --git a/commons/zenoh-shm/tests/watchdog.rs b/commons/zenoh-shm/tests/watchdog.rs index 7506cfc4f2..9f0d4e72c5 100644 --- a/commons/zenoh-shm/tests/watchdog.rs +++ b/commons/zenoh-shm/tests/watchdog.rs @@ -36,13 +36,13 @@ fn watchdog_alloc_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn watchdog_alloc() { execute_concurrent(1, 10000, watchdog_alloc_fn()); } #[test] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn watchdog_alloc_concurrent() { execute_concurrent(1000, 10000, watchdog_alloc_fn()); } @@ -66,14 +66,14 @@ fn watchdog_confirmed_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Sen #[test] #[ignore] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn watchdog_confirmed() { execute_concurrent(1, 10, watchdog_confirmed_fn()); } #[test] #[ignore] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn watchdog_confirmed_concurrent() { execute_concurrent(1000, 10, watchdog_confirmed_fn()); } @@ -83,7 +83,7 @@ async fn watchdog_confirmed_concurrent() { // so we cannot run dangling test in parallel with anything else #[test] #[ignore] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn watchdog_confirmed_dangling() { let allocated = GLOBAL_STORAGE .read() @@ -143,14 +143,14 @@ fn watchdog_validated_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Sen #[test] #[ignore] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn watchdog_validated() { execute_concurrent(1, 10, watchdog_validated_fn()); } #[test] #[ignore] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn watchdog_validated_concurrent() { execute_concurrent(1000, 10, watchdog_validated_fn()); } @@ -185,14 +185,14 @@ fn watchdog_validated_invalid_without_confirmator_fn( #[test] #[ignore] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn watchdog_validated_invalid_without_confirmator() { execute_concurrent(1, 10, watchdog_validated_invalid_without_confirmator_fn()); } #[test] #[ignore] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn watchdog_validated_invalid_without_confirmator_concurrent() { execute_concurrent( 1000, @@ -252,14 +252,14 @@ fn watchdog_validated_additional_confirmation_fn( #[test] #[ignore] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn watchdog_validated_additional_confirmation() { execute_concurrent(1, 10, watchdog_validated_additional_confirmation_fn()); } #[test] #[ignore] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn watchdog_validated_additional_confirmation_concurrent() { execute_concurrent(1000, 10, watchdog_validated_additional_confirmation_fn()); } @@ -309,7 +309,7 @@ fn watchdog_validated_overloaded_system_fn( #[test] #[ignore] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn watchdog_validated_low_load() { let _load = CpuLoad::low(); execute_concurrent(1000, 10, watchdog_validated_overloaded_system_fn()); @@ -317,7 +317,7 @@ async fn watchdog_validated_low_load() { #[test] #[ignore] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn watchdog_validated_high_load() { let _load = CpuLoad::optimal_high(); execute_concurrent(1000, 10, watchdog_validated_overloaded_system_fn()); @@ -325,7 +325,7 @@ async fn watchdog_validated_high_load() { #[test] #[ignore] -#[tokio::test(flavor = "single_thread")] +#[tokio::test(flavor = "current_thread")] async fn watchdog_validated_overloaded_system() { let _load = CpuLoad::excessive(); execute_concurrent(1000, 10, watchdog_validated_overloaded_system_fn()); From 896bd26f800ee692d5bed0185cebb18facbc2ea2 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Fri, 13 Sep 2024 18:03:19 +0300 Subject: [PATCH 05/12] fix CI --- commons/zenoh-shm/tests/header.rs | 8 -------- commons/zenoh-shm/tests/posix_array.rs | 6 ------ commons/zenoh-shm/tests/posix_segment.rs | 12 ------------ commons/zenoh-shm/tests/posix_shm_provider.rs | 4 ---- commons/zenoh-shm/tests/watchdog.rs | 14 -------------- 5 files changed, 44 deletions(-) diff --git a/commons/zenoh-shm/tests/header.rs b/commons/zenoh-shm/tests/header.rs index 9685a7614f..07c6aac13e 100644 --- a/commons/zenoh-shm/tests/header.rs +++ b/commons/zenoh-shm/tests/header.rs @@ -31,13 +31,11 @@ fn header_alloc_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sy } } -#[test] #[tokio::test(flavor = "current_thread")] async fn header_alloc() { execute_concurrent(1, 1000, header_alloc_fn()); } -#[test] #[tokio::test(flavor = "current_thread")] async fn header_alloc_concurrent() { execute_concurrent(100, 1000, header_alloc_fn()); @@ -52,13 +50,11 @@ fn header_link_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Syn } } -#[test] #[tokio::test(flavor = "current_thread")] async fn header_link() { execute_concurrent(1, 1000, header_link_fn()); } -#[test] #[tokio::test(flavor = "current_thread")] async fn header_link_concurrent() { execute_concurrent(100, 1000, header_link_fn()); @@ -82,13 +78,11 @@ fn header_link_failure_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Se } } -#[test] #[tokio::test(flavor = "current_thread")] async fn header_link_failure() { execute_concurrent(1, 1000, header_link_failure_fn()); } -#[test] #[tokio::test(flavor = "current_thread")] async fn header_link_failure_concurrent() { execute_concurrent(100, 1000, header_link_failure_fn()); @@ -125,13 +119,11 @@ fn header_check_memory_fn(parallel_tasks: usize, iterations: usize) { execute_concurrent(parallel_tasks, iterations, task_fun); } -#[test] #[tokio::test(flavor = "current_thread")] async fn header_check_memory() { header_check_memory_fn(1, 1000); } -#[test] #[tokio::test(flavor = "current_thread")] async fn header_check_memory_concurrent() { header_check_memory_fn(100, 100); diff --git a/commons/zenoh-shm/tests/posix_array.rs b/commons/zenoh-shm/tests/posix_array.rs index 20a90b38f1..0a9189ef00 100644 --- a/commons/zenoh-shm/tests/posix_array.rs +++ b/commons/zenoh-shm/tests/posix_array.rs @@ -110,19 +110,16 @@ where /// MEMORY CHECKS /// -#[test] #[tokio::test(flavor = "current_thread")] async fn arr_u8_index_memory_test() { test_array::(); } -#[test] #[tokio::test(flavor = "current_thread")] async fn arr_u16_index_memory_test() { test_array::(); } -#[test] #[tokio::test(flavor = "current_thread")] async fn arr_u32_index_memory_test() { test_array::(); @@ -148,19 +145,16 @@ where ); } -#[test] #[tokio::test(flavor = "current_thread")] async fn arr_u8_index_invalid_elem_count() { test_invalid_elem_index::(); } -#[test] #[tokio::test(flavor = "current_thread")] async fn arr_u16_index_invalid_elem_count() { test_invalid_elem_index::(); } -#[test] #[tokio::test(flavor = "current_thread")] async fn arr_u32_index_invalid_elem_count() { test_invalid_elem_index::(); diff --git a/commons/zenoh-shm/tests/posix_segment.rs b/commons/zenoh-shm/tests/posix_segment.rs index 45f0cce762..847f06a442 100644 --- a/commons/zenoh-shm/tests/posix_segment.rs +++ b/commons/zenoh-shm/tests/posix_segment.rs @@ -66,31 +66,26 @@ where /// UNSIGNED /// -#[test] #[tokio::test(flavor = "current_thread")] async fn segment_u8_id() { test_segment::() } -#[test] #[tokio::test(flavor = "current_thread")] async fn segment_u16_id() { test_segment::() } -#[test] #[tokio::test(flavor = "current_thread")] async fn segment_u32_id() { test_segment::() } -#[test] #[tokio::test(flavor = "current_thread")] async fn segment_u64_id() { test_segment::() } -#[test] #[tokio::test(flavor = "current_thread")] async fn segment_u128_id() { test_segment::() @@ -98,31 +93,26 @@ async fn segment_u128_id() { /// SIGNED /// -#[test] #[tokio::test(flavor = "current_thread")] async fn segment_i8_id() { test_segment::() } -#[test] #[tokio::test(flavor = "current_thread")] async fn segment_i16_id() { test_segment::() } -#[test] #[tokio::test(flavor = "current_thread")] async fn segment_i32_id() { test_segment::() } -#[test] #[tokio::test(flavor = "current_thread")] async fn segment_i64_id() { test_segment::() } -#[test] #[tokio::test(flavor = "current_thread")] async fn segment_i128_id() { test_segment::() @@ -130,7 +120,6 @@ async fn segment_i128_id() { /// Behaviour checks /// -#[test] #[tokio::test(flavor = "current_thread")] async fn segment_open() { let created_segment: Segment = @@ -140,7 +129,6 @@ async fn segment_open() { .expect("error opening existing segment!"); } -#[test] #[tokio::test(flavor = "current_thread")] async fn segment_open_error() { let id = { diff --git a/commons/zenoh-shm/tests/posix_shm_provider.rs b/commons/zenoh-shm/tests/posix_shm_provider.rs index 7fe00a5ba8..5581250cc5 100644 --- a/commons/zenoh-shm/tests/posix_shm_provider.rs +++ b/commons/zenoh-shm/tests/posix_shm_provider.rs @@ -27,7 +27,6 @@ use zenoh_shm::api::{ static BUFFER_NUM: usize = 100; static BUFFER_SIZE: usize = 1024; -#[test] #[tokio::test(flavor = "current_thread")] async fn posix_shm_provider_create() { let _backend = PosixShmProviderBackend::builder() @@ -37,7 +36,6 @@ async fn posix_shm_provider_create() { .expect("Error creating PosixShmProviderBackend!"); } -#[test] #[tokio::test(flavor = "current_thread")] async fn posix_shm_provider_alloc() { let backend = PosixShmProviderBackend::builder() @@ -53,7 +51,6 @@ async fn posix_shm_provider_alloc() { .expect("PosixShmProviderBackend: error allocating buffer"); } -#[test] #[tokio::test(flavor = "current_thread")] async fn posix_shm_provider_open() { let backend = PosixShmProviderBackend::builder() @@ -75,7 +72,6 @@ async fn posix_shm_provider_open() { .expect("Error attaching to segment"); } -#[test] #[tokio::test(flavor = "current_thread")] async fn posix_shm_provider_allocator() { let backend = PosixShmProviderBackend::builder() diff --git a/commons/zenoh-shm/tests/watchdog.rs b/commons/zenoh-shm/tests/watchdog.rs index 9f0d4e72c5..5bb2476821 100644 --- a/commons/zenoh-shm/tests/watchdog.rs +++ b/commons/zenoh-shm/tests/watchdog.rs @@ -35,13 +35,11 @@ fn watchdog_alloc_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + } } -#[test] #[tokio::test(flavor = "current_thread")] async fn watchdog_alloc() { execute_concurrent(1, 10000, watchdog_alloc_fn()); } -#[test] #[tokio::test(flavor = "current_thread")] async fn watchdog_alloc_concurrent() { execute_concurrent(1000, 10000, watchdog_alloc_fn()); @@ -64,14 +62,12 @@ fn watchdog_confirmed_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Sen } } -#[test] #[ignore] #[tokio::test(flavor = "current_thread")] async fn watchdog_confirmed() { execute_concurrent(1, 10, watchdog_confirmed_fn()); } -#[test] #[ignore] #[tokio::test(flavor = "current_thread")] async fn watchdog_confirmed_concurrent() { @@ -81,7 +77,6 @@ async fn watchdog_confirmed_concurrent() { // TODO: confirmation to dangling watchdog actually writes to potentially-existing // other watchdog instance from other test running in the same process and changes it's behaviour, // so we cannot run dangling test in parallel with anything else -#[test] #[ignore] #[tokio::test(flavor = "current_thread")] async fn watchdog_confirmed_dangling() { @@ -141,14 +136,12 @@ fn watchdog_validated_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Sen } } -#[test] #[ignore] #[tokio::test(flavor = "current_thread")] async fn watchdog_validated() { execute_concurrent(1, 10, watchdog_validated_fn()); } -#[test] #[ignore] #[tokio::test(flavor = "current_thread")] async fn watchdog_validated_concurrent() { @@ -183,14 +176,12 @@ fn watchdog_validated_invalid_without_confirmator_fn( } } -#[test] #[ignore] #[tokio::test(flavor = "current_thread")] async fn watchdog_validated_invalid_without_confirmator() { execute_concurrent(1, 10, watchdog_validated_invalid_without_confirmator_fn()); } -#[test] #[ignore] #[tokio::test(flavor = "current_thread")] async fn watchdog_validated_invalid_without_confirmator_concurrent() { @@ -250,14 +241,12 @@ fn watchdog_validated_additional_confirmation_fn( } } -#[test] #[ignore] #[tokio::test(flavor = "current_thread")] async fn watchdog_validated_additional_confirmation() { execute_concurrent(1, 10, watchdog_validated_additional_confirmation_fn()); } -#[test] #[ignore] #[tokio::test(flavor = "current_thread")] async fn watchdog_validated_additional_confirmation_concurrent() { @@ -307,7 +296,6 @@ fn watchdog_validated_overloaded_system_fn( } } -#[test] #[ignore] #[tokio::test(flavor = "current_thread")] async fn watchdog_validated_low_load() { @@ -315,7 +303,6 @@ async fn watchdog_validated_low_load() { execute_concurrent(1000, 10, watchdog_validated_overloaded_system_fn()); } -#[test] #[ignore] #[tokio::test(flavor = "current_thread")] async fn watchdog_validated_high_load() { @@ -323,7 +310,6 @@ async fn watchdog_validated_high_load() { execute_concurrent(1000, 10, watchdog_validated_overloaded_system_fn()); } -#[test] #[ignore] #[tokio::test(flavor = "current_thread")] async fn watchdog_validated_overloaded_system() { From 5181b911a95ea9a6acc140db8cfac2428a9cccc9 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Sun, 15 Sep 2024 12:54:11 +0300 Subject: [PATCH 06/12] use proper tokio runtime for spawning cleanup tasks --- Cargo.lock | 1 + commons/zenoh-shm/Cargo.toml | 1 + commons/zenoh-shm/src/cleanup.rs | 2 +- commons/zenoh-shm/tests/header.rs | 32 +++++------ commons/zenoh-shm/tests/posix_array.rs | 24 ++++---- commons/zenoh-shm/tests/posix_segment.rs | 48 ++++++++-------- commons/zenoh-shm/tests/posix_shm_provider.rs | 16 +++--- commons/zenoh-shm/tests/watchdog.rs | 56 +++++++++---------- 8 files changed, 91 insertions(+), 89 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5241621547..836933c2d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5882,6 +5882,7 @@ dependencies = [ "zenoh-core", "zenoh-macros", "zenoh-result", + "zenoh-runtime", ] [[package]] diff --git a/commons/zenoh-shm/Cargo.toml b/commons/zenoh-shm/Cargo.toml index 130f8496ad..e2b390a696 100644 --- a/commons/zenoh-shm/Cargo.toml +++ b/commons/zenoh-shm/Cargo.toml @@ -41,6 +41,7 @@ zenoh-result = { workspace = true } zenoh-core = { workspace = true } zenoh-macros = { workspace = true } zenoh-buffers = { workspace = true } +zenoh-runtime = { workspace = true } rand = { workspace = true } static_init = { workspace = true } num-traits = { workspace = true } diff --git a/commons/zenoh-shm/src/cleanup.rs b/commons/zenoh-shm/src/cleanup.rs index 7887923967..139bbdc9f1 100644 --- a/commons/zenoh-shm/src/cleanup.rs +++ b/commons/zenoh-shm/src/cleanup.rs @@ -31,7 +31,7 @@ impl Cleanup { fn new() -> Self { // todo: this is a workaround to make sure Cleanup will be executed even if process terminates via signal handlers // that execute std::terminate instead of exit - tokio::task::spawn(async { + zenoh_runtime::ZRuntime::Acceptor.spawn(async { let signals = Signals::new([SIGHUP, SIGTERM, SIGINT, SIGQUIT]).unwrap(); let mut signals = signals.fuse(); if let Some(_signal) = signals.next().await { diff --git a/commons/zenoh-shm/tests/header.rs b/commons/zenoh-shm/tests/header.rs index 07c6aac13e..747757a3b2 100644 --- a/commons/zenoh-shm/tests/header.rs +++ b/commons/zenoh-shm/tests/header.rs @@ -31,13 +31,13 @@ fn header_alloc_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Sy } } -#[tokio::test(flavor = "current_thread")] -async fn header_alloc() { +#[test] +fn header_alloc() { execute_concurrent(1, 1000, header_alloc_fn()); } -#[tokio::test(flavor = "current_thread")] -async fn header_alloc_concurrent() { +#[test] +fn header_alloc_concurrent() { execute_concurrent(100, 1000, header_alloc_fn()); } @@ -50,13 +50,13 @@ fn header_link_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + Syn } } -#[tokio::test(flavor = "current_thread")] -async fn header_link() { +#[test] +fn header_link() { execute_concurrent(1, 1000, header_link_fn()); } -#[tokio::test(flavor = "current_thread")] -async fn header_link_concurrent() { +#[test] +fn header_link_concurrent() { execute_concurrent(100, 1000, header_link_fn()); } @@ -78,13 +78,13 @@ fn header_link_failure_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Se } } -#[tokio::test(flavor = "current_thread")] -async fn header_link_failure() { +#[test] +fn header_link_failure() { execute_concurrent(1, 1000, header_link_failure_fn()); } -#[tokio::test(flavor = "current_thread")] -async fn header_link_failure_concurrent() { +#[test] +fn header_link_failure_concurrent() { execute_concurrent(100, 1000, header_link_failure_fn()); } @@ -119,12 +119,12 @@ fn header_check_memory_fn(parallel_tasks: usize, iterations: usize) { execute_concurrent(parallel_tasks, iterations, task_fun); } -#[tokio::test(flavor = "current_thread")] -async fn header_check_memory() { +#[test] +fn header_check_memory() { header_check_memory_fn(1, 1000); } -#[tokio::test(flavor = "current_thread")] -async fn header_check_memory_concurrent() { +#[test] +fn header_check_memory_concurrent() { header_check_memory_fn(100, 100); } diff --git a/commons/zenoh-shm/tests/posix_array.rs b/commons/zenoh-shm/tests/posix_array.rs index 0a9189ef00..83fdad88fb 100644 --- a/commons/zenoh-shm/tests/posix_array.rs +++ b/commons/zenoh-shm/tests/posix_array.rs @@ -110,18 +110,18 @@ where /// MEMORY CHECKS /// -#[tokio::test(flavor = "current_thread")] -async fn arr_u8_index_memory_test() { +#[test] +fn arr_u8_index_memory_test() { test_array::(); } -#[tokio::test(flavor = "current_thread")] -async fn arr_u16_index_memory_test() { +#[test] +fn arr_u16_index_memory_test() { test_array::(); } -#[tokio::test(flavor = "current_thread")] -async fn arr_u32_index_memory_test() { +#[test] +fn arr_u32_index_memory_test() { test_array::(); } @@ -145,17 +145,17 @@ where ); } -#[tokio::test(flavor = "current_thread")] -async fn arr_u8_index_invalid_elem_count() { +#[test] +fn arr_u8_index_invalid_elem_count() { test_invalid_elem_index::(); } -#[tokio::test(flavor = "current_thread")] -async fn arr_u16_index_invalid_elem_count() { +#[test] +fn arr_u16_index_invalid_elem_count() { test_invalid_elem_index::(); } -#[tokio::test(flavor = "current_thread")] -async fn arr_u32_index_invalid_elem_count() { +#[test] +fn arr_u32_index_invalid_elem_count() { test_invalid_elem_index::(); } diff --git a/commons/zenoh-shm/tests/posix_segment.rs b/commons/zenoh-shm/tests/posix_segment.rs index 847f06a442..879fccf298 100644 --- a/commons/zenoh-shm/tests/posix_segment.rs +++ b/commons/zenoh-shm/tests/posix_segment.rs @@ -66,62 +66,62 @@ where /// UNSIGNED /// -#[tokio::test(flavor = "current_thread")] -async fn segment_u8_id() { +#[test] +fn segment_u8_id() { test_segment::() } -#[tokio::test(flavor = "current_thread")] -async fn segment_u16_id() { +#[test] +fn segment_u16_id() { test_segment::() } -#[tokio::test(flavor = "current_thread")] -async fn segment_u32_id() { +#[test] +fn segment_u32_id() { test_segment::() } -#[tokio::test(flavor = "current_thread")] -async fn segment_u64_id() { +#[test] +fn segment_u64_id() { test_segment::() } -#[tokio::test(flavor = "current_thread")] -async fn segment_u128_id() { +#[test] +fn segment_u128_id() { test_segment::() } /// SIGNED /// -#[tokio::test(flavor = "current_thread")] -async fn segment_i8_id() { +#[test] +fn segment_i8_id() { test_segment::() } -#[tokio::test(flavor = "current_thread")] -async fn segment_i16_id() { +#[test] +fn segment_i16_id() { test_segment::() } -#[tokio::test(flavor = "current_thread")] -async fn segment_i32_id() { +#[test] +fn segment_i32_id() { test_segment::() } -#[tokio::test(flavor = "current_thread")] -async fn segment_i64_id() { +#[test] +fn segment_i64_id() { test_segment::() } -#[tokio::test(flavor = "current_thread")] -async fn segment_i128_id() { +#[test] +fn segment_i128_id() { test_segment::() } /// Behaviour checks /// -#[tokio::test(flavor = "current_thread")] -async fn segment_open() { +#[test] +fn segment_open() { let created_segment: Segment = Segment::create(900, TEST_SEGMENT_PREFIX).expect("error creating new segment"); @@ -129,8 +129,8 @@ async fn segment_open() { .expect("error opening existing segment!"); } -#[tokio::test(flavor = "current_thread")] -async fn segment_open_error() { +#[test] +fn segment_open_error() { let id = { let created_segment: Segment = Segment::create(900, TEST_SEGMENT_PREFIX).expect("error creating new segment"); diff --git a/commons/zenoh-shm/tests/posix_shm_provider.rs b/commons/zenoh-shm/tests/posix_shm_provider.rs index 5581250cc5..f0f7417340 100644 --- a/commons/zenoh-shm/tests/posix_shm_provider.rs +++ b/commons/zenoh-shm/tests/posix_shm_provider.rs @@ -27,8 +27,8 @@ use zenoh_shm::api::{ static BUFFER_NUM: usize = 100; static BUFFER_SIZE: usize = 1024; -#[tokio::test(flavor = "current_thread")] -async fn posix_shm_provider_create() { +#[test] +fn posix_shm_provider_create() { let _backend = PosixShmProviderBackend::builder() .with_size(1024) .expect("Error creating Layout!") @@ -36,8 +36,8 @@ async fn posix_shm_provider_create() { .expect("Error creating PosixShmProviderBackend!"); } -#[tokio::test(flavor = "current_thread")] -async fn posix_shm_provider_alloc() { +#[test] +fn posix_shm_provider_alloc() { let backend = PosixShmProviderBackend::builder() .with_size(1024) .expect("Error creating Layout!") @@ -51,8 +51,8 @@ async fn posix_shm_provider_alloc() { .expect("PosixShmProviderBackend: error allocating buffer"); } -#[tokio::test(flavor = "current_thread")] -async fn posix_shm_provider_open() { +#[test] +fn posix_shm_provider_open() { let backend = PosixShmProviderBackend::builder() .with_size(1024) .expect("Error creating Layout!") @@ -72,8 +72,8 @@ async fn posix_shm_provider_open() { .expect("Error attaching to segment"); } -#[tokio::test(flavor = "current_thread")] -async fn posix_shm_provider_allocator() { +#[test] +fn posix_shm_provider_allocator() { let backend = PosixShmProviderBackend::builder() .with_size(BUFFER_SIZE * BUFFER_NUM) .expect("Error creating Layout!") diff --git a/commons/zenoh-shm/tests/watchdog.rs b/commons/zenoh-shm/tests/watchdog.rs index 5bb2476821..58633a9d4e 100644 --- a/commons/zenoh-shm/tests/watchdog.rs +++ b/commons/zenoh-shm/tests/watchdog.rs @@ -35,13 +35,13 @@ fn watchdog_alloc_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Send + } } -#[tokio::test(flavor = "current_thread")] -async fn watchdog_alloc() { +#[test] +fn watchdog_alloc() { execute_concurrent(1, 10000, watchdog_alloc_fn()); } -#[tokio::test(flavor = "current_thread")] -async fn watchdog_alloc_concurrent() { +#[test] +fn watchdog_alloc_concurrent() { execute_concurrent(1000, 10000, watchdog_alloc_fn()); } @@ -63,14 +63,14 @@ fn watchdog_confirmed_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Sen } #[ignore] -#[tokio::test(flavor = "current_thread")] -async fn watchdog_confirmed() { +#[test] +fn watchdog_confirmed() { execute_concurrent(1, 10, watchdog_confirmed_fn()); } #[ignore] -#[tokio::test(flavor = "current_thread")] -async fn watchdog_confirmed_concurrent() { +#[test] +fn watchdog_confirmed_concurrent() { execute_concurrent(1000, 10, watchdog_confirmed_fn()); } @@ -78,8 +78,8 @@ async fn watchdog_confirmed_concurrent() { // other watchdog instance from other test running in the same process and changes it's behaviour, // so we cannot run dangling test in parallel with anything else #[ignore] -#[tokio::test(flavor = "current_thread")] -async fn watchdog_confirmed_dangling() { +#[test] +fn watchdog_confirmed_dangling() { let allocated = GLOBAL_STORAGE .read() .allocate_watchdog() @@ -137,14 +137,14 @@ fn watchdog_validated_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Sen } #[ignore] -#[tokio::test(flavor = "current_thread")] -async fn watchdog_validated() { +#[test] +fn watchdog_validated() { execute_concurrent(1, 10, watchdog_validated_fn()); } #[ignore] -#[tokio::test(flavor = "current_thread")] -async fn watchdog_validated_concurrent() { +#[test] +fn watchdog_validated_concurrent() { execute_concurrent(1000, 10, watchdog_validated_fn()); } @@ -177,14 +177,14 @@ fn watchdog_validated_invalid_without_confirmator_fn( } #[ignore] -#[tokio::test(flavor = "current_thread")] -async fn watchdog_validated_invalid_without_confirmator() { +#[test] +fn watchdog_validated_invalid_without_confirmator() { execute_concurrent(1, 10, watchdog_validated_invalid_without_confirmator_fn()); } #[ignore] -#[tokio::test(flavor = "current_thread")] -async fn watchdog_validated_invalid_without_confirmator_concurrent() { +#[test] +fn watchdog_validated_invalid_without_confirmator_concurrent() { execute_concurrent( 1000, 10, @@ -242,14 +242,14 @@ fn watchdog_validated_additional_confirmation_fn( } #[ignore] -#[tokio::test(flavor = "current_thread")] -async fn watchdog_validated_additional_confirmation() { +#[test] +fn watchdog_validated_additional_confirmation() { execute_concurrent(1, 10, watchdog_validated_additional_confirmation_fn()); } #[ignore] -#[tokio::test(flavor = "current_thread")] -async fn watchdog_validated_additional_confirmation_concurrent() { +#[test] +fn watchdog_validated_additional_confirmation_concurrent() { execute_concurrent(1000, 10, watchdog_validated_additional_confirmation_fn()); } @@ -297,22 +297,22 @@ fn watchdog_validated_overloaded_system_fn( } #[ignore] -#[tokio::test(flavor = "current_thread")] -async fn watchdog_validated_low_load() { +#[test] +fn watchdog_validated_low_load() { let _load = CpuLoad::low(); execute_concurrent(1000, 10, watchdog_validated_overloaded_system_fn()); } #[ignore] -#[tokio::test(flavor = "current_thread")] -async fn watchdog_validated_high_load() { +#[test] +fn watchdog_validated_high_load() { let _load = CpuLoad::optimal_high(); execute_concurrent(1000, 10, watchdog_validated_overloaded_system_fn()); } #[ignore] -#[tokio::test(flavor = "current_thread")] -async fn watchdog_validated_overloaded_system() { +#[test] +fn watchdog_validated_overloaded_system() { let _load = CpuLoad::excessive(); execute_concurrent(1000, 10, watchdog_validated_overloaded_system_fn()); } From ae06310df5795e67de4f2547c773bf3f1b1bc520 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Sun, 15 Sep 2024 13:24:41 +0300 Subject: [PATCH 07/12] get rid of signal_hook_tokio as it doesn't support win --- Cargo.lock | 14 -------------- Cargo.toml | 1 - commons/zenoh-shm/Cargo.toml | 2 -- commons/zenoh-shm/src/cleanup.rs | 14 ++++++-------- 4 files changed, 6 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 836933c2d0..221c5280a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3885,18 +3885,6 @@ dependencies = [ "libc", ] -[[package]] -name = "signal-hook-tokio" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "213241f76fb1e37e27de3b6aa1b068a2c333233b59cca6634f634b80a27ecf1e" -dependencies = [ - "futures-core", - "libc", - "signal-hook", - "tokio", -] - [[package]] name = "signature" version = "2.1.0" @@ -5872,7 +5860,6 @@ dependencies = [ "rand 0.8.5", "shared_memory", "signal-hook", - "signal-hook-tokio", "stabby", "static_init", "thread-priority", @@ -5882,7 +5869,6 @@ dependencies = [ "zenoh-core", "zenoh-macros", "zenoh-result", - "zenoh-runtime", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index bf98508af7..54a25e4c81 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -163,7 +163,6 @@ sha3 = "0.10.8" shared_memory = "0.12.4" shellexpand = "3.1.0" signal-hook = { version = "0.3.17", default-features = false } -signal-hook-tokio = { version = "0.3.1", default-features = false } socket2 = { version = "0.5.7", features = ["all"] } stop-token = "0.7.0" syn = "2.0" diff --git a/commons/zenoh-shm/Cargo.toml b/commons/zenoh-shm/Cargo.toml index e2b390a696..ba219bc034 100644 --- a/commons/zenoh-shm/Cargo.toml +++ b/commons/zenoh-shm/Cargo.toml @@ -41,7 +41,6 @@ zenoh-result = { workspace = true } zenoh-core = { workspace = true } zenoh-macros = { workspace = true } zenoh-buffers = { workspace = true } -zenoh-runtime = { workspace = true } rand = { workspace = true } static_init = { workspace = true } num-traits = { workspace = true } @@ -50,7 +49,6 @@ thread-priority = { workspace = true } lockfree = { workspace = true } stabby = { workspace = true } signal-hook = { workspace = true } -signal-hook-tokio = { workspace = true, features = ["futures-v0_3"] } futures = { workspace = true } [dev-dependencies] diff --git a/commons/zenoh-shm/src/cleanup.rs b/commons/zenoh-shm/src/cleanup.rs index 139bbdc9f1..c86106107f 100644 --- a/commons/zenoh-shm/src/cleanup.rs +++ b/commons/zenoh-shm/src/cleanup.rs @@ -12,9 +12,7 @@ // ZettaScale Zenoh Team, // -use futures::stream::StreamExt; use signal_hook::consts::signal::*; -use signal_hook_tokio::Signals; use static_init::dynamic; /// A global cleanup, that is guaranteed to be dropped at normal program exit and that will @@ -31,13 +29,13 @@ impl Cleanup { fn new() -> Self { // todo: this is a workaround to make sure Cleanup will be executed even if process terminates via signal handlers // that execute std::terminate instead of exit - zenoh_runtime::ZRuntime::Acceptor.spawn(async { - let signals = Signals::new([SIGHUP, SIGTERM, SIGINT, SIGQUIT]).unwrap(); - let mut signals = signals.fuse(); - if let Some(_signal) = signals.next().await { - std::process::exit(0); + for signal in [SIGHUP, SIGTERM, SIGINT, SIGQUIT] { + unsafe { + let _ = signal_hook::low_level::register(signal, || { + std::process::exit(0); + }); } - }); + } Self { cleanups: Default::default(), From a560954d8c8d4ce5e2acd19fab4a184cc4fcb789 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Sun, 15 Sep 2024 14:11:04 +0300 Subject: [PATCH 08/12] - remove unnecessary dep - fix windows compilation --- Cargo.lock | 1 - commons/zenoh-shm/Cargo.toml | 1 - commons/zenoh-shm/src/cleanup.rs | 8 +++++++- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 221c5280a7..3ebe1688e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5852,7 +5852,6 @@ version = "1.0.0-dev" dependencies = [ "async-trait", "crc", - "futures", "libc", "lockfree", "num-traits", diff --git a/commons/zenoh-shm/Cargo.toml b/commons/zenoh-shm/Cargo.toml index ba219bc034..c5b2a0a628 100644 --- a/commons/zenoh-shm/Cargo.toml +++ b/commons/zenoh-shm/Cargo.toml @@ -49,7 +49,6 @@ thread-priority = { workspace = true } lockfree = { workspace = true } stabby = { workspace = true } signal-hook = { workspace = true } -futures = { workspace = true } [dev-dependencies] libc = { workspace = true } diff --git a/commons/zenoh-shm/src/cleanup.rs b/commons/zenoh-shm/src/cleanup.rs index c86106107f..0b22d2b1c2 100644 --- a/commons/zenoh-shm/src/cleanup.rs +++ b/commons/zenoh-shm/src/cleanup.rs @@ -29,7 +29,13 @@ impl Cleanup { fn new() -> Self { // todo: this is a workaround to make sure Cleanup will be executed even if process terminates via signal handlers // that execute std::terminate instead of exit - for signal in [SIGHUP, SIGTERM, SIGINT, SIGQUIT] { + for signal in [ + SIGHUP, + SIGTERM, + SIGINT, + #[cfg(not(target_os = "windows"))] + SIGQUIT, + ] { unsafe { let _ = signal_hook::low_level::register(signal, || { std::process::exit(0); From 082148309d73af04df07aa78b6117576940df5dd Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Sun, 15 Sep 2024 14:22:02 +0300 Subject: [PATCH 09/12] - There is no SIGHUP mapping on win --- commons/zenoh-shm/src/cleanup.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/commons/zenoh-shm/src/cleanup.rs b/commons/zenoh-shm/src/cleanup.rs index 0b22d2b1c2..ac29dfe08f 100644 --- a/commons/zenoh-shm/src/cleanup.rs +++ b/commons/zenoh-shm/src/cleanup.rs @@ -30,6 +30,7 @@ impl Cleanup { // todo: this is a workaround to make sure Cleanup will be executed even if process terminates via signal handlers // that execute std::terminate instead of exit for signal in [ + #[cfg(not(target_os = "windows"))] SIGHUP, SIGTERM, SIGINT, From e7ec4f64fb9a1743ee40e1a6fb9b5a9488d5a091 Mon Sep 17 00:00:00 2001 From: yellowhatter <104833606+yellowhatter@users.noreply.github.com> Date: Tue, 17 Sep 2024 11:20:04 +0300 Subject: [PATCH 10/12] [skip ci] doc comment update Co-authored-by: Luca Cominardi --- commons/zenoh-shm/src/api/cleanup.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/commons/zenoh-shm/src/api/cleanup.rs b/commons/zenoh-shm/src/api/cleanup.rs index df4e49dd6e..db6de74046 100644 --- a/commons/zenoh-shm/src/api/cleanup.rs +++ b/commons/zenoh-shm/src/api/cleanup.rs @@ -15,7 +15,7 @@ use crate::cleanup::CLEANUP; /// Make forced cleanup -/// NOTE: this is a part of ugly on-exit-cleanup workaround and will be removed +/// NOTE: this is a part of a temporary on-exit-cleanup workaround and it will be very likely removed in the future. /// In order to properly cleanup some SHM internals upon process exit, Zenoh installs exit handlers (see atexit() API). /// The bad thing is that atexit handler is executed only on process exit(), the terminating signal handlers (like SIGINT) /// bypass it and terminate the process without cleanup. To eliminate this effect, Zenoh overrides SIGHUP, SIGTERM, SIGINT From f6580cfbaf97efb53a413548267932ff14faf108 Mon Sep 17 00:00:00 2001 From: yellowhatter <104833606+yellowhatter@users.noreply.github.com> Date: Tue, 17 Sep 2024 11:20:24 +0300 Subject: [PATCH 11/12] [skip ci] doc comment update Co-authored-by: Luca Cominardi --- commons/zenoh-shm/src/api/cleanup.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/commons/zenoh-shm/src/api/cleanup.rs b/commons/zenoh-shm/src/api/cleanup.rs index db6de74046..9b2fa2c50f 100644 --- a/commons/zenoh-shm/src/api/cleanup.rs +++ b/commons/zenoh-shm/src/api/cleanup.rs @@ -17,7 +17,7 @@ use crate::cleanup::CLEANUP; /// Make forced cleanup /// NOTE: this is a part of a temporary on-exit-cleanup workaround and it will be very likely removed in the future. /// In order to properly cleanup some SHM internals upon process exit, Zenoh installs exit handlers (see atexit() API). -/// The bad thing is that atexit handler is executed only on process exit(), the terminating signal handlers (like SIGINT) +/// The atexit handler is executed only on process exit(), the inconvenience is that terminating signal handlers (like SIGINT) /// bypass it and terminate the process without cleanup. To eliminate this effect, Zenoh overrides SIGHUP, SIGTERM, SIGINT /// and SIGQUIT handlers and calls exit() inside to make graceful shutdown. If user is going to override these Zenoh's handlers, /// the workaround will break, and there are two ways to keep this workaround working: From 855a1377e6792b4bb5435fd1f91f4d46a84ec0b0 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Tue, 17 Sep 2024 11:44:03 +0300 Subject: [PATCH 12/12] Review fixes --- commons/zenoh-shm/src/api/cleanup.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/commons/zenoh-shm/src/api/cleanup.rs b/commons/zenoh-shm/src/api/cleanup.rs index 9b2fa2c50f..dda3637a07 100644 --- a/commons/zenoh-shm/src/api/cleanup.rs +++ b/commons/zenoh-shm/src/api/cleanup.rs @@ -16,14 +16,20 @@ use crate::cleanup::CLEANUP; /// Make forced cleanup /// NOTE: this is a part of a temporary on-exit-cleanup workaround and it will be very likely removed in the future. +/// WARN: The improper usage can break the application logic, impacting SHM-utilizing Sessions in other processes. +/// Cleanup unlinks SHM segments _created_ by current process from filesystem with the following consequences: +/// - Sessions that are not linked to this segment will fail to link it if they try. Such kind of errors are properly handled. +/// - Already linked processes will still have this shared memory mapped and safely accessible +/// - The actual memory will be reclaimed by the OS only after last process using it will close it or exit +/// /// In order to properly cleanup some SHM internals upon process exit, Zenoh installs exit handlers (see atexit() API). -/// The atexit handler is executed only on process exit(), the inconvenience is that terminating signal handlers (like SIGINT) -/// bypass it and terminate the process without cleanup. To eliminate this effect, Zenoh overrides SIGHUP, SIGTERM, SIGINT -/// and SIGQUIT handlers and calls exit() inside to make graceful shutdown. If user is going to override these Zenoh's handlers, -/// the workaround will break, and there are two ways to keep this workaround working: +/// The atexit handler is executed only on process exit(), the inconvenience is that terminating signal handlers +/// (like SIGINT) bypass it and terminate the process without cleanup. To eliminate this effect, Zenoh overrides +/// SIGHUP, SIGTERM, SIGINT and SIGQUIT handlers and calls exit() inside to make graceful shutdown. If user is going to +/// override these Zenoh's handlers, the workaround will break, and there are two ways to keep this workaround working: /// - execute overridden Zenoh handlers in overriding handler code -/// - call forced_cleanup() anywhere at any time before terminating the process +/// - call force_cleanup_before_exit() anywhere at any time before terminating the process #[zenoh_macros::unstable_doc] -pub fn forced_cleanup() { +pub fn force_cleanup_before_exit() { CLEANUP.read().cleanup(); }