diff --git a/Cargo.lock b/Cargo.lock index 51400ce6f..b6c5bda4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -164,7 +164,7 @@ dependencies = [ "async-task", "concurrent-queue", "fastrand 2.1.1", - "futures-lite 2.3.0", + "futures-lite 2.5.0", "slab", ] @@ -176,10 +176,10 @@ checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" dependencies = [ "async-channel 2.3.1", "async-executor", - "async-io 2.3.4", + "async-io 2.4.0", "async-lock 3.4.0", "blocking", - "futures-lite 2.3.0", + "futures-lite 2.5.0", "once_cell", ] @@ -205,17 +205,17 @@ dependencies = [ [[package]] name = "async-io" -version = "2.3.4" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "444b0228950ee6501b3568d3c93bf1176a1fdbc3b758dcd9475046d30f4dc7e8" +checksum = "43a2b323ccce0a1d90b449fd71f2a06ca7faa7c54c2751f06c9bd851fc061059" dependencies = [ "async-lock 3.4.0", "cfg-if", "concurrent-queue", "futures-io", - "futures-lite 2.3.0", + "futures-lite 2.5.0", "parking", - "polling 3.7.3", + "polling 3.7.4", "rustix 0.38.37", "slab", "tracing", @@ -383,7 +383,7 @@ dependencies = [ "async-channel 2.3.1", "async-task", "futures-io", - "futures-lite 2.3.0", + "futures-lite 2.5.0", "piper", ] @@ -923,9 +923,9 @@ dependencies = [ [[package]] name = "futures-lite" -version = "2.3.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5" +checksum = "cef40d21ae2c515b51041df9ed313ed21e572df340ea58a922a0aefe7e8891a1" dependencies = [ "fastrand 2.1.1", "futures-core", @@ -1957,9 +1957,9 @@ dependencies = [ [[package]] name = "polling" -version = "3.7.3" +version = "3.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc2790cd301dec6cd3b7a025e4815cf825724a51c98dccfe6a3e55f05ffb6511" +checksum = "a604568c3202727d1507653cb121dbd627a58684eb09a820fd746bee38b4442f" dependencies = [ "cfg-if", "concurrent-queue", @@ -3302,9 +3302,9 @@ checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" [[package]] name = "value-bag" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a84c137d37ab0142f0f2ddfe332651fdbf252e7b7dbb4e67b6c1f1b2e925101" +checksum = "3ef4c4aa54d5d05a279399bfa921ec387b7aba77caf7a682ae8d86785b8fdad2" [[package]] name = "vec_map" @@ -3718,7 +3718,7 @@ dependencies = [ [[package]] name = "zenoh" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "ahash", "async-trait", @@ -3765,7 +3765,7 @@ dependencies = [ [[package]] name = "zenoh-buffers" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "zenoh-collections", ] @@ -3790,6 +3790,7 @@ dependencies = [ "regex", "serde_yaml", "spin", + "tokio", "tracing", "unwrap-infallible", "zenoh", @@ -3801,7 +3802,7 @@ dependencies = [ [[package]] name = "zenoh-codec" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "tracing", "uhlc", @@ -3813,12 +3814,12 @@ dependencies = [ [[package]] name = "zenoh-collections" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" [[package]] name = "zenoh-config" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "json5", "num_cpus", @@ -3839,7 +3840,7 @@ dependencies = [ [[package]] name = "zenoh-core" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "lazy_static", "tokio", @@ -3850,7 +3851,7 @@ dependencies = [ [[package]] name = "zenoh-crypto" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "aes", "hmac", @@ -3863,7 +3864,7 @@ dependencies = [ [[package]] name = "zenoh-ext" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "bincode", "flume", @@ -3880,7 +3881,7 @@ dependencies = [ [[package]] name = "zenoh-keyexpr" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "hashbrown 0.14.5", "keyed-set", @@ -3894,7 +3895,7 @@ dependencies = [ [[package]] name = "zenoh-link" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "zenoh-config", "zenoh-link-commons", @@ -3914,7 +3915,7 @@ dependencies = [ [[package]] name = "zenoh-link-commons" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "async-trait", "flume", @@ -3938,7 +3939,7 @@ dependencies = [ [[package]] name = "zenoh-link-quic" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "async-trait", "base64 0.22.1", @@ -3964,7 +3965,7 @@ dependencies = [ [[package]] name = "zenoh-link-serial" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "async-trait", "tokio", @@ -3982,7 +3983,7 @@ dependencies = [ [[package]] name = "zenoh-link-tcp" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "async-trait", "socket2 0.5.7", @@ -3999,7 +4000,7 @@ dependencies = [ [[package]] name = "zenoh-link-tls" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "async-trait", "base64 0.22.1", @@ -4028,7 +4029,7 @@ dependencies = [ [[package]] name = "zenoh-link-udp" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "async-trait", "socket2 0.5.7", @@ -4047,7 +4048,7 @@ dependencies = [ [[package]] name = "zenoh-link-unixpipe" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "advisory-lock", "async-trait", @@ -4069,7 +4070,7 @@ dependencies = [ [[package]] name = "zenoh-link-unixsock_stream" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "async-trait", "nix 0.29.0", @@ -4087,7 +4088,7 @@ dependencies = [ [[package]] name = "zenoh-link-vsock" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "async-trait", "libc", @@ -4105,7 +4106,7 @@ dependencies = [ [[package]] name = "zenoh-link-ws" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "async-trait", "futures-util", @@ -4125,7 +4126,7 @@ dependencies = [ [[package]] name = "zenoh-macros" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "proc-macro2", "quote", @@ -4136,7 +4137,7 @@ dependencies = [ [[package]] name = "zenoh-plugin-trait" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "git-version", "libloading", @@ -4152,7 +4153,7 @@ dependencies = [ [[package]] name = "zenoh-protocol" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "const_format", "rand", @@ -4166,7 +4167,7 @@ dependencies = [ [[package]] name = "zenoh-result" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "anyhow", ] @@ -4174,7 +4175,7 @@ dependencies = [ [[package]] name = "zenoh-runtime" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "lazy_static", "ron", @@ -4187,7 +4188,7 @@ dependencies = [ [[package]] name = "zenoh-shm" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "advisory-lock", "async-trait", @@ -4210,7 +4211,7 @@ dependencies = [ [[package]] name = "zenoh-sync" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "event-listener 5.3.1", "futures", @@ -4223,7 +4224,7 @@ dependencies = [ [[package]] name = "zenoh-task" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "futures", "tokio", @@ -4236,7 +4237,7 @@ dependencies = [ [[package]] name = "zenoh-transport" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "async-trait", "crossbeam-utils", @@ -4270,7 +4271,7 @@ dependencies = [ [[package]] name = "zenoh-util" version = "1.0.0-dev" -source = "git+https://github.com/eclipse-zenoh/zenoh.git?branch=main#9a73585f60039a469c8230193c4792d43116f663" +source = "git+https://github.com/ZettaScaleLabs/zenoh.git?branch=close_builder#f0f378e7e20b10564fa8b696d6f5f7fa28e969d9" dependencies = [ "async-trait", "const_format", diff --git a/Cargo.toml b/Cargo.toml index ac8cff592..452c80b0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,12 +71,13 @@ libc = "0.2.139" tracing = "0.1" rand = "0.8.5" spin = "0.9.5" +tokio = "*" unwrap-infallible = "0.1.5" const_format = "0.2.32" -zenoh = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", default-features = false, features = ["internal"] } -zenoh-ext = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", features=["internal"], branch = "main" } -zenoh-runtime = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main" } -zenoh-util = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main" } +zenoh = { version = "1.0.0-dev", git = "https://github.com/ZettaScaleLabs/zenoh.git", branch = "close_builder", default-features = false, features = ["internal"] } +zenoh-ext = { version = "1.0.0-dev", git = "https://github.com/ZettaScaleLabs/zenoh.git", features=["internal"], branch = "close_builder" } +zenoh-runtime = { version = "1.0.0-dev", git = "https://github.com/ZettaScaleLabs/zenoh.git", branch = "close_builder" } +zenoh-util = { version = "1.0.0-dev", git = "https://github.com/ZettaScaleLabs/zenoh.git", branch = "close_builder" } flume = "*" [build-dependencies] diff --git a/Cargo.toml.in b/Cargo.toml.in index 12bc3b3a6..b975f1bab 100644 --- a/Cargo.toml.in +++ b/Cargo.toml.in @@ -71,12 +71,13 @@ libc = "0.2.139" tracing = "0.1" rand = "0.8.5" spin = "0.9.5" +tokio = "*" unwrap-infallible = "0.1.5" const_format = "0.2.32" -zenoh = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", default-features = false, features = ["internal"] } -zenoh-ext = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", features=["internal"], branch = "main" } -zenoh-runtime = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main" } -zenoh-util = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main" } +zenoh = { version = "1.0.0-dev", git = "https://github.com/ZettaScaleLabs/zenoh.git", branch = "close_builder", default-features = false, features = ["internal"] } +zenoh-ext = { version = "1.0.0-dev", git = "https://github.com/ZettaScaleLabs/zenoh.git", features=["internal"], branch = "close_builder" } +zenoh-runtime = { version = "1.0.0-dev", git = "https://github.com/ZettaScaleLabs/zenoh.git", branch = "close_builder" } +zenoh-util = { version = "1.0.0-dev", git = "https://github.com/ZettaScaleLabs/zenoh.git", branch = "close_builder" } flume = "*" [build-dependencies] diff --git a/build-resources/opaque-types/Cargo.lock b/build-resources/opaque-types/Cargo.lock index 5d16236f4..52395c560 100644 --- a/build-resources/opaque-types/Cargo.lock +++ b/build-resources/opaque-types/Cargo.lock @@ -1248,6 +1248,7 @@ version = "0.1.0" dependencies = [ "const_format", "flume", + "tokio", "zenoh", "zenoh-ext", "zenoh-protocol", diff --git a/build-resources/opaque-types/Cargo.toml b/build-resources/opaque-types/Cargo.toml index 1b70b8d2f..7ccb57ed0 100644 --- a/build-resources/opaque-types/Cargo.toml +++ b/build-resources/opaque-types/Cargo.toml @@ -35,3 +35,4 @@ zenoh-ext = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zen zenoh-protocol = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main" } const_format = "0.2.32" flume = "*" +tokio = "*" diff --git a/build-resources/opaque-types/src/lib.rs b/build-resources/opaque-types/src/lib.rs index d41d0689c..e5f958675 100644 --- a/build-resources/opaque-types/src/lib.rs +++ b/build-resources/opaque-types/src/lib.rs @@ -193,6 +193,10 @@ get_opaque_type_data!(Option, z_owned_session_t); /// A loaned Zenoh session. get_opaque_type_data!(Session, z_loaned_session_t); +#[cfg(feature = "unstable")] +/// An owned Close handle +get_opaque_type_data!(Option>>, zc_owned_concurrent_close_handle_t); + /// An owned Zenoh configuration. get_opaque_type_data!(Option, z_owned_config_t); /// A loaned Zenoh configuration. diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index 41de044ba..e77821ef4 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -279,7 +279,23 @@ typedef struct z_clock_t { * Options passed to the `z_close()` function. */ typedef struct z_close_options_t { +#if defined(Z_FEATURE_UNSTABLE_API) + /** + * The timeout for close operation in milliseconds. 0 means default close timeout which is 10 seconds. + */ + uint32_t internal_timeout_ms; +#endif +#if defined(Z_FEATURE_UNSTABLE_API) + /** + * An optional uninitialized concurrent close handle. If set, the close operation will be executed + * concurrently in separate task, and this handle will be initialized to be used for controlling + * it's execution. + */ + struct zc_owned_concurrent_close_handle_t *internal_out_concurrent; +#endif +#if !defined(Z_FEATURE_UNSTABLE_API) uint8_t _dummy; +#endif } z_close_options_t; /** * @brief A hello message-processing closure. @@ -977,6 +993,9 @@ typedef struct zc_moved_closure_matching_status_t { struct zc_owned_closure_matching_status_t _this; } zc_moved_closure_matching_status_t; #endif +typedef struct zc_moved_concurrent_close_handle_t { + struct zc_owned_concurrent_close_handle_t _this; +} zc_moved_concurrent_close_handle_t; typedef struct zc_moved_matching_listener_t { struct zc_owned_matching_listener_t _this; } zc_moved_matching_listener_t; @@ -1509,7 +1528,7 @@ ZENOHC_API struct z_clock_t z_clock_now(void); */ ZENOHC_API z_result_t z_close(struct z_loaned_session_t *session, - const struct z_close_options_t *_options); + struct z_close_options_t *options); /** * Constructs the default value for `z_close_options_t`. */ @@ -4839,6 +4858,19 @@ void zc_closure_matching_status_drop(struct zc_moved_closure_matching_status_t * ZENOHC_API const struct zc_loaned_closure_matching_status_t *zc_closure_matching_status_loan(const struct zc_owned_closure_matching_status_t *closure); #endif +/** + * @brief Drops the close handle. The concurrent close task will not be interrupted. + */ +#if defined(Z_FEATURE_UNSTABLE_API) +ZENOHC_API void zc_concurrent_close_handle_drop(struct zc_moved_concurrent_close_handle_t *this_); +#endif +/** + * @brief Blocking wait on close handle to complete. Returns `Z_EIO` if close finishes with error. + */ +#if defined(Z_FEATURE_UNSTABLE_API) +ZENOHC_API +z_result_t zc_concurrent_close_handle_wait(struct zc_moved_concurrent_close_handle_t *handle); +#endif /** * Constructs a configuration by parsing a file path stored in ZENOH_CONFIG environmental variable. * @@ -4949,6 +4981,20 @@ bool zc_internal_closure_matching_status_check(const struct zc_owned_closure_mat ZENOHC_API void zc_internal_closure_matching_status_null(struct zc_owned_closure_matching_status_t *this_); #endif +/** + * @brief Returns ``true`` if concurrent close handle is valid, ``false`` if it is in gravestone state. + */ +#if defined(Z_FEATURE_UNSTABLE_API) +ZENOHC_API +bool zc_internal_concurrent_close_handle_check(const struct zc_owned_concurrent_close_handle_t *this_); +#endif +/** + * @brief Constructs concurrent close handle in its gravestone state. + */ +#if defined(Z_FEATURE_UNSTABLE_API) +ZENOHC_API +void zc_internal_concurrent_close_handle_null(struct zc_owned_concurrent_close_handle_t *this_); +#endif /** * @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release. * @brief Checks the matching listener is for the gravestone state diff --git a/include/zenoh_macros.h b/include/zenoh_macros.h index f3472437e..62b5462a0 100644 --- a/include/zenoh_macros.h +++ b/include/zenoh_macros.h @@ -48,6 +48,7 @@ static inline z_moved_subscriber_t* z_subscriber_move(z_owned_subscriber_t* x) { static inline z_moved_task_t* z_task_move(z_owned_task_t* x) { return (z_moved_task_t*)(x); } static inline zc_moved_closure_log_t* zc_closure_log_move(zc_owned_closure_log_t* x) { return (zc_moved_closure_log_t*)(x); } static inline zc_moved_closure_matching_status_t* zc_closure_matching_status_move(zc_owned_closure_matching_status_t* x) { return (zc_moved_closure_matching_status_t*)(x); } +static inline zc_moved_concurrent_close_handle_t* zc_concurrent_close_handle_move(zc_owned_concurrent_close_handle_t* x) { return (zc_moved_concurrent_close_handle_t*)(x); } static inline zc_moved_matching_listener_t* zc_matching_listener_move(zc_owned_matching_listener_t* x) { return (zc_moved_matching_listener_t*)(x); } static inline zc_moved_shm_client_list_t* zc_shm_client_list_move(zc_owned_shm_client_list_t* x) { return (zc_moved_shm_client_list_t*)(x); } static inline ze_moved_publication_cache_t* ze_publication_cache_move(ze_owned_publication_cache_t* x) { return (ze_moved_publication_cache_t*)(x); } @@ -174,6 +175,7 @@ static inline ze_moved_serializer_t* ze_serializer_move(ze_owned_serializer_t* x z_moved_task_t* : z_task_drop, \ zc_moved_closure_log_t* : zc_closure_log_drop, \ zc_moved_closure_matching_status_t* : zc_closure_matching_status_drop, \ + zc_moved_concurrent_close_handle_t* : zc_concurrent_close_handle_drop, \ zc_moved_matching_listener_t* : zc_matching_listener_drop, \ zc_moved_shm_client_list_t* : zc_shm_client_list_drop, \ ze_moved_publication_cache_t* : ze_publication_cache_drop, \ @@ -227,6 +229,7 @@ static inline ze_moved_serializer_t* ze_serializer_move(ze_owned_serializer_t* x z_owned_task_t : z_task_move, \ zc_owned_closure_log_t : zc_closure_log_move, \ zc_owned_closure_matching_status_t : zc_closure_matching_status_move, \ + zc_owned_concurrent_close_handle_t : zc_concurrent_close_handle_move, \ zc_owned_matching_listener_t : zc_matching_listener_move, \ zc_owned_shm_client_list_t : zc_shm_client_list_move, \ ze_owned_publication_cache_t : ze_publication_cache_move, \ @@ -280,6 +283,7 @@ static inline ze_moved_serializer_t* ze_serializer_move(ze_owned_serializer_t* x z_owned_task_t* : z_internal_task_null, \ zc_owned_closure_log_t* : zc_internal_closure_log_null, \ zc_owned_closure_matching_status_t* : zc_internal_closure_matching_status_null, \ + zc_owned_concurrent_close_handle_t* : zc_internal_concurrent_close_handle_null, \ zc_owned_matching_listener_t* : zc_internal_matching_listener_null, \ zc_owned_shm_client_list_t* : zc_internal_shm_client_list_null, \ ze_owned_publication_cache_t* : ze_internal_publication_cache_null, \ @@ -331,6 +335,7 @@ static inline void z_subscriber_take(z_owned_subscriber_t* this_, z_moved_subscr static inline void z_task_take(z_owned_task_t* this_, z_moved_task_t* x) { *this_ = x->_this; z_internal_task_null(&x->_this); } static inline void zc_closure_log_take(zc_owned_closure_log_t* closure_, zc_moved_closure_log_t* x) { *closure_ = x->_this; zc_internal_closure_log_null(&x->_this); } static inline void zc_closure_matching_status_take(zc_owned_closure_matching_status_t* closure_, zc_moved_closure_matching_status_t* x) { *closure_ = x->_this; zc_internal_closure_matching_status_null(&x->_this); } +static inline void zc_concurrent_close_handle_take(zc_owned_concurrent_close_handle_t* this_, zc_moved_concurrent_close_handle_t* x) { *this_ = x->_this; zc_internal_concurrent_close_handle_null(&x->_this); } static inline void zc_matching_listener_take(zc_owned_matching_listener_t* this_, zc_moved_matching_listener_t* x) { *this_ = x->_this; zc_internal_matching_listener_null(&x->_this); } static inline void zc_shm_client_list_take(zc_owned_shm_client_list_t* this_, zc_moved_shm_client_list_t* x) { *this_ = x->_this; zc_internal_shm_client_list_null(&x->_this); } static inline void ze_publication_cache_take(ze_owned_publication_cache_t* this_, ze_moved_publication_cache_t* x) { *this_ = x->_this; ze_internal_publication_cache_null(&x->_this); } @@ -384,6 +389,7 @@ static inline void ze_serializer_take(ze_owned_serializer_t* this_, ze_moved_ser z_owned_task_t* : z_task_take, \ zc_owned_closure_log_t* : zc_closure_log_take, \ zc_owned_closure_matching_status_t* : zc_closure_matching_status_take, \ + zc_owned_concurrent_close_handle_t* : zc_concurrent_close_handle_take, \ zc_owned_matching_listener_t* : zc_matching_listener_take, \ zc_owned_shm_client_list_t* : zc_shm_client_list_take, \ ze_owned_publication_cache_t* : ze_publication_cache_take, \ @@ -437,6 +443,7 @@ static inline void ze_serializer_take(ze_owned_serializer_t* this_, ze_moved_ser z_owned_task_t : z_internal_task_check, \ zc_owned_closure_log_t : zc_internal_closure_log_check, \ zc_owned_closure_matching_status_t : zc_internal_closure_matching_status_check, \ + zc_owned_concurrent_close_handle_t : zc_internal_concurrent_close_handle_check, \ zc_owned_matching_listener_t : zc_internal_matching_listener_check, \ zc_owned_shm_client_list_t : zc_internal_shm_client_list_check, \ ze_owned_publication_cache_t : ze_internal_publication_cache_check, \ @@ -558,6 +565,7 @@ static inline z_moved_subscriber_t* z_subscriber_move(z_owned_subscriber_t* x) { static inline z_moved_task_t* z_task_move(z_owned_task_t* x) { return reinterpret_cast(x); } static inline zc_moved_closure_log_t* zc_closure_log_move(zc_owned_closure_log_t* x) { return reinterpret_cast(x); } static inline zc_moved_closure_matching_status_t* zc_closure_matching_status_move(zc_owned_closure_matching_status_t* x) { return reinterpret_cast(x); } +static inline zc_moved_concurrent_close_handle_t* zc_concurrent_close_handle_move(zc_owned_concurrent_close_handle_t* x) { return reinterpret_cast(x); } static inline zc_moved_matching_listener_t* zc_matching_listener_move(zc_owned_matching_listener_t* x) { return reinterpret_cast(x); } static inline zc_moved_shm_client_list_t* zc_shm_client_list_move(zc_owned_shm_client_list_t* x) { return reinterpret_cast(x); } static inline ze_moved_publication_cache_t* ze_publication_cache_move(ze_owned_publication_cache_t* x) { return reinterpret_cast(x); } @@ -679,6 +687,7 @@ inline void z_drop(z_moved_subscriber_t* this_) { z_subscriber_drop(this_); }; inline void z_drop(z_moved_task_t* this_) { z_task_drop(this_); }; inline void z_drop(zc_moved_closure_log_t* closure_) { zc_closure_log_drop(closure_); }; inline void z_drop(zc_moved_closure_matching_status_t* closure_) { zc_closure_matching_status_drop(closure_); }; +inline void z_drop(zc_moved_concurrent_close_handle_t* this_) { zc_concurrent_close_handle_drop(this_); }; inline void z_drop(zc_moved_matching_listener_t* this_) { zc_matching_listener_drop(this_); }; inline void z_drop(zc_moved_shm_client_list_t* this_) { zc_shm_client_list_drop(this_); }; inline void z_drop(ze_moved_publication_cache_t* this_) { ze_publication_cache_drop(this_); }; @@ -730,6 +739,7 @@ inline z_moved_subscriber_t* z_move(z_owned_subscriber_t& this_) { return z_subs inline z_moved_task_t* z_move(z_owned_task_t& this_) { return z_task_move(&this_); }; inline zc_moved_closure_log_t* z_move(zc_owned_closure_log_t& closure_) { return zc_closure_log_move(&closure_); }; inline zc_moved_closure_matching_status_t* z_move(zc_owned_closure_matching_status_t& closure_) { return zc_closure_matching_status_move(&closure_); }; +inline zc_moved_concurrent_close_handle_t* z_move(zc_owned_concurrent_close_handle_t& this_) { return zc_concurrent_close_handle_move(&this_); }; inline zc_moved_matching_listener_t* z_move(zc_owned_matching_listener_t& this_) { return zc_matching_listener_move(&this_); }; inline zc_moved_shm_client_list_t* z_move(zc_owned_shm_client_list_t& this_) { return zc_shm_client_list_move(&this_); }; inline ze_moved_publication_cache_t* z_move(ze_owned_publication_cache_t& this_) { return ze_publication_cache_move(&this_); }; @@ -781,6 +791,7 @@ inline void z_internal_null(z_owned_subscriber_t* this_) { z_internal_subscriber inline void z_internal_null(z_owned_task_t* this_) { z_internal_task_null(this_); }; inline void z_internal_null(zc_owned_closure_log_t* this_) { zc_internal_closure_log_null(this_); }; inline void z_internal_null(zc_owned_closure_matching_status_t* this_) { zc_internal_closure_matching_status_null(this_); }; +inline void z_internal_null(zc_owned_concurrent_close_handle_t* this_) { zc_internal_concurrent_close_handle_null(this_); }; inline void z_internal_null(zc_owned_matching_listener_t* this_) { zc_internal_matching_listener_null(this_); }; inline void z_internal_null(zc_owned_shm_client_list_t* this_) { zc_internal_shm_client_list_null(this_); }; inline void z_internal_null(ze_owned_publication_cache_t* this_) { ze_internal_publication_cache_null(this_); }; @@ -831,6 +842,7 @@ static inline void z_subscriber_take(z_owned_subscriber_t* this_, z_moved_subscr static inline void z_task_take(z_owned_task_t* this_, z_moved_task_t* x) { *this_ = x->_this; z_internal_task_null(&x->_this); } static inline void zc_closure_log_take(zc_owned_closure_log_t* closure_, zc_moved_closure_log_t* x) { *closure_ = x->_this; zc_internal_closure_log_null(&x->_this); } static inline void zc_closure_matching_status_take(zc_owned_closure_matching_status_t* closure_, zc_moved_closure_matching_status_t* x) { *closure_ = x->_this; zc_internal_closure_matching_status_null(&x->_this); } +static inline void zc_concurrent_close_handle_take(zc_owned_concurrent_close_handle_t* this_, zc_moved_concurrent_close_handle_t* x) { *this_ = x->_this; zc_internal_concurrent_close_handle_null(&x->_this); } static inline void zc_matching_listener_take(zc_owned_matching_listener_t* this_, zc_moved_matching_listener_t* x) { *this_ = x->_this; zc_internal_matching_listener_null(&x->_this); } static inline void zc_shm_client_list_take(zc_owned_shm_client_list_t* this_, zc_moved_shm_client_list_t* x) { *this_ = x->_this; zc_internal_shm_client_list_null(&x->_this); } static inline void ze_publication_cache_take(ze_owned_publication_cache_t* this_, ze_moved_publication_cache_t* x) { *this_ = x->_this; ze_internal_publication_cache_null(&x->_this); } @@ -971,6 +983,9 @@ inline void z_take(zc_owned_closure_log_t* closure_, zc_moved_closure_log_t* x) inline void z_take(zc_owned_closure_matching_status_t* closure_, zc_moved_closure_matching_status_t* x) { zc_closure_matching_status_take(closure_, x); }; +inline void z_take(zc_owned_concurrent_close_handle_t* this_, zc_moved_concurrent_close_handle_t* x) { + zc_concurrent_close_handle_take(this_, x); +}; inline void z_take(zc_owned_matching_listener_t* this_, zc_moved_matching_listener_t* x) { zc_matching_listener_take(this_, x); }; @@ -1032,6 +1047,7 @@ inline bool z_internal_check(const z_owned_subscriber_t& this_) { return z_inter inline bool z_internal_check(const z_owned_task_t& this_) { return z_internal_task_check(&this_); }; inline bool z_internal_check(const zc_owned_closure_log_t& this_) { return zc_internal_closure_log_check(&this_); }; inline bool z_internal_check(const zc_owned_closure_matching_status_t& this_) { return zc_internal_closure_matching_status_check(&this_); }; +inline bool z_internal_check(const zc_owned_concurrent_close_handle_t& this_) { return zc_internal_concurrent_close_handle_check(&this_); }; inline bool z_internal_check(const zc_owned_matching_listener_t& this_) { return zc_internal_matching_listener_check(&this_); }; inline bool z_internal_check(const zc_owned_shm_client_list_t& this_) { return zc_internal_shm_client_list_check(&this_); }; inline bool z_internal_check(const ze_owned_publication_cache_t& this_) { return ze_internal_publication_cache_check(&this_); }; diff --git a/splitguide.yaml b/splitguide.yaml index 55783677a..8ba0bf0d5 100644 --- a/splitguide.yaml +++ b/splitguide.yaml @@ -119,4 +119,5 @@ zenoh_opaque.h: - z_loaned_fifo_handler_reply_t! - z_owned_ring_handler_reply_t! - z_loaned_ring_handler_reply_t! + - zc_owned_concurrent_close_handle_t!#unstable diff --git a/src/close.rs b/src/close.rs new file mode 100644 index 000000000..de12254ac --- /dev/null +++ b/src/close.rs @@ -0,0 +1,67 @@ +// +// Copyright (c) 2017, 2022 ZettaScale Technology. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh team, +// + +use std::mem::MaybeUninit; + +use zenoh_runtime::ZRuntime; + +#[cfg(feature = "unstable")] +use crate::opaque_types::zc_owned_concurrent_close_handle_t; +use crate::{ + result::{z_result_t, Z_EIO, Z_OK}, + transmute::{RustTypeRef, RustTypeRefUninit, TakeRustType}, + zc_moved_concurrent_close_handle_t, +}; + +#[cfg(feature = "unstable")] +decl_c_type!( + owned(zc_owned_concurrent_close_handle_t, option tokio::task::JoinHandle>), +); + +/// @brief Blocking wait on close handle to complete. Returns `Z_EIO` if close finishes with error. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn zc_concurrent_close_handle_wait( + handle: &mut zc_moved_concurrent_close_handle_t, +) -> z_result_t { + match ZRuntime::Application.block_on(handle.take_rust_type().unwrap_unchecked()) { + Ok(_) => Z_OK, + Err(e) => { + tracing::error!("Close error: {}", e); + Z_EIO + } + } +} + +/// @brief Drops the close handle. The concurrent close task will not be interrupted. +#[no_mangle] +pub extern "C" fn zc_concurrent_close_handle_drop(this_: &mut zc_moved_concurrent_close_handle_t) { + let _ = this_.take_rust_type(); +} + +/// @brief Returns ``true`` if concurrent close handle is valid, ``false`` if it is in gravestone state. +#[no_mangle] +pub extern "C" fn zc_internal_concurrent_close_handle_check( + this_: &zc_owned_concurrent_close_handle_t, +) -> bool { + this_.as_rust_type_ref().is_some() +} + +/// @brief Constructs concurrent close handle in its gravestone state. +#[no_mangle] +pub extern "C" fn zc_internal_concurrent_close_handle_null( + this_: &mut MaybeUninit, +) { + this_.as_rust_type_mut_uninit().write(None); +} diff --git a/src/lib.rs b/src/lib.rs index 17740aa4a..e4a117bd2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,6 +29,10 @@ pub mod result; pub use crate::collections::*; mod config; pub use crate::config::*; +#[cfg(feature = "unstable")] +mod close; +#[cfg(feature = "unstable")] +pub use crate::close::*; pub mod encoding; pub use crate::encoding::*; mod commons; diff --git a/src/session.rs b/src/session.rs index eb0421d3f..879adc9b6 100644 --- a/src/session.rs +++ b/src/session.rs @@ -18,6 +18,8 @@ use zenoh::{Session, Wait}; #[cfg(all(feature = "shared-memory", feature = "unstable"))] use crate::z_loaned_shm_client_storage_t; +#[cfg(feature = "unstable")] +use crate::zc_owned_concurrent_close_handle_t; use crate::{ opaque_types::{z_loaned_session_t, z_owned_session_t}, result, @@ -145,13 +147,34 @@ pub extern "C" fn z_internal_session_check(this_: &z_owned_session_t) -> bool { /// Options passed to the `z_close()` function. #[repr(C)] pub struct z_close_options_t { + #[cfg(feature = "unstable")] + #[doc(hidden)] + /// The timeout for close operation in milliseconds. 0 means default close timeout which is 10 seconds. + internal_timeout_ms: u32, + + #[cfg(feature = "unstable")] + #[doc(hidden)] + /// An optional uninitialized concurrent close handle. If set, the close operation will be executed + /// concurrently in separate task, and this handle will be initialized to be used for controlling + /// it's execution. + internal_out_concurrent: Option<&'static mut MaybeUninit>, + + #[cfg(not(feature = "unstable"))] _dummy: u8, } /// Constructs the default value for `z_close_options_t`. #[no_mangle] +#[allow(unused)] pub extern "C" fn z_close_options_default(this_: &mut MaybeUninit) { - this_.write(z_close_options_t { _dummy: 0 }); + this_.write(z_close_options_t { + #[cfg(feature = "unstable")] + internal_timeout_ms: 0, + #[cfg(feature = "unstable")] + internal_out_concurrent: None, + #[cfg(not(feature = "unstable"))] + _dummy: 0, + }); } /// Closes zenoh session. This also drops all the closure callbacks remaining from dropped, but not undeclared subscribers. @@ -160,10 +183,27 @@ pub extern "C" fn z_close_options_default(this_: &mut MaybeUninit, + #[allow(unused)] options: Option<&mut z_close_options_t>, ) -> result::z_result_t { - let s = session.as_rust_type_mut(); - match s.close().wait() { + #[allow(unused_mut)] + let mut close_builder = session.as_rust_type_mut().close(); + + #[cfg(feature = "unstable")] + if let Some(options) = options { + if options.internal_timeout_ms != 0 { + close_builder = close_builder.timeout(core::time::Duration::from_millis( + options.internal_timeout_ms as u64, + )) + } + + if let Some(close_handle) = &mut options.internal_out_concurrent { + let handle = close_builder.in_background().wait(); + close_handle.as_rust_type_mut_uninit().write(Some(handle)); + return result::Z_OK; + } + } + + match close_builder.wait() { Err(e) => { tracing::error!("Error closing session: {}", e); result::Z_EGENERIC diff --git a/tests/z_api_session_test.c b/tests/z_api_session_test.c new file mode 100644 index 000000000..64cacc046 --- /dev/null +++ b/tests/z_api_session_test.c @@ -0,0 +1,91 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +#include +#include + +#include "zenoh.h" + +#undef NDEBUG +#include + +void close_drop() { + z_owned_config_t config; + z_config_default(&config); + + z_owned_session_t s; + if (z_open(&s, z_move(config), NULL) < 0) { + perror("Unable to open session!"); + exit(-1); + } + + z_drop(z_move(s)); +} + +void close_sync() { + z_owned_config_t config; + z_config_default(&config); + + z_owned_session_t s; + if (z_open(&s, z_move(config), NULL) < 0) { + perror("Unable to open session!"); + exit(-1); + } + + z_close_options_t options; + z_close_options_default(&options); + + if (z_close(z_loan_mut(s), &options) < 0) { + perror("Error closing session!"); + exit(-1); + } + + z_drop(z_move(s)); +} + +void close_concurrent() { +#if defined(Z_FEATURE_UNSTABLE_API) + z_owned_config_t config; + z_config_default(&config); + + z_owned_session_t s; + if (z_open(&s, z_move(config), NULL) < 0) { + perror("Unable to open session!"); + exit(-1); + } + + zc_owned_concurrent_close_handle_t close_handle; + + z_close_options_t options; + z_close_options_default(&options); + options.internal_out_concurrent = &close_handle; + + if (z_close(z_loan_mut(s), &options) < 0) { + perror("Error starting concurrent session close!"); + exit(-1); + } + + if (zc_concurrent_close_handle_wait(z_move(close_handle)) < 0) { + perror("Error closing session!"); + exit(-1); + } + + z_drop(z_move(s)); +#endif +} + +int main(int argc, char **argv) { + zc_try_init_log_from_env(); + close_drop(); + close_sync(); + close_concurrent(); +}