diff --git a/Cargo.lock b/Cargo.lock index 7c140c1c31..3ebe1688e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5858,6 +5858,7 @@ dependencies = [ "num_cpus", "rand 0.8.5", "shared_memory", + "signal-hook", "stabby", "static_init", "thread-priority", diff --git a/Cargo.toml b/Cargo.toml index cb836bb14a..54a25e4c81 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -162,6 +162,7 @@ stabby = "36.1.1" sha3 = "0.10.8" shared_memory = "0.12.4" shellexpand = "3.1.0" +signal-hook = { version = "0.3.17", default-features = false } socket2 = { version = "0.5.7", features = ["all"] } stop-token = "0.7.0" syn = "2.0" diff --git a/commons/zenoh-shm/Cargo.toml b/commons/zenoh-shm/Cargo.toml index 0b79fd2549..c5b2a0a628 100644 --- a/commons/zenoh-shm/Cargo.toml +++ b/commons/zenoh-shm/Cargo.toml @@ -48,6 +48,7 @@ num_cpus = { workspace = true, optional = true } thread-priority = { workspace = true } lockfree = { workspace = true } stabby = { workspace = true } +signal-hook = { workspace = true } [dev-dependencies] libc = { workspace = true } diff --git a/commons/zenoh-shm/src/api/cleanup.rs b/commons/zenoh-shm/src/api/cleanup.rs new file mode 100644 index 0000000000..dda3637a07 --- /dev/null +++ b/commons/zenoh-shm/src/api/cleanup.rs @@ -0,0 +1,35 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::cleanup::CLEANUP; + +/// Make forced cleanup +/// NOTE: this is a part of a temporary on-exit-cleanup workaround and it will be very likely removed in the future. +/// WARN: The improper usage can break the application logic, impacting SHM-utilizing Sessions in other processes. 
+/// Cleanup unlinks SHM segments _created_ by the current process from the filesystem, with the following consequences: +/// - Sessions that have not yet linked such a segment will fail to link it if they try. Such errors are properly handled. +/// - Processes that have already linked it will still have this shared memory mapped and safely accessible. +/// - The actual memory will be reclaimed by the OS only after the last process using it closes it or exits. +/// +/// In order to properly clean up some SHM internals upon process exit, Zenoh installs exit handlers (see the atexit() API). +/// The atexit handler is executed only on process exit(); the inconvenience is that handlers of terminating signals +/// (like SIGINT) bypass it and terminate the process without cleanup. To eliminate this effect, Zenoh overrides the +/// SIGHUP, SIGTERM, SIGINT and SIGQUIT handlers (SIGHUP and SIGQUIT on non-Windows targets only) and calls exit() inside them to perform a graceful shutdown. If the user +/// overrides these Zenoh handlers, the workaround will break, and there are two ways to keep it working: +/// - execute the overridden Zenoh handlers in the overriding handler code +/// - call force_cleanup_before_exit() at any time before terminating the process +#[zenoh_macros::unstable_doc] +pub fn force_cleanup_before_exit() { + CLEANUP.read().cleanup(); +} diff --git a/commons/zenoh-shm/src/api/mod.rs b/commons/zenoh-shm/src/api/mod.rs index a87188da29..8802b1eb58 100644 --- a/commons/zenoh-shm/src/api/mod.rs +++ b/commons/zenoh-shm/src/api/mod.rs @@ -13,6 +13,7 @@ // pub mod buffer; +pub mod cleanup; pub mod client; pub mod client_storage; pub mod common; diff --git a/commons/zenoh-shm/src/cleanup.rs b/commons/zenoh-shm/src/cleanup.rs index 5649732bf6..ac29dfe08f 100644 --- a/commons/zenoh-shm/src/cleanup.rs +++ b/commons/zenoh-shm/src/cleanup.rs @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // +use signal_hook::consts::signal::*; use static_init::dynamic; /// A global cleanup, that is guaranteed to be dropped at normal program exit and that 
will @@ -26,11 +27,36 @@ pub(crate) struct Cleanup { impl Cleanup { fn new() -> Self { + // todo: this is a workaround to make sure Cleanup will be executed even if the process is terminated by signal handlers + // that kill the process without calling exit() + for signal in [ + #[cfg(not(target_os = "windows"))] + SIGHUP, + SIGTERM, + SIGINT, + #[cfg(not(target_os = "windows"))] + SIGQUIT, + ] { + unsafe { + let _ = signal_hook::low_level::register(signal, || { + std::process::exit(0); + }); + } + } + Self { cleanups: Default::default(), } } + pub(crate) fn cleanup(&self) { + while let Some(cleanup) = self.cleanups.pop() { + if let Some(f) = cleanup { + f(); + } + } + } + pub(crate) fn register_cleanup(&self, cleanup_fn: Box) { self.cleanups.push(Some(cleanup_fn)); } @@ -38,10 +64,6 @@ impl Cleanup { impl Drop for Cleanup { fn drop(&mut self) { - while let Some(cleanup) = self.cleanups.pop() { - if let Some(f) = cleanup { - f(); - } - } + self.cleanup(); } } diff --git a/commons/zenoh-shm/tests/watchdog.rs b/commons/zenoh-shm/tests/watchdog.rs index bc4a75dfa9..58633a9d4e 100644 --- a/commons/zenoh-shm/tests/watchdog.rs +++ b/commons/zenoh-shm/tests/watchdog.rs @@ -62,14 +62,14 @@ fn watchdog_confirmed_fn() -> impl Fn(usize, usize) -> ZResult<()> + Clone + Sen } } -#[test] #[ignore] +#[test] fn watchdog_confirmed() { execute_concurrent(1, 10, watchdog_confirmed_fn()); } -#[test] #[ignore] +#[test] fn watchdog_confirmed_concurrent() { execute_concurrent(1000, 10, watchdog_confirmed_fn()); } @@ -77,8 +77,8 @@ fn watchdog_confirmed_concurrent() { // TODO: confirmation to dangling watchdog actually writes to potentially-existing // other watchdog instance from other test running in the same process and changes it's behaviour, // so we cannot run dangling test in parallel with anything else -#[test] #[ignore] +#[test] fn watchdog_confirmed_dangling() { let allocated = GLOBAL_STORAGE .read() @@ -136,14 +136,14 @@ fn watchdog_validated_fn() -> impl Fn(usize, usize) -> 
ZResult<()> + Clone + Sen } } -#[test] #[ignore] +#[test] fn watchdog_validated() { execute_concurrent(1, 10, watchdog_validated_fn()); } -#[test] #[ignore] +#[test] fn watchdog_validated_concurrent() { execute_concurrent(1000, 10, watchdog_validated_fn()); } @@ -176,14 +176,14 @@ fn watchdog_validated_invalid_without_confirmator_fn( } } -#[test] #[ignore] +#[test] fn watchdog_validated_invalid_without_confirmator() { execute_concurrent(1, 10, watchdog_validated_invalid_without_confirmator_fn()); } -#[test] #[ignore] +#[test] fn watchdog_validated_invalid_without_confirmator_concurrent() { execute_concurrent( 1000, @@ -241,14 +241,14 @@ fn watchdog_validated_additional_confirmation_fn( } } -#[test] #[ignore] +#[test] fn watchdog_validated_additional_confirmation() { execute_concurrent(1, 10, watchdog_validated_additional_confirmation_fn()); } -#[test] #[ignore] +#[test] fn watchdog_validated_additional_confirmation_concurrent() { execute_concurrent(1000, 10, watchdog_validated_additional_confirmation_fn()); } @@ -296,22 +296,22 @@ fn watchdog_validated_overloaded_system_fn( } } -#[test] #[ignore] +#[test] fn watchdog_validated_low_load() { let _load = CpuLoad::low(); execute_concurrent(1000, 10, watchdog_validated_overloaded_system_fn()); } -#[test] #[ignore] +#[test] fn watchdog_validated_high_load() { let _load = CpuLoad::optimal_high(); execute_concurrent(1000, 10, watchdog_validated_overloaded_system_fn()); } -#[test] #[ignore] +#[test] fn watchdog_validated_overloaded_system() { let _load = CpuLoad::excessive(); execute_concurrent(1000, 10, watchdog_validated_overloaded_system_fn());