From 871b5e83335ecd61702d6f0dd62c08b5d65bcced Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 11 Apr 2024 21:36:45 +0200 Subject: [PATCH 1/6] Rework reply fifo channel --- Cargo.toml | 1 + Cargo.toml.in | 1 + examples/z_get.c | 4 +- examples/z_get_liveliness.c | 2 +- examples/z_non_blocking_get.c | 29 +++-- include/zenoh_commons.h | 88 ++++--------- include/zenoh_macros.h | 12 +- src/closures/mod.rs | 4 +- src/closures/reply_channel.rs | 64 ++++++++++ src/closures/response_channel.rs | 213 ------------------------------- 10 files changed, 110 insertions(+), 308 deletions(-) create mode 100644 src/closures/reply_channel.rs delete mode 100644 src/closures/response_channel.rs diff --git a/Cargo.toml b/Cargo.toml index 2735c3f86..288cf6823 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ libc = "0.2.139" log = "0.4.17" rand = "0.8.5" spin = "0.9.5" +crossbeam-channel = "0.5" # shared-memory enabled for zenoh even if zenoh-c "shared-memory" feature is disabled. This is to make "std::mem::transmute" work for `ZSLice` zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory", "unstable"], default-features = false } zenoh-protocol = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory"] } diff --git a/Cargo.toml.in b/Cargo.toml.in index f434df3a1..818752a39 100644 --- a/Cargo.toml.in +++ b/Cargo.toml.in @@ -50,6 +50,7 @@ libc = "0.2.139" log = "0.4.17" rand = "0.8.5" spin = "0.9.5" +crossbeam-channel = "0.5" # shared-memory enabled for zenoh even if zenoh-c "shared-memory" feature is disabled. This is to make "std::mem::transmute" work for `ZSLice` zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory", "unstable"], default-features = false } zenoh-protocol = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory"] } diff --git a/examples/z_get.c b/examples/z_get.c index 272bef89a..a47d23def 100644 --- a/examples/z_get.c +++ b/examples/z_get.c @@ -54,7 +54,7 @@ int main(int argc, char **argv) { } printf("Sending Query '%s'...\n", expr); - z_owned_reply_channel_t channel = zc_reply_fifo_new(16); + z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16); z_get_options_t opts = z_get_options_default(); if (value != NULL) { opts.value.payload = z_bytes_from_str(value); @@ -76,4 +76,4 @@ int main(int argc, char **argv) { z_drop(z_move(channel)); z_close(z_move(s)); return 0; -} \ No newline at end of file +} diff --git a/examples/z_get_liveliness.c b/examples/z_get_liveliness.c index c667cb03a..ba8893ab1 100644 --- a/examples/z_get_liveliness.c +++ b/examples/z_get_liveliness.c @@ -47,7 +47,7 @@ int main(int argc, char **argv) { } printf("Sending liveliness query '%s'...\n", expr); - z_owned_reply_channel_t channel = zc_reply_fifo_new(16); + z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16); zc_liveliness_get(z_loan(s), keyexpr, z_move(channel.send), NULL); z_owned_reply_t reply = z_reply_null(); for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) { diff --git a/examples/z_non_blocking_get.c b/examples/z_non_blocking_get.c index c0b02a274..43df578d5 100644 --- a/examples/z_non_blocking_get.c +++ b/examples/z_non_blocking_get.c @@ -47,26 +47,27 @@ int main(int argc, char **argv) { printf("Sending Query '%s'...\n", expr); z_get_options_t opts = z_get_options_default(); opts.target = Z_QUERY_TARGET_ALL; - z_owned_reply_channel_t channel = zc_reply_non_blocking_fifo_new(16); + z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16); z_get(z_loan(s), keyexpr, "", z_move(channel.send), &opts); // here, the send is moved and will be dropped by zenoh when adequate z_owned_reply_t reply = z_reply_null(); - for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply); - call_success = z_call(channel.recv, &reply)) { - if (!call_success) { - continue; - } - if (z_reply_is_ok(&reply)) { - z_sample_t sample = z_reply_ok(&reply); - z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); - printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start); - z_drop(z_move(keystr)); - } else { - printf("Received an error\n"); + while (true) { + for (z_call(channel.try_recv, &reply); z_check(reply); z_call(channel.try_recv, &reply)) { + if (z_reply_is_ok(&reply)) { + z_sample_t sample = z_reply_ok(&reply); + z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); + printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start); + z_drop(z_move(keystr)); + } else { + printf(">> Received an error\n"); + } } + printf(">> Nothing to get... sleep for 5 s\n"); + z_sleep_s(5); } + z_drop(z_move(reply)); z_drop(z_move(channel)); z_close(z_move(s)); return 0; -} \ No newline at end of file +} diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index 067170fd4..68eb424b6 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -778,31 +778,14 @@ typedef struct z_query_reply_options_t { struct z_encoding_t encoding; struct z_attachment_t attachment; } z_query_reply_options_t; -/** - * A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: - * - `this` is a pointer to an arbitrary state. - * - `call` is the typical callback function. `this` will be passed as its last argument. - * - `drop` allows the callback's state to be freed. - * - * Closures are not guaranteed not to be called concurrently. - * - * We guarantee that: - * - `call` will never be called once `drop` has started. - * - `drop` will only be called ONCE, and AFTER EVERY `call` has ended. - * - The two previous guarantees imply that `call` and `drop` are never called concurrently. - */ -typedef struct z_owned_reply_channel_closure_t { - void *context; - bool (*call)(struct z_owned_reply_t*, void*); - void (*drop)(void*); -} z_owned_reply_channel_closure_t; /** * A pair of closures, the `send` one accepting */ -typedef struct z_owned_reply_channel_t { +typedef struct z_owned_reply_fifo_channel_t { struct z_owned_closure_reply_t send; - struct z_owned_reply_channel_closure_t recv; -} z_owned_reply_channel_t; + struct z_owned_closure_reply_t recv; + struct z_owned_closure_reply_t try_recv; +} z_owned_reply_fifo_channel_t; typedef struct z_owned_scouting_config_t { struct z_owned_config_t _config; unsigned long zc_timeout_ms; @@ -1913,25 +1896,6 @@ ZENOHC_API uint16_t z_random_u16(void); ZENOHC_API uint32_t z_random_u32(void); ZENOHC_API uint64_t z_random_u64(void); ZENOHC_API uint8_t z_random_u8(void); -/** - * Calls the closure. Calling an uninitialized closure is a no-op. - */ -ZENOHC_API -bool z_reply_channel_closure_call(const struct z_owned_reply_channel_closure_t *closure, - struct z_owned_reply_t *sample); -/** - * Drops the closure. Droping an uninitialized closure is a no-op. - */ -ZENOHC_API void z_reply_channel_closure_drop(struct z_owned_reply_channel_closure_t *closure); -/** - * Constructs a null safe-to-drop value of 'z_owned_reply_channel_closure_t' type - */ -ZENOHC_API struct z_owned_reply_channel_closure_t z_reply_channel_closure_null(void); -ZENOHC_API void z_reply_channel_drop(struct z_owned_reply_channel_t *channel); -/** - * Constructs a null safe-to-drop value of 'z_owned_reply_channel_t' type - */ -ZENOHC_API struct z_owned_reply_channel_t z_reply_channel_null(void); /** * Returns ``true`` if `reply_data` is valid. */ @@ -1947,6 +1911,24 @@ ZENOHC_API void z_reply_drop(struct z_owned_reply_t *reply_data); */ ZENOHC_API struct z_value_t z_reply_err(const struct z_owned_reply_t *reply); +ZENOHC_API void z_reply_fifo_channel_drop(struct z_owned_reply_fifo_channel_t *channel); +/** + * Creates a new blocking fifo channel, returned as a pair of closures. + * + * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. + * + * The `send` end should be passed as callback to a `z_get` call. + * + * The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, + * which it will then return; or until the `send` closure is dropped and all replies have been consumed, + * at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. + */ +ZENOHC_API +struct z_owned_reply_fifo_channel_t z_reply_fifo_channel_new(size_t bound); +/** + * Constructs a null safe-to-drop value of 'z_owned_reply_fifo_channel_t' type + */ +ZENOHC_API struct z_owned_reply_fifo_channel_t z_reply_fifo_channel_null(void); /** * Returns ``true`` if the queryable answered with an OK, which allows this value to be treated as a sample. * @@ -2363,32 +2345,6 @@ struct z_owned_query_channel_t zc_query_fifo_new(size_t bound); */ ZENOHC_API struct z_owned_query_channel_t zc_query_non_blocking_fifo_new(size_t bound); -/** - * Creates a new blocking fifo channel, returned as a pair of closures. - * - * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. - * - * The `send` end should be passed as callback to a `z_get` call. - * - * The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, - * which it will then return; or until the `send` closure is dropped and all replies have been consumed, - * at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. - */ -ZENOHC_API -struct z_owned_reply_channel_t zc_reply_fifo_new(size_t bound); -/** - * Creates a new non-blocking fifo channel, returned as a pair of closures. - * - * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. - * - * The `send` end should be passed as callback to a `z_get` call. - * - * The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, - * which it will then return; or until the `send` closure is dropped and all replies have been consumed, - * at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. - */ -ZENOHC_API -struct z_owned_reply_channel_t zc_reply_non_blocking_fifo_new(size_t bound); /** * Clones the sample's payload by incrementing its backing refcount (this doesn't imply any copies). */ diff --git a/include/zenoh_macros.h b/include/zenoh_macros.h index 57a691d7a..3262d35f6 100644 --- a/include/zenoh_macros.h +++ b/include/zenoh_macros.h @@ -35,9 +35,8 @@ z_owned_closure_hello_t * : z_closure_hello_drop, \ z_owned_closure_zid_t * : z_closure_zid_drop, \ zcu_owned_closure_matching_status_t * : zcu_closure_matching_status_drop, \ - z_owned_reply_channel_closure_t * : z_reply_channel_closure_drop, \ z_owned_query_channel_closure_t * : z_query_channel_closure_drop, \ - z_owned_reply_channel_t * : z_reply_channel_drop, \ + z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_drop, \ z_owned_query_channel_t * : z_query_channel_drop, \ z_owned_bytes_map_t * : z_bytes_map_drop, \ zc_owned_payload_t * : zc_payload_drop, \ @@ -67,8 +66,7 @@ z_owned_closure_hello_t * : z_closure_hello_null, \ z_owned_closure_zid_t * : z_closure_zid_null, \ zcu_owned_closure_matching_status_t * : zcu_closure_matching_status_null, \ - z_owned_reply_channel_closure_t * : z_reply_channel_closure_null, \ - z_owned_reply_channel_t * : z_reply_channel_null, \ + z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_null, \ z_owned_bytes_map_t * : z_bytes_map_null, \ z_attachment_t * : z_attachment_null, \ zc_owned_payload_t * : zc_payload_null, \ @@ -111,7 +109,6 @@ z_owned_closure_hello_t : z_closure_hello_call, \ z_owned_closure_zid_t : z_closure_zid_call, \ zcu_owned_closure_matching_status_t : zcu_closure_matching_status_call, \ - z_owned_reply_channel_closure_t : z_reply_channel_closure_call, \ z_owned_query_channel_closure_t : z_query_channel_closure_call \ ) (&x, __VA_ARGS__) // clang-format on @@ -173,7 +170,6 @@ template<> struct zenoh_drop_type { typedef void type; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; -template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; @@ -201,7 +197,6 @@ template<> inline void z_drop(z_owned_closure_reply_t* v) { z_closure_reply_drop template<> inline void z_drop(z_owned_closure_hello_t* v) { z_closure_hello_drop(v); } template<> inline void z_drop(z_owned_closure_zid_t* v) { z_closure_zid_drop(v); } template<> inline void z_drop(zcu_owned_closure_matching_status_t* v) { zcu_closure_matching_status_drop(v); } -template<> inline void z_drop(z_owned_reply_channel_closure_t* v) { z_reply_channel_closure_drop(v); } template<> inline void z_drop(z_owned_reply_channel_t* v) { z_reply_channel_drop(v); } template<> inline void z_drop(z_owned_bytes_map_t* v) { z_bytes_map_drop(v); } template<> inline void z_drop(zc_owned_liveliness_token_t* v) { zc_liveliness_undeclare_token(v); } @@ -229,7 +224,6 @@ inline void z_null(z_owned_closure_reply_t& v) { v = z_closure_reply_null(); } inline void z_null(z_owned_closure_hello_t& v) { v = z_closure_hello_null(); } inline void z_null(z_owned_closure_zid_t& v) { v = z_closure_zid_null(); } inline void z_null(zcu_owned_closure_matching_status_t& v) { v = zcu_closure_matching_status_null(); } -inline void z_null(z_owned_reply_channel_closure_t& v) { v = z_reply_channel_closure_null(); } inline void z_null(z_owned_reply_channel_t& v) { v = z_reply_channel_null(); } inline void z_null(z_owned_bytes_map_t& v) { v = z_bytes_map_null(); } inline void z_null(zc_owned_liveliness_token_t& v) { v = zc_liveliness_token_null(); } @@ -271,8 +265,6 @@ inline void z_call(const struct z_owned_closure_zid_t &closure, const struct z_i { z_closure_zid_call(&closure, zid); } inline void z_call(const struct zcu_owned_closure_matching_status_t &closure, const struct zcu_matching_status_t *matching_status) { zcu_closure_matching_status_call(&closure, matching_status); } -inline bool z_call(const struct z_owned_reply_channel_closure_t &closure, struct z_owned_reply_t *sample) - { return z_reply_channel_closure_call(&closure, sample); } // clang-format on #define _z_closure_overloader(callback, droper, ctx, ...) \ diff --git a/src/closures/mod.rs b/src/closures/mod.rs index 74b2748aa..93d959fba 100644 --- a/src/closures/mod.rs +++ b/src/closures/mod.rs @@ -23,8 +23,8 @@ mod reply_closure; pub use zenohid_closure::*; mod zenohid_closure; -pub use response_channel::*; -mod response_channel; +pub use reply_channel::*; +mod reply_channel; pub use query_channel::*; mod query_channel; diff --git a/src/closures/reply_channel.rs b/src/closures/reply_channel.rs new file mode 100644 index 000000000..7e656d57c --- /dev/null +++ b/src/closures/reply_channel.rs @@ -0,0 +1,64 @@ +use crate::{z_closure_reply_drop, z_owned_closure_reply_t, z_owned_reply_t}; + +/// A pair of closures, the `send` one accepting +#[repr(C)] +pub struct z_owned_reply_fifo_channel_t { + pub send: z_owned_closure_reply_t, + pub recv: z_owned_closure_reply_t, + pub try_recv: z_owned_closure_reply_t, +} +#[no_mangle] +pub extern "C" fn z_reply_fifo_channel_drop(channel: &mut z_owned_reply_fifo_channel_t) { + z_closure_reply_drop(&mut channel.send); + z_closure_reply_drop(&mut channel.recv); + z_closure_reply_drop(&mut channel.try_recv); +} +/// Constructs a null safe-to-drop value of 'z_owned_reply_fifo_channel_t' type +#[no_mangle] +pub extern "C" fn z_reply_fifo_channel_null() -> z_owned_reply_fifo_channel_t { + z_owned_reply_fifo_channel_t { + send: z_owned_closure_reply_t::empty(), + recv: z_owned_closure_reply_t::empty(), + try_recv: z_owned_closure_reply_t::empty(), + } +} + +/// Creates a new blocking fifo channel, returned as a pair of closures. +/// +/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full. +/// +/// The `send` end should be passed as callback to a `z_get` call. +/// +/// The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, +/// which it will then return; or until the `send` closure is dropped and all replies have been consumed, +/// at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. +#[no_mangle] +pub extern "C" fn z_reply_fifo_channel_new(bound: usize) -> z_owned_reply_fifo_channel_t { + let (tx, rx) = if bound == 0 { + crossbeam_channel::unbounded() + } else { + crossbeam_channel::bounded(bound) + }; + let rx_clone = rx.clone(); + z_owned_reply_fifo_channel_t { + send: From::from(move |reply: &mut z_owned_reply_t| { + if let Some(reply) = reply.take() { + if let Err(e) = tx.send(reply) { + log::error!("Attempted to push onto a closed reply_fifo: {}", e) + } + } + }), + recv: From::from(move |receptacle: &mut z_owned_reply_t| { + *receptacle = match rx.recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + try_recv: From::from(move |receptacle: &mut z_owned_reply_t| { + *receptacle = match rx_clone.try_recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + } +} diff --git a/src/closures/response_channel.rs b/src/closures/response_channel.rs deleted file mode 100644 index 0ea046d5d..000000000 --- a/src/closures/response_channel.rs +++ /dev/null @@ -1,213 +0,0 @@ -use crate::{z_closure_reply_drop, z_owned_closure_reply_t, z_owned_reply_t}; -use libc::c_void; -use std::sync::mpsc::TryRecvError; -/// A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: -/// - `this` is a pointer to an arbitrary state. -/// - `call` is the typical callback function. `this` will be passed as its last argument. -/// - `drop` allows the callback's state to be freed. -/// -/// Closures are not guaranteed not to be called concurrently. -/// -/// We guarantee that: -/// - `call` will never be called once `drop` has started. -/// - `drop` will only be called ONCE, and AFTER EVERY `call` has ended. -/// - The two previous guarantees imply that `call` and `drop` are never called concurrently. -#[repr(C)] -pub struct z_owned_reply_channel_closure_t { - context: *mut c_void, - call: Option bool>, - drop: Option, -} - -/// A pair of closures, the `send` one accepting -#[repr(C)] -pub struct z_owned_reply_channel_t { - pub send: z_owned_closure_reply_t, - pub recv: z_owned_reply_channel_closure_t, -} -#[no_mangle] -pub extern "C" fn z_reply_channel_drop(channel: &mut z_owned_reply_channel_t) { - z_closure_reply_drop(&mut channel.send); - z_reply_channel_closure_drop(&mut channel.recv); -} -/// Constructs a null safe-to-drop value of 'z_owned_reply_channel_t' type -#[no_mangle] -pub extern "C" fn z_reply_channel_null() -> z_owned_reply_channel_t { - z_owned_reply_channel_t { - send: z_owned_closure_reply_t::empty(), - recv: z_owned_reply_channel_closure_t::empty(), - } -} - -/// Creates a new blocking fifo channel, returned as a pair of closures. -/// -/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full. -/// -/// The `send` end should be passed as callback to a `z_get` call. -/// -/// The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, -/// which it will then return; or until the `send` closure is dropped and all replies have been consumed, -/// at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. -#[no_mangle] -pub extern "C" fn zc_reply_fifo_new(bound: usize) -> z_owned_reply_channel_t { - let (send, rx) = if bound == 0 { - let (tx, rx) = std::sync::mpsc::channel(); - ( - From::from(move |reply: &mut z_owned_reply_t| { - if let Some(reply) = reply.take() { - if let Err(e) = tx.send(reply) { - log::error!("Attempted to push onto a closed reply_fifo: {}", e) - } - } - }), - rx, - ) - } else { - let (tx, rx) = std::sync::mpsc::sync_channel(bound); - ( - From::from(move |reply: &mut z_owned_reply_t| { - if let Some(reply) = reply.take() { - if let Err(e) = tx.send(reply) { - log::error!("Attempted to push onto a closed reply_fifo: {}", e) - } - } - }), - rx, - ) - }; - z_owned_reply_channel_t { - send, - recv: From::from(move |receptacle: &mut z_owned_reply_t| { - *receptacle = match rx.recv() { - Ok(val) => val.into(), - Err(_) => None.into(), - }; - true - }), - } -} - -/// Creates a new non-blocking fifo channel, returned as a pair of closures. -/// -/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full. -/// -/// The `send` end should be passed as callback to a `z_get` call. -/// -/// The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, -/// which it will then return; or until the `send` closure is dropped and all replies have been consumed, -/// at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. -#[no_mangle] -pub extern "C" fn zc_reply_non_blocking_fifo_new(bound: usize) -> z_owned_reply_channel_t { - let (send, rx) = if bound == 0 { - let (tx, rx) = std::sync::mpsc::channel(); - ( - From::from(move |reply: &mut z_owned_reply_t| { - if let Some(reply) = reply.take() { - if let Err(e) = tx.send(reply) { - log::error!("Attempted to push onto a closed reply_fifo: {}", e) - } - } - }), - rx, - ) - } else { - let (tx, rx) = std::sync::mpsc::sync_channel(bound); - ( - From::from(move |reply: &mut z_owned_reply_t| { - if let Some(reply) = reply.take() { - if let Err(e) = tx.send(reply) { - log::error!("Attempted to push onto a closed reply_fifo: {}", e) - } - } - }), - rx, - ) - }; - - z_owned_reply_channel_t { - send, - recv: From::from( - move |receptacle: &mut z_owned_reply_t| match rx.try_recv() { - Ok(val) => { - let mut tmp = z_owned_reply_t::from(val); - std::mem::swap(&mut tmp, receptacle); - true - } - Err(TryRecvError::Disconnected) => { - receptacle.take(); - true - } - Err(TryRecvError::Empty) => { - receptacle.take(); - false - } - }, - ), - } -} - -impl z_owned_reply_channel_closure_t { - pub fn empty() -> Self { - z_owned_reply_channel_closure_t { - context: std::ptr::null_mut(), - call: None, - drop: None, - } - } -} -unsafe impl Send for z_owned_reply_channel_closure_t {} -unsafe impl Sync for z_owned_reply_channel_closure_t {} -impl Drop for z_owned_reply_channel_closure_t { - fn drop(&mut self) { - if let Some(drop) = self.drop { - drop(self.context) - } - } -} - -/// Constructs a null safe-to-drop value of 'z_owned_reply_channel_closure_t' type -#[no_mangle] -pub extern "C" fn z_reply_channel_closure_null() -> z_owned_reply_channel_closure_t { - z_owned_reply_channel_closure_t::empty() -} - -/// Calls the closure. Calling an uninitialized closure is a no-op. -#[no_mangle] -pub extern "C" fn z_reply_channel_closure_call( - closure: &z_owned_reply_channel_closure_t, - sample: &mut z_owned_reply_t, -) -> bool { - match closure.call { - Some(call) => call(sample, closure.context), - None => { - log::error!("Attempted to call an uninitialized closure!"); - true - } - } -} -/// Drops the closure. Droping an uninitialized closure is a no-op. -#[no_mangle] -pub extern "C" fn z_reply_channel_closure_drop(closure: &mut z_owned_reply_channel_closure_t) { - let mut empty_closure = z_owned_reply_channel_closure_t::empty(); - std::mem::swap(&mut empty_closure, closure); -} -impl bool> From for z_owned_reply_channel_closure_t { - fn from(f: F) -> Self { - let this = Box::into_raw(Box::new(f)) as _; - extern "C" fn call bool>( - response: &mut z_owned_reply_t, - this: *mut c_void, - ) -> bool { - let this = unsafe { &*(this as *const F) }; - this(response) - } - extern "C" fn drop(this: *mut c_void) { - std::mem::drop(unsafe { Box::from_raw(this as *mut F) }) - } - z_owned_reply_channel_closure_t { - context: this, - call: Some(call::), - drop: Some(drop::), - } - } -} From 139384432d4f8875c2208a5595a9dd64e6e819dd Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 12 Apr 2024 13:30:12 +0200 Subject: [PATCH 2/6] Fix tests --- tests/z_api_null_drop_test.c | 9 ++------- tests/z_int_queryable_attachment_test.c | 2 +- tests/z_int_queryable_test.c | 2 +- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/tests/z_api_null_drop_test.c b/tests/z_api_null_drop_test.c index 96eb102ce..69bb1a9f4 100644 --- a/tests/z_api_null_drop_test.c +++ b/tests/z_api_null_drop_test.c @@ -39,8 +39,7 @@ int main(int argc, char **argv) { z_owned_closure_reply_t closure_reply_null_1 = z_closure_reply_null(); z_owned_closure_hello_t closure_hello_null_1 = z_closure_hello_null(); z_owned_closure_zid_t closure_zid_null_1 = z_closure_zid_null(); - z_owned_reply_channel_closure_t reply_channel_closure_null_1 = z_reply_channel_closure_null(); - z_owned_reply_channel_t reply_channel_null_1 = z_reply_channel_null(); + z_owned_reply_fifo_channel_t reply_channel_null_1 = z_reply_fifo_channel_null(); z_owned_str_t str_null_1 = z_str_null(); zc_owned_payload_t payload_null_1 = zc_payload_null(); zc_owned_shmbuf_t shmbuf_null_1 = zc_shmbuf_null(); @@ -82,8 +81,7 @@ int main(int argc, char **argv) { z_owned_closure_reply_t closure_reply_null_2; z_owned_closure_hello_t closure_hello_null_2; z_owned_closure_zid_t closure_zid_null_2; - z_owned_reply_channel_closure_t reply_channel_closure_null_2; - z_owned_reply_channel_t reply_channel_null_2; + z_owned_reply_fifo_channel_t reply_channel_null_2; z_owned_str_t str_null_2; zc_owned_payload_t payload_null_2; zc_owned_shmbuf_t shmbuf_null_2; @@ -104,7 +102,6 @@ int main(int argc, char **argv) { z_null(&closure_reply_null_2); z_null(&closure_hello_null_2); z_null(&closure_zid_null_2); - z_null(&reply_channel_closure_null_2); z_null(&reply_channel_null_2); z_null(&str_null_2); z_null(&payload_null_2); @@ -148,7 +145,6 @@ int main(int argc, char **argv) { z_drop(z_move(closure_reply_null_1)); z_drop(z_move(closure_hello_null_1)); z_drop(z_move(closure_zid_null_1)); - z_drop(z_move(reply_channel_closure_null_1)); z_drop(z_move(reply_channel_null_1)); z_drop(z_move(str_null_1)); z_drop(z_move(payload_null_1)); @@ -170,7 +166,6 @@ int main(int argc, char **argv) { z_drop(z_move(closure_reply_null_2)); z_drop(z_move(closure_hello_null_2)); z_drop(z_move(closure_zid_null_2)); - z_drop(z_move(reply_channel_closure_null_2)); z_drop(z_move(reply_channel_null_2)); z_drop(z_move(str_null_2)); z_drop(z_move(payload_null_2)); diff --git a/tests/z_int_queryable_attachment_test.c b/tests/z_int_queryable_attachment_test.c index ce1ac9f42..b2171e900 100644 --- a/tests/z_int_queryable_attachment_test.c +++ b/tests/z_int_queryable_attachment_test.c @@ -102,7 +102,7 @@ int run_get() { for (int val_num = 0; val_num < values_count; ++val_num) { z_bytes_map_insert_by_copy(&map, z_bytes_from_str(K_VAR), z_bytes_from_str(values[val_num])); - z_owned_reply_channel_t channel = zc_reply_fifo_new(16); + z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16); z_get(z_loan(s), z_keyexpr(keyexpr), "", z_move(channel.send), &opts); z_owned_reply_t reply = z_reply_null(); for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) { diff --git a/tests/z_int_queryable_test.c b/tests/z_int_queryable_test.c index 2999451c4..8a5b7f961 100644 --- a/tests/z_int_queryable_test.c +++ b/tests/z_int_queryable_test.c @@ -77,7 +77,7 @@ int run_get() { } for (int val_num = 0; val_num < values_count; ++val_num) { - z_owned_reply_channel_t channel = zc_reply_fifo_new(16); + z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16); z_get_options_t opts = z_get_options_default(); z_get(z_loan(s), z_keyexpr(keyexpr), "", z_move(channel.send), &opts); z_owned_reply_t reply = z_reply_null(); From ce52f8b336b2b394bea917150e0ab164ae9155c0 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 12 Apr 2024 14:30:07 +0200 Subject: [PATCH 3/6] Add dummy reply ring channel --- include/zenoh_commons.h | 26 ++++++++++++-- include/zenoh_macros.h | 2 ++ src/closures/reply_channel.rs | 65 ++++++++++++++++++++++++++++++++++- 3 files changed, 89 insertions(+), 4 deletions(-) diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index 68eb424b6..170140cb8 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -778,14 +778,16 @@ typedef struct z_query_reply_options_t { struct z_encoding_t encoding; struct z_attachment_t attachment; } z_query_reply_options_t; -/** - * A pair of closures, the `send` one accepting - */ typedef struct z_owned_reply_fifo_channel_t { struct z_owned_closure_reply_t send; struct z_owned_closure_reply_t recv; struct z_owned_closure_reply_t try_recv; } z_owned_reply_fifo_channel_t; +typedef struct z_owned_reply_ring_channel_t { + struct z_owned_closure_reply_t send; + struct z_owned_closure_reply_t recv; + struct z_owned_closure_reply_t try_recv; +} z_owned_reply_ring_channel_t; typedef struct z_owned_scouting_config_t { struct z_owned_config_t _config; unsigned long zc_timeout_ms; @@ -1953,6 +1955,24 @@ ZENOHC_API struct z_owned_reply_t z_reply_null(void); */ ZENOHC_API struct z_sample_t z_reply_ok(const struct z_owned_reply_t *reply); +ZENOHC_API void z_reply_ring_channel_drop(struct z_owned_reply_ring_channel_t *channel); +/** + * Creates a new blocking ring channel, returned as a pair of closures. + * + * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. + * + * The `send` end should be passed as callback to a `z_get` call. + * + * The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, + * which it will then return; or until the `send` closure is dropped and all replies have been consumed, + * at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. + */ +ZENOHC_API +struct z_owned_reply_ring_channel_t z_reply_ring_channel_new(size_t bound); +/** + * Constructs a null safe-to-drop value of 'z_owned_reply_ring_channel_t' type + */ +ZENOHC_API struct z_owned_reply_ring_channel_t z_reply_ring_channel_null(void); /** * Scout for routers and/or peers. * diff --git a/include/zenoh_macros.h b/include/zenoh_macros.h index 3262d35f6..9e1c65462 100644 --- a/include/zenoh_macros.h +++ b/include/zenoh_macros.h @@ -37,6 +37,7 @@ zcu_owned_closure_matching_status_t * : zcu_closure_matching_status_drop, \ z_owned_query_channel_closure_t * : z_query_channel_closure_drop, \ z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_drop, \ + z_owned_reply_ring_channel_t * : z_reply_ring_channel_drop, \ z_owned_query_channel_t * : z_query_channel_drop, \ z_owned_bytes_map_t * : z_bytes_map_drop, \ zc_owned_payload_t * : zc_payload_drop, \ @@ -67,6 +68,7 @@ z_owned_closure_zid_t * : z_closure_zid_null, \ zcu_owned_closure_matching_status_t * : zcu_closure_matching_status_null, \ z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_null, \ + z_owned_reply_ring_channel_t * : z_reply_ring_channel_null, \ z_owned_bytes_map_t * : z_bytes_map_null, \ z_attachment_t * : z_attachment_null, \ zc_owned_payload_t * : zc_payload_null, \ diff --git a/src/closures/reply_channel.rs b/src/closures/reply_channel.rs index 7e656d57c..6e88e4fd7 100644 --- a/src/closures/reply_channel.rs +++ b/src/closures/reply_channel.rs @@ -1,6 +1,5 @@ use crate::{z_closure_reply_drop, z_owned_closure_reply_t, z_owned_reply_t}; -/// A pair of closures, the `send` one accepting #[repr(C)] pub struct z_owned_reply_fifo_channel_t { pub send: z_owned_closure_reply_t, @@ -34,6 +33,7 @@ pub extern "C" fn z_reply_fifo_channel_null() -> z_owned_reply_fifo_channel_t { /// at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. #[no_mangle] pub extern "C" fn z_reply_fifo_channel_new(bound: usize) -> z_owned_reply_fifo_channel_t { + // TODO(sashacmc): switch to handlers::FifoChannel let (tx, rx) = if bound == 0 { crossbeam_channel::unbounded() } else { @@ -62,3 +62,66 @@ pub extern "C" fn z_reply_fifo_channel_new(bound: usize) -> z_owned_reply_fifo_c }), } } + +#[repr(C)] +pub struct z_owned_reply_ring_channel_t { + pub send: z_owned_closure_reply_t, + pub recv: z_owned_closure_reply_t, + pub try_recv: z_owned_closure_reply_t, +} +#[no_mangle] +pub extern "C" fn z_reply_ring_channel_drop(channel: &mut z_owned_reply_ring_channel_t) { + z_closure_reply_drop(&mut channel.send); + z_closure_reply_drop(&mut channel.recv); + z_closure_reply_drop(&mut channel.try_recv); +} +/// Constructs a null safe-to-drop value of 'z_owned_reply_ring_channel_t' type +#[no_mangle] +pub extern "C" fn z_reply_ring_channel_null() -> z_owned_reply_ring_channel_t { + z_owned_reply_ring_channel_t { + send: z_owned_closure_reply_t::empty(), + recv: z_owned_closure_reply_t::empty(), + try_recv: z_owned_closure_reply_t::empty(), + } +} + +/// Creates a new blocking ring channel, returned as a pair of closures. +/// +/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full. +/// +/// The `send` end should be passed as callback to a `z_get` call. +/// +/// The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available, +/// which it will then return; or until the `send` closure is dropped and all replies have been consumed, +/// at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. +#[no_mangle] +pub extern "C" fn z_reply_ring_channel_new(bound: usize) -> z_owned_reply_ring_channel_t { + // TODO(sashacmc): switch to handlers::RingChannel + let (tx, rx) = if bound == 0 { + crossbeam_channel::unbounded() + } else { + crossbeam_channel::bounded(bound) + }; + let rx_clone = rx.clone(); + z_owned_reply_ring_channel_t { + send: From::from(move |reply: &mut z_owned_reply_t| { + if let Some(reply) = reply.take() { + if let Err(e) = tx.send(reply) { + log::error!("Attempted to push onto a closed reply_ring: {}", e) + } + } + }), + recv: From::from(move |receptacle: &mut z_owned_reply_t| { + *receptacle = match rx.recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + try_recv: From::from(move |receptacle: &mut z_owned_reply_t| { + *receptacle = match rx_clone.try_recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + } +} From 467b791232de39c66efd9d980ad4eca09c90101b Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 12 Apr 2024 15:31:51 +0200 Subject: [PATCH 4/6] Rework query channels --- examples/z_queryable_with_channels.c | 12 +- include/zenoh_commons.h | 116 +++++------- include/zenoh_macros.h | 9 +- src/closures/query_channel.rs | 261 +++++++++------------------ 4 files changed, 144 insertions(+), 254 deletions(-) diff --git a/examples/z_queryable_with_channels.c b/examples/z_queryable_with_channels.c index 672ddd607..ad201915b 100644 --- a/examples/z_queryable_with_channels.c +++ b/examples/z_queryable_with_channels.c @@ -14,18 +14,13 @@ #include #include #include + #include "zenoh.h" const char *expr = "demo/example/zenoh-c-queryable"; const char *value = "Queryable from C!"; z_keyexpr_t keyexpr; -void query_handler(const z_query_t *query, void *context) { - z_owned_closure_owned_query_t *channel = (z_owned_closure_owned_query_t *)context; - z_owned_query_t oquery = z_query_clone(query); - z_call(*channel, &oquery); -} - int main(int argc, char **argv) { if (argc > 1) { expr = argv[1]; @@ -54,9 +49,8 @@ int main(int argc, char **argv) { } printf("Declaring Queryable on '%s'...\n", expr); - z_owned_query_channel_t channel = zc_query_fifo_new(16); - z_owned_closure_query_t callback = z_closure(query_handler, NULL, &channel.send); - z_owned_queryable_t qable = z_declare_queryable(z_loan(s), keyexpr, z_move(callback), NULL); + z_owned_query_fifo_channel_t channel = z_query_fifo_channel_new(16); + z_owned_queryable_t qable = z_declare_queryable(z_loan(s), keyexpr, z_move(channel.send), NULL); if (!z_check(qable)) { printf("Unable to create queryable.\n"); exit(-1); diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index 170140cb8..244b8dd46 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -741,31 +741,11 @@ typedef struct z_put_options_t { enum z_priority_t priority; struct z_attachment_t attachment; } z_put_options_t; -/** - * A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: - * - `this` is a pointer to an arbitrary state. - * - `call` is the typical callback function. `this` will be passed as its last argument. - * - `drop` allows the callback's state to be freed. - * - * Closures are not guaranteed not to be called concurrently. - * - * We guarantee that: - * - `call` will never be called once `drop` has started. - * - `drop` will only be called ONCE, and AFTER EVERY `call` has ended. - * - The two previous guarantees imply that `call` and `drop` are never called concurrently. - */ -typedef struct z_owned_query_channel_closure_t { - void *context; - bool (*call)(struct z_owned_query_t*, void*); - void (*drop)(void*); -} z_owned_query_channel_closure_t; -/** - * A pair of closures - */ -typedef struct z_owned_query_channel_t { - struct z_owned_closure_owned_query_t send; - struct z_owned_query_channel_closure_t recv; -} z_owned_query_channel_t; +typedef struct z_owned_query_fifo_channel_t { + struct z_owned_closure_query_t send; + struct z_owned_closure_owned_query_t recv; + struct z_owned_closure_owned_query_t try_recv; +} z_owned_query_fifo_channel_t; /** * Represents the set of options that can be applied to a query reply, * sent via :c:func:`z_query_reply`. @@ -778,6 +758,11 @@ typedef struct z_query_reply_options_t { struct z_encoding_t encoding; struct z_attachment_t attachment; } z_query_reply_options_t; +typedef struct z_owned_query_ring_channel_t { + struct z_owned_closure_query_t send; + struct z_owned_closure_owned_query_t recv; + struct z_owned_closure_owned_query_t try_recv; +} z_owned_query_ring_channel_t; typedef struct z_owned_reply_fifo_channel_t { struct z_owned_closure_reply_t send; struct z_owned_closure_reply_t recv; @@ -1758,25 +1743,6 @@ ZENOHC_API enum z_priority_t z_qos_get_priority(struct z_qos_t qos); * `z_check(return_value) == false` if there was no attachment to the query. */ ZENOHC_API struct z_attachment_t z_query_attachment(const struct z_query_t *query); -/** - * Calls the closure. Calling an uninitialized closure is a no-op. - */ -ZENOHC_API -bool z_query_channel_closure_call(const struct z_owned_query_channel_closure_t *closure, - struct z_owned_query_t *sample); -/** - * Drops the closure. Droping an uninitialized closure is a no-op. - */ -ZENOHC_API void z_query_channel_closure_drop(struct z_owned_query_channel_closure_t *closure); -/** - * Constructs a null safe-to-drop value of 'z_owned_query_channel_closure_t' type - */ -ZENOHC_API struct z_owned_query_channel_closure_t z_query_channel_closure_null(void); -ZENOHC_API void z_query_channel_drop(struct z_owned_query_channel_t *channel); -/** - * Constructs a null safe-to-drop value of 'z_owned_query_channel_t' type - */ -ZENOHC_API struct z_owned_query_channel_t z_query_channel_null(void); /** * Returns `false` if `this` is in a gravestone state, `true` otherwise. * @@ -1825,6 +1791,24 @@ ZENOHC_API struct z_query_consolidation_t z_query_consolidation_none(void); */ ZENOHC_API void z_query_drop(struct z_owned_query_t *this_); +ZENOHC_API void z_query_fifo_channel_drop(struct z_owned_query_fifo_channel_t *channel); +/** + * Creates a new blocking fifo channel, returned as a pair of closures. + * + * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. + * + * The `send` end should be passed as callback to a `z_get` call. + * + * The `recv` end is a synchronous closure that will block until either a `z_owned_query_t` is available, + * which it will then return; or until the `send` closure is dropped and all replies have been consumed, + * at which point it will return an invalidated `z_owned_query_t`, and so will further calls. + */ +ZENOHC_API +struct z_owned_query_fifo_channel_t z_query_fifo_channel_new(size_t bound); +/** + * Constructs a null safe-to-drop value of 'z_owned_query_fifo_channel_t' type + */ +ZENOHC_API struct z_owned_query_fifo_channel_t z_query_fifo_channel_null(void); /** * Get a query's key by aliasing it. */ @@ -1870,6 +1854,24 @@ int8_t z_query_reply(const struct z_query_t *query, * Constructs the default value for :c:type:`z_query_reply_options_t`. */ ZENOHC_API struct z_query_reply_options_t z_query_reply_options_default(void); +ZENOHC_API void z_query_ring_channel_drop(struct z_owned_query_ring_channel_t *channel); +/** + * Creates a new blocking ring channel, returned as a pair of closures. + * + * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. + * + * The `send` end should be passed as callback to a `z_get` call. + * + * The `recv` end is a synchronous closure that will block until either a `z_owned_query_t` is available, + * which it will then return; or until the `send` closure is dropped and all replies have been consumed, + * at which point it will return an invalidated `z_owned_query_t`, and so will further calls. + */ +ZENOHC_API +struct z_owned_query_ring_channel_t z_query_ring_channel_new(size_t bound); +/** + * Constructs a null safe-to-drop value of 'z_owned_query_ring_channel_t' type + */ +ZENOHC_API struct z_owned_query_ring_channel_t z_query_ring_channel_null(void); /** * Create a default :c:type:`z_query_target_t`. */ @@ -2339,32 +2341,6 @@ int8_t zc_put_owned(struct z_session_t session, struct z_keyexpr_t keyexpr, struct zc_owned_payload_t *payload, const struct z_put_options_t *opts); -/** - * Creates a new blocking fifo channel, returned as a pair of closures. - * - * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. - * - * The `send` end should be passed as callback to a `z_get` call. - * - * The `recv` end is a synchronous closure that will block until either a `z_owned_query_t` is available, - * which it will then return; or until the `send` closure is dropped and all queries have been consumed, - * at which point it will return an invalidated `z_owned_query_t`, and so will further calls. - */ -ZENOHC_API -struct z_owned_query_channel_t zc_query_fifo_new(size_t bound); -/** - * Creates a new non-blocking fifo channel, returned as a pair of closures. - * - * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. - * - * The `send` end should be passed as callback to a `z_get` call. - * - * The `recv` end is a synchronous closure that will block until either a `z_owned_query_t` is available, - * which it will then return; or until the `send` closure is dropped and all queries have been consumed, - * at which point it will return an invalidated `z_owned_query_t`, and so will further calls. - */ -ZENOHC_API -struct z_owned_query_channel_t zc_query_non_blocking_fifo_new(size_t bound); /** * Clones the sample's payload by incrementing its backing refcount (this doesn't imply any copies). */ diff --git a/include/zenoh_macros.h b/include/zenoh_macros.h index 9e1c65462..0f0dbd176 100644 --- a/include/zenoh_macros.h +++ b/include/zenoh_macros.h @@ -35,10 +35,10 @@ z_owned_closure_hello_t * : z_closure_hello_drop, \ z_owned_closure_zid_t * : z_closure_zid_drop, \ zcu_owned_closure_matching_status_t * : zcu_closure_matching_status_drop, \ - z_owned_query_channel_closure_t * : z_query_channel_closure_drop, \ z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_drop, \ z_owned_reply_ring_channel_t * : z_reply_ring_channel_drop, \ - z_owned_query_channel_t * : z_query_channel_drop, \ + z_owned_query_fifo_channel_t * : z_query_fifo_channel_drop, \ + z_owned_query_ring_channel_t * : z_query_ring_channel_drop, \ z_owned_bytes_map_t * : z_bytes_map_drop, \ zc_owned_payload_t * : zc_payload_drop, \ zc_owned_shmbuf_t * : zc_shmbuf_drop, \ @@ -69,6 +69,8 @@ zcu_owned_closure_matching_status_t * : zcu_closure_matching_status_null, \ z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_null, \ z_owned_reply_ring_channel_t * : z_reply_ring_channel_null, \ + z_owned_query_fifo_channel_t * : z_query_fifo_channel_null, \ + z_owned_query_ring_channel_t * : z_query_ring_channel_null, \ z_owned_bytes_map_t * : z_bytes_map_null, \ z_attachment_t * : z_attachment_null, \ zc_owned_payload_t * : zc_payload_null, \ @@ -110,8 +112,7 @@ z_owned_closure_reply_t : z_closure_reply_call, \ z_owned_closure_hello_t : z_closure_hello_call, \ z_owned_closure_zid_t : z_closure_zid_call, \ - zcu_owned_closure_matching_status_t : zcu_closure_matching_status_call, \ - z_owned_query_channel_closure_t : z_query_channel_closure_call \ + zcu_owned_closure_matching_status_t : zcu_closure_matching_status_call \ ) (&x, __VA_ARGS__) // clang-format on diff --git a/src/closures/query_channel.rs b/src/closures/query_channel.rs index b7a51c9c0..837393239 100644 --- a/src/closures/query_channel.rs +++ b/src/closures/query_channel.rs @@ -1,42 +1,27 @@ -use crate::{z_closure_owned_query_drop, z_owned_closure_owned_query_t, z_owned_query_t}; -use libc::c_void; -use std::sync::mpsc::TryRecvError; +use crate::{ + z_closure_owned_query_drop, z_closure_query_drop, z_owned_closure_owned_query_t, + z_owned_closure_query_t, z_owned_query_t, z_query_clone, z_query_t, +}; -/// A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: -/// - `this` is a pointer to an arbitrary state. -/// - `call` is the typical callback function. `this` will be passed as its last argument. -/// - `drop` allows the callback's state to be freed. -/// -/// Closures are not guaranteed not to be called concurrently. -/// -/// We guarantee that: -/// - `call` will never be called once `drop` has started. -/// - `drop` will only be called ONCE, and AFTER EVERY `call` has ended. -/// - The two previous guarantees imply that `call` and `drop` are never called concurrently. #[repr(C)] -pub struct z_owned_query_channel_closure_t { - context: *mut c_void, - call: Option bool>, - drop: Option, -} - -/// A pair of closures -#[repr(C)] -pub struct z_owned_query_channel_t { - pub send: z_owned_closure_owned_query_t, - pub recv: z_owned_query_channel_closure_t, +pub struct z_owned_query_fifo_channel_t { + pub send: z_owned_closure_query_t, + pub recv: z_owned_closure_owned_query_t, + pub try_recv: z_owned_closure_owned_query_t, } #[no_mangle] -pub extern "C" fn z_query_channel_drop(channel: &mut z_owned_query_channel_t) { - z_closure_owned_query_drop(&mut channel.send); - z_query_channel_closure_drop(&mut channel.recv); +pub extern "C" fn z_query_fifo_channel_drop(channel: &mut z_owned_query_fifo_channel_t) { + z_closure_query_drop(&mut channel.send); + z_closure_owned_query_drop(&mut channel.recv); + z_closure_owned_query_drop(&mut channel.try_recv); } -/// Constructs a null safe-to-drop value of 'z_owned_query_channel_t' type +/// Constructs a null safe-to-drop value of 'z_owned_query_fifo_channel_t' type #[no_mangle] -pub extern "C" fn z_query_channel_null() -> z_owned_query_channel_t { - z_owned_query_channel_t { - send: z_owned_closure_owned_query_t::empty(), - recv: z_owned_query_channel_closure_t::empty(), +pub extern "C" fn z_query_fifo_channel_null() -> z_owned_query_fifo_channel_t { + z_owned_query_fifo_channel_t { + send: z_owned_closure_query_t::empty(), + recv: z_owned_closure_owned_query_t::empty(), + try_recv: z_owned_closure_owned_query_t::empty(), } } @@ -47,167 +32,101 @@ pub extern "C" fn z_query_channel_null() -> z_owned_query_channel_t { /// The `send` end should be passed as callback to a `z_get` call. /// /// The `recv` end is a synchronous closure that will block until either a `z_owned_query_t` is available, -/// which it will then return; or until the `send` closure is dropped and all queries have been consumed, +/// which it will then return; or until the `send` closure is dropped and all replies have been consumed, /// at which point it will return an invalidated `z_owned_query_t`, and so will further calls. #[no_mangle] -pub extern "C" fn zc_query_fifo_new(bound: usize) -> z_owned_query_channel_t { - let (send, rx) = if bound == 0 { - let (tx, rx) = std::sync::mpsc::channel(); - ( - From::from(move |query: &mut z_owned_query_t| { - if let Some(query) = query.take() { - if let Err(e) = tx.send(query) { - log::error!("Attempted to push onto a closed query_fifo: {}", e) - } - } - }), - rx, - ) +pub extern "C" fn z_query_fifo_channel_new(bound: usize) -> z_owned_query_fifo_channel_t { + // TODO(sashacmc): switch to handlers::FifoChannel + let (tx, rx) = if bound == 0 { + crossbeam_channel::unbounded() } else { - let (tx, rx) = std::sync::mpsc::sync_channel(bound); - ( - From::from(move |query: &mut z_owned_query_t| { - if let Some(query) = query.take() { - if let Err(e) = tx.send(query) { - log::error!("Attempted to push onto a closed query_fifo: {}", e) - } - } - }), - rx, - ) + crossbeam_channel::bounded(bound) }; - z_owned_query_channel_t { - send, + let rx_clone = rx.clone(); + z_owned_query_fifo_channel_t { + send: From::from(move |query: &z_query_t| { + let mut oquery = z_query_clone(Some(query)); + if let Some(oquery) = oquery.take() { + if let Err(e) = tx.send(oquery) { + log::error!("Attempted to push onto a closed query_ring: {}", e) + } + } + }), recv: From::from(move |receptacle: &mut z_owned_query_t| { *receptacle = match rx.recv() { Ok(val) => val.into(), Err(_) => None.into(), }; - true + }), + try_recv: From::from(move |receptacle: &mut z_owned_query_t| { + *receptacle = match rx_clone.try_recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; }), } } -/// Creates a new non-blocking fifo channel, returned as a pair of closures. +#[repr(C)] +pub struct z_owned_query_ring_channel_t { + pub send: z_owned_closure_query_t, + pub recv: z_owned_closure_owned_query_t, + pub try_recv: z_owned_closure_owned_query_t, +} +#[no_mangle] +pub extern "C" fn z_query_ring_channel_drop(channel: &mut z_owned_query_ring_channel_t) { + z_closure_query_drop(&mut channel.send); + z_closure_owned_query_drop(&mut channel.recv); + z_closure_owned_query_drop(&mut channel.try_recv); +} +/// Constructs a null safe-to-drop value of 'z_owned_query_ring_channel_t' type +#[no_mangle] +pub extern "C" fn z_query_ring_channel_null() -> z_owned_query_ring_channel_t { + z_owned_query_ring_channel_t { + send: z_owned_closure_query_t::empty(), + recv: z_owned_closure_owned_query_t::empty(), + try_recv: z_owned_closure_owned_query_t::empty(), + } +} + +/// Creates a new blocking ring channel, returned as a pair of closures. /// /// If `bound` is different from 0, that channel will be bound and apply back-pressure when full. /// /// The `send` end should be passed as callback to a `z_get` call. /// /// The `recv` end is a synchronous closure that will block until either a `z_owned_query_t` is available, -/// which it will then return; or until the `send` closure is dropped and all queries have been consumed, +/// which it will then return; or until the `send` closure is dropped and all replies have been consumed, /// at which point it will return an invalidated `z_owned_query_t`, and so will further calls. #[no_mangle] -pub extern "C" fn zc_query_non_blocking_fifo_new(bound: usize) -> z_owned_query_channel_t { - let (send, rx) = if bound == 0 { - let (tx, rx) = std::sync::mpsc::channel(); - ( - From::from(move |query: &mut z_owned_query_t| { - if let Some(query) = query.take() { - if let Err(e) = tx.send(query) { - log::error!("Attempted to push onto a closed query_fifo: {}", e) - } - } - }), - rx, - ) +pub extern "C" fn z_query_ring_channel_new(bound: usize) -> z_owned_query_ring_channel_t { + // TODO(sashacmc): switch to handlers::RingChannel + let (tx, rx) = if bound == 0 { + crossbeam_channel::unbounded() } else { - let (tx, rx) = std::sync::mpsc::sync_channel(bound); - ( - From::from(move |query: &mut z_owned_query_t| { - if let Some(query) = query.take() { - if let Err(e) = tx.send(query) { - log::error!("Attempted to push onto a closed query_fifo: {}", e) - } - } - }), - rx, - ) + crossbeam_channel::bounded(bound) }; - z_owned_query_channel_t { - send, - recv: From::from( - move |receptacle: &mut z_owned_query_t| match rx.try_recv() { - Ok(val) => { - let mut tmp = z_owned_query_t::from(val); - std::mem::swap(&mut tmp, receptacle); - true + let rx_clone = rx.clone(); + z_owned_query_ring_channel_t { + send: From::from(move |query: &z_query_t| { + let mut oquery = z_query_clone(Some(query)); + if let Some(oquery) = oquery.take() { + if let Err(e) = tx.send(oquery) { + log::error!("Attempted to push onto a closed query_ring: {}", e) } - Err(TryRecvError::Disconnected) => { - receptacle.take(); - true - } - Err(TryRecvError::Empty) => { - receptacle.take(); - false - } - }, - ), - } -} - -impl z_owned_query_channel_closure_t { - pub fn empty() -> Self { - z_owned_query_channel_closure_t { - context: std::ptr::null_mut(), - call: None, - drop: None, - } - } -} -unsafe impl Send for z_owned_query_channel_closure_t {} -unsafe impl Sync for z_owned_query_channel_closure_t {} -impl Drop for z_owned_query_channel_closure_t { - fn drop(&mut self) { - if let Some(drop) = self.drop { - drop(self.context) - } - } -} - -/// Constructs a null safe-to-drop value of 'z_owned_query_channel_closure_t' type -#[no_mangle] -pub extern "C" fn z_query_channel_closure_null() -> z_owned_query_channel_closure_t { - z_owned_query_channel_closure_t::empty() -} - -/// Calls the closure. Calling an uninitialized closure is a no-op. -#[no_mangle] -pub extern "C" fn z_query_channel_closure_call( - closure: &z_owned_query_channel_closure_t, - sample: &mut z_owned_query_t, -) -> bool { - match closure.call { - Some(call) => call(sample, closure.context), - None => { - log::error!("Attempted to call an uninitialized closure!"); - true - } - } -} -/// Drops the closure. Droping an uninitialized closure is a no-op. -#[no_mangle] -pub extern "C" fn z_query_channel_closure_drop(closure: &mut z_owned_query_channel_closure_t) { - let mut empty_closure = z_owned_query_channel_closure_t::empty(); - std::mem::swap(&mut empty_closure, closure); -} -impl bool> From for z_owned_query_channel_closure_t { - fn from(f: F) -> Self { - let this = Box::into_raw(Box::new(f)) as _; - extern "C" fn call bool>( - query: &mut z_owned_query_t, - this: *mut c_void, - ) -> bool { - let this = unsafe { &*(this as *const F) }; - this(query) - } - extern "C" fn drop(this: *mut c_void) { - std::mem::drop(unsafe { Box::from_raw(this as *mut F) }) - } - z_owned_query_channel_closure_t { - context: this, - call: Some(call::), - drop: Some(drop::), - } + } + }), + recv: From::from(move |receptacle: &mut z_owned_query_t| { + *receptacle = match rx.recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + try_recv: From::from(move |receptacle: &mut z_owned_query_t| { + *receptacle = match rx_clone.try_recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), } } From e93f4e050048895d8c7accf4b15d1cf58fbc9cd4 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 12 Apr 2024 19:14:56 +0200 Subject: [PATCH 5/6] [skip ci] Add sample channel (require z_owned_sample_t) --- examples/z_pull.c | 44 +++++------ include/zenoh_macros.h | 33 +++++++-- src/closures/mod.rs | 7 +- src/closures/sample_channel.rs | 132 +++++++++++++++++++++++++++++++++ src/closures/sample_closure.rs | 83 ++++++++++++++++++++- 5 files changed, 267 insertions(+), 32 deletions(-) create mode 100644 src/closures/sample_channel.rs diff --git a/examples/z_pull.c b/examples/z_pull.c index 6115b50ac..b17d069a9 100644 --- a/examples/z_pull.c +++ b/examples/z_pull.c @@ -64,30 +64,30 @@ int main(int argc, char **argv) { } printf("Pull functionality not implemented!\n"); - // @TODO: implement z_owned_sample_channel_t and z_sample_channel_ring_new - // printf("Declaring Subscriber on '%s'...\n", keyexpr); - // z_owned_sample_channel_t channel = z_sample_channel_ring_new(size); - // z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL); - // if (!z_check(sub)) { - // printf("Unable to declare subscriber.\n"); - // return -1; - // } + printf("Declaring Subscriber on '%s'...\n", keyexpr); + z_owned_sample_ring_channel_t channel = z_sample_channel_ring_new(size); + z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL); + if (!z_check(sub)) { + printf("Unable to declare subscriber.\n"); + return -1; + } - // printf("Pulling data every %zu ms... Ring size: %zd\n", interval, size); - // z_owned_sample_t sample = z_sample_null(); - // while (true) { - // for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) { - // z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample.keyexpr)); - // printf(">> [Subscriber] Pulled ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, - // sample.payload.start); - // z_drop(z_move(keystr)); - // z_drop(z_move(sample)); - // } - // printf(">> [Subscriber] Nothing to pull... sleep for %zu ms\n", interval); - // z_sleep_ms(interval); - // } + printf("Pulling data every %zu ms... Ring size: %zd\n", interval, size); + z_owned_sample_t sample = z_sample_null(); + while (true) { + for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) { + z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample.keyexpr)); + printf(">> [Subscriber] Pulled ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, + sample.payload.start); + z_drop(z_move(keystr)); + z_drop(z_move(sample)); + } + printf(">> [Subscriber] Nothing to pull... sleep for %zu ms\n", interval); + z_sleep_ms(interval); + } - // z_undeclare_subscriber(z_move(sub)); + z_undeclare_subscriber(z_move(sub)); + z_drop(z_move(channel)); z_close(z_move(s)); diff --git a/include/zenoh_macros.h b/include/zenoh_macros.h index 0f0dbd176..617e99cb9 100644 --- a/include/zenoh_macros.h +++ b/include/zenoh_macros.h @@ -35,10 +35,12 @@ z_owned_closure_hello_t * : z_closure_hello_drop, \ z_owned_closure_zid_t * : z_closure_zid_drop, \ zcu_owned_closure_matching_status_t * : zcu_closure_matching_status_drop, \ - z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_drop, \ - z_owned_reply_ring_channel_t * : z_reply_ring_channel_drop, \ + z_owned_sample_fifo_channel_t * : z_sample_fifo_channel_drop, \ + z_owned_sample_ring_channel_t * : z_sample_ring_channel_drop, \ z_owned_query_fifo_channel_t * : z_query_fifo_channel_drop, \ z_owned_query_ring_channel_t * : z_query_ring_channel_drop, \ + z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_drop, \ + z_owned_reply_ring_channel_t * : z_reply_ring_channel_drop, \ z_owned_bytes_map_t * : z_bytes_map_drop, \ zc_owned_payload_t * : zc_payload_drop, \ zc_owned_shmbuf_t * : zc_shmbuf_drop, \ @@ -67,10 +69,12 @@ z_owned_closure_hello_t * : z_closure_hello_null, \ z_owned_closure_zid_t * : z_closure_zid_null, \ zcu_owned_closure_matching_status_t * : zcu_closure_matching_status_null, \ - z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_null, \ - z_owned_reply_ring_channel_t * : z_reply_ring_channel_null, \ + z_owned_sample_fifo_channel_t * : z_sample_fifo_channel_null, \ + z_owned_sample_ring_channel_t * : z_sample_ring_channel_null, \ z_owned_query_fifo_channel_t * : z_query_fifo_channel_null, \ z_owned_query_ring_channel_t * : z_query_ring_channel_null, \ + z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_null, \ + z_owned_reply_ring_channel_t * : z_reply_ring_channel_null, \ z_owned_bytes_map_t * : z_bytes_map_null, \ z_attachment_t * : z_attachment_null, \ zc_owned_payload_t * : zc_payload_null, \ @@ -173,7 +177,12 @@ template<> struct zenoh_drop_type { typedef void type; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; -template<> struct zenoh_drop_type { typedef void type; }; +template<> struct zenoh_drop_type { typedef void type; }; +template<> struct zenoh_drop_type { typedef void type; }; +template<> struct zenoh_drop_type { typedef void type; }; +template<> struct zenoh_drop_type { typedef void type; }; +template<> struct zenoh_drop_type { typedef void type; }; +template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef int8_t type; }; @@ -200,7 +209,12 @@ template<> inline void z_drop(z_owned_closure_reply_t* v) { z_closure_reply_drop template<> inline void z_drop(z_owned_closure_hello_t* v) { z_closure_hello_drop(v); } template<> inline void z_drop(z_owned_closure_zid_t* v) { z_closure_zid_drop(v); } template<> inline void z_drop(zcu_owned_closure_matching_status_t* v) { zcu_closure_matching_status_drop(v); } -template<> inline void z_drop(z_owned_reply_channel_t* v) { z_reply_channel_drop(v); } +template<> inline void z_drop(z_owned_sample_fifo_channel_t* v) { z_sample_fifo_channel_drop(v); } +template<> inline void z_drop(z_owned_sample_ring_channel_t* v) { z_sample_ring_channel_drop(v); } +template<> inline void z_drop(z_owned_query_fifo_channel_t* v) { z_query_fifo_channel_drop(v); } +template<> inline void z_drop(z_owned_query_ring_channel_t* v) { z_query_ring_channel_drop(v); } +template<> inline void z_drop(z_owned_reply_fifo_channel_t* v) { z_reply_fifo_channel_drop(v); } +template<> inline void z_drop(z_owned_reply_ring_channel_t* v) { z_reply_ring_channel_drop(v); } template<> inline void z_drop(z_owned_bytes_map_t* v) { z_bytes_map_drop(v); } template<> inline void z_drop(zc_owned_liveliness_token_t* v) { zc_liveliness_undeclare_token(v); } template<> inline int8_t z_drop(ze_owned_publication_cache_t* v) { return ze_undeclare_publication_cache(v); } @@ -227,7 +241,12 @@ inline void z_null(z_owned_closure_reply_t& v) { v = z_closure_reply_null(); } inline void z_null(z_owned_closure_hello_t& v) { v = z_closure_hello_null(); } inline void z_null(z_owned_closure_zid_t& v) { v = z_closure_zid_null(); } inline void z_null(zcu_owned_closure_matching_status_t& v) { v = zcu_closure_matching_status_null(); } -inline void z_null(z_owned_reply_channel_t& v) { v = z_reply_channel_null(); } +inline void z_null(z_owned_sample_fifo_channel_t& v) { v = z_sample_fifo_channel_null(); } +inline void z_null(z_owned_sample_ring_channel_t& v) { v = z_sample_ring_channel_null(); } +inline void z_null(z_owned_query_fifo_channel_t& v) { v = z_query_fifo_channel_null(); } +inline void z_null(z_owned_query_ring_channel_t& v) { v = z_query_ring_channel_null(); } +inline void z_null(z_owned_reply_fifo_channel_t& v) { v = z_reply_fifo_channel_null(); } +inline void z_null(z_owned_reply_ring_channel_t& v) { v = z_reply_ring_channel_null(); } inline void z_null(z_owned_bytes_map_t& v) { v = z_bytes_map_null(); } inline void z_null(zc_owned_liveliness_token_t& v) { v = zc_liveliness_token_null(); } inline void z_null(ze_owned_publication_cache_t& v) { v = ze_publication_cache_null(); } diff --git a/src/closures/mod.rs b/src/closures/mod.rs index 93d959fba..6e4524d6c 100644 --- a/src/closures/mod.rs +++ b/src/closures/mod.rs @@ -23,12 +23,15 @@ mod reply_closure; pub use zenohid_closure::*; mod zenohid_closure; -pub use reply_channel::*; -mod reply_channel; +pub use sample_channel::*; +mod sample_channel; pub use query_channel::*; mod query_channel; +pub use reply_channel::*; +mod reply_channel; + pub use hello_closure::*; mod hello_closure; diff --git a/src/closures/sample_channel.rs b/src/closures/sample_channel.rs new file mode 100644 index 000000000..d63cddf30 --- /dev/null +++ b/src/closures/sample_channel.rs @@ -0,0 +1,132 @@ +use crate::{ + z_closure_owned_sample_drop, z_closure_sample_drop, z_owned_closure_owned_sample_t, + z_owned_closure_sample_t, z_owned_sample_t, z_sample_clone, z_sample_t, +}; + +#[repr(C)] +pub struct z_owned_sample_fifo_channel_t { + pub send: z_owned_closure_sample_t, + pub recv: z_owned_closure_owned_sample_t, + pub try_recv: z_owned_closure_owned_sample_t, +} +#[no_mangle] +pub extern "C" fn z_sample_fifo_channel_drop(channel: &mut z_owned_sample_fifo_channel_t) { + z_closure_sample_drop(&mut channel.send); + z_closure_owned_sample_drop(&mut channel.recv); + z_closure_owned_sample_drop(&mut channel.try_recv); +} +/// Constructs a null safe-to-drop value of 'z_owned_sample_fifo_channel_t' type +#[no_mangle] +pub extern "C" fn z_sample_fifo_channel_null() -> z_owned_sample_fifo_channel_t { + z_owned_sample_fifo_channel_t { + send: z_owned_closure_sample_t::empty(), + recv: z_owned_closure_owned_sample_t::empty(), + try_recv: z_owned_closure_owned_sample_t::empty(), + } +} + +/// Creates a new blocking fifo channel, returned as a pair of closures. +/// +/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full. +/// +/// The `send` end should be passed as callback to a `z_get` call. +/// +/// The `recv` end is a synchronous closure that will block until either a `z_owned_sample_t` is available, +/// which it will then return; or until the `send` closure is dropped and all replies have been consumed, +/// at which point it will return an invalidated `z_owned_sample_t`, and so will further calls. +#[no_mangle] +pub extern "C" fn z_sample_fifo_channel_new(bound: usize) -> z_owned_sample_fifo_channel_t { + // TODO(sashacmc): switch to handlers::FifoChannel + let (tx, rx) = if bound == 0 { + crossbeam_channel::unbounded() + } else { + crossbeam_channel::bounded(bound) + }; + let rx_clone = rx.clone(); + z_owned_sample_fifo_channel_t { + send: From::from(move |sample: &z_sample_t| { + let mut osample = z_sample_clone(Some(sample)); + if let Some(osample) = osample.take() { + if let Err(e) = tx.send(osample) { + log::error!("Attempted to push onto a closed sample_ring: {}", e) + } + } + }), + recv: From::from(move |receptacle: &mut z_owned_sample_t| { + *receptacle = match rx.recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + try_recv: From::from(move |receptacle: &mut z_owned_sample_t| { + *receptacle = match rx_clone.try_recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + } +} + +#[repr(C)] +pub struct z_owned_sample_ring_channel_t { + pub send: z_owned_closure_sample_t, + pub recv: z_owned_closure_owned_sample_t, + pub try_recv: z_owned_closure_owned_sample_t, +} +#[no_mangle] +pub extern "C" fn z_sample_ring_channel_drop(channel: &mut z_owned_sample_ring_channel_t) { + z_closure_sample_drop(&mut channel.send); + z_closure_owned_sample_drop(&mut channel.recv); + z_closure_owned_sample_drop(&mut channel.try_recv); +} +/// Constructs a null safe-to-drop value of 'z_owned_sample_ring_channel_t' type +#[no_mangle] +pub extern "C" fn z_sample_ring_channel_null() -> z_owned_sample_ring_channel_t { + z_owned_sample_ring_channel_t { + send: z_owned_closure_sample_t::empty(), + recv: z_owned_closure_owned_sample_t::empty(), + try_recv: z_owned_closure_owned_sample_t::empty(), + } +} + +/// Creates a new blocking ring channel, returned as a pair of closures. +/// +/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full. +/// +/// The `send` end should be passed as callback to a `z_get` call. +/// +/// The `recv` end is a synchronous closure that will block until either a `z_owned_sample_t` is available, +/// which it will then return; or until the `send` closure is dropped and all replies have been consumed, +/// at which point it will return an invalidated `z_owned_sample_t`, and so will further calls. +#[no_mangle] +pub extern "C" fn z_sample_ring_channel_new(bound: usize) -> z_owned_sample_ring_channel_t { + // TODO(sashacmc): switch to handlers::RingChannel + let (tx, rx) = if bound == 0 { + crossbeam_channel::unbounded() + } else { + crossbeam_channel::bounded(bound) + }; + let rx_clone = rx.clone(); + z_owned_sample_ring_channel_t { + send: From::from(move |sample: &z_sample_t| { + let mut osample = z_sample_clone(Some(sample)); + if let Some(osample) = osample.take() { + if let Err(e) = tx.send(osample) { + log::error!("Attempted to push onto a closed sample_ring: {}", e) + } + } + }), + recv: From::from(move |receptacle: &mut z_owned_sample_t| { + *receptacle = match rx.recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + try_recv: From::from(move |receptacle: &mut z_owned_sample_t| { + *receptacle = match rx_clone.try_recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + }), + } +} diff --git a/src/closures/sample_closure.rs b/src/closures/sample_closure.rs index 6f0e55707..65cb97a49 100644 --- a/src/closures/sample_closure.rs +++ b/src/closures/sample_closure.rs @@ -1,4 +1,4 @@ -use crate::z_sample_t; +use crate::{z_owned_sample_t, z_sample_t}; use libc::c_void; /// A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks. /// @@ -77,3 +77,84 @@ impl From for z_owned_closure_sample_t { } } } + +/// A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: +/// +/// Members: +/// void *context: a pointer to an arbitrary state. +/// void *call(const struct z_sample_t*, const void *context): the typical callback function. `context` will be passed as its last argument. +/// void *drop(void*): allows the callback's state to be freed. +/// +/// Closures are not guaranteed not to be called concurrently. +/// +/// It is guaranteed that: +/// +/// - `call` will never be called once `drop` has started. +/// - `drop` will only be called **once**, and **after every** `call` has ended. +/// - The two previous guarantees imply that `call` and `drop` are never called concurrently. +#[repr(C)] +pub struct z_owned_closure_owned_sample_t { + context: *mut c_void, + call: Option, + drop: Option, +} +impl z_owned_closure_owned_sample_t { + pub fn empty() -> Self { + z_owned_closure_owned_sample_t { + context: std::ptr::null_mut(), + call: None, + drop: None, + } + } +} +unsafe impl Send for z_owned_closure_owned_sample_t {} +unsafe impl Sync for z_owned_closure_owned_sample_t {} +impl Drop for z_owned_closure_owned_sample_t { + fn drop(&mut self) { + if let Some(drop) = self.drop { + drop(self.context) + } + } +} +/// Constructs a null safe-to-drop value of 'z_owned_closure_sample_t' type +#[no_mangle] +pub extern "C" fn z_closure_owned_sample_null() -> z_owned_closure_owned_sample_t { + z_owned_closure_owned_sample_t::empty() +} +/// Calls the closure. Calling an uninitialized closure is a no-op. +#[no_mangle] +pub extern "C" fn z_closure_owned_sample_call( + closure: &z_owned_closure_owned_sample_t, + sample: &mut z_owned_sample_t, +) { + match closure.call { + Some(call) => call(sample, closure.context), + None => log::error!("Attempted to call an uninitialized closure!"), + } +} +/// Drops the closure. Droping an uninitialized closure is a no-op. +#[no_mangle] +pub extern "C" fn z_closure_owned_sample_drop(closure: &mut z_owned_closure_owned_sample_t) { + let mut empty_closure = z_owned_closure_owned_sample_t::empty(); + std::mem::swap(&mut empty_closure, closure); +} +impl From for z_owned_closure_owned_sample_t { + fn from(f: F) -> Self { + let this = Box::into_raw(Box::new(f)) as _; + extern "C" fn call( + sample: &mut z_owned_sample_t, + this: *mut c_void, + ) { + let this = unsafe { &*(this as *const F) }; + this(sample) + } + extern "C" fn drop(this: *mut c_void) { + std::mem::drop(unsafe { Box::from_raw(this as *mut F) }) + } + z_owned_closure_owned_sample_t { + context: this, + call: Some(call::), + drop: Some(drop::), + } + } +} From aebac0e0dd0cb826f03a6d51abb9fdbcef4a3471 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Mon, 6 May 2024 17:43:04 +0200 Subject: [PATCH 6/6] [skip ci] Old owned sample draft --- include/zenoh_commons.h | 130 ++++++++++++++++++++++++++++++++++++++++ src/commons.rs | 77 ++++++++++++++++++++++++ 2 files changed, 207 insertions(+) diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index 244b8dd46..d2d75bb4d 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -315,6 +315,44 @@ typedef struct z_owned_closure_owned_query_t { void (*call)(struct z_owned_query_t*, void *context); void (*drop)(void*); } z_owned_closure_owned_query_t; +/** + * Owned variant of a z_sample_t. + * + * You may construct it by `z_sample_clone`-ing a loaned sample. + * When the last `z_owned_sample_t` corresponding to a sample is destroyed, or the callback that produced the sample cloned to build them returns, + * the sample will receive its termination signal. + */ +#if !defined(TARGET_ARCH_ARM) +typedef struct ALIGN(8) z_owned_sample_t { + uint64_t _0[17]; +} z_owned_sample_t; +#endif +#if defined(TARGET_ARCH_ARM) +typedef struct ALIGN(8) z_owned_sample_t { + uint64_t _0[15]; +} z_owned_sample_t; +#endif +/** + * A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: + * + * Members: + * void *context: a pointer to an arbitrary state. + * void *call(const struct z_sample_t*, const void *context): the typical callback function. `context` will be passed as its last argument. + * void *drop(void*): allows the callback's state to be freed. + * + * Closures are not guaranteed not to be called concurrently. + * + * It is guaranteed that: + * + * - `call` will never be called once `drop` has started. + * - `drop` will only be called **once**, and **after every** `call` has ended. + * - The two previous guarantees imply that `call` and `drop` are never called concurrently. + */ +typedef struct z_owned_closure_owned_sample_t { + void *context; + void (*call)(struct z_owned_sample_t*, void *context); + void (*drop)(void*); +} z_owned_closure_owned_sample_t; /** * A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: * @@ -773,6 +811,16 @@ typedef struct z_owned_reply_ring_channel_t { struct z_owned_closure_reply_t recv; struct z_owned_closure_reply_t try_recv; } z_owned_reply_ring_channel_t; +typedef struct z_owned_sample_fifo_channel_t { + struct z_owned_closure_sample_t send; + struct z_owned_closure_owned_sample_t recv; + struct z_owned_closure_owned_sample_t try_recv; +} z_owned_sample_fifo_channel_t; +typedef struct z_owned_sample_ring_channel_t { + struct z_owned_closure_sample_t send; + struct z_owned_closure_owned_sample_t recv; + struct z_owned_closure_owned_sample_t try_recv; +} z_owned_sample_ring_channel_t; typedef struct z_owned_scouting_config_t { struct z_owned_config_t _config; unsigned long zc_timeout_ms; @@ -1173,6 +1221,20 @@ ZENOHC_API struct z_owned_closure_owned_query_t z_closure_owned_query_null(void) * Calls the closure. Calling an uninitialized closure is a no-op. */ ZENOHC_API +void z_closure_owned_sample_call(const struct z_owned_closure_owned_sample_t *closure, + struct z_owned_sample_t *sample); +/** + * Drops the closure. Droping an uninitialized closure is a no-op. + */ +ZENOHC_API void z_closure_owned_sample_drop(struct z_owned_closure_owned_sample_t *closure); +/** + * Constructs a null safe-to-drop value of 'z_owned_closure_sample_t' type + */ +ZENOHC_API struct z_owned_closure_owned_sample_t z_closure_owned_sample_null(void); +/** + * Calls the closure. Calling an uninitialized closure is a no-op. + */ +ZENOHC_API void z_closure_query_call(const struct z_owned_closure_query_t *closure, const struct z_query_t *query); /** @@ -1975,6 +2037,74 @@ struct z_owned_reply_ring_channel_t z_reply_ring_channel_new(size_t bound); * Constructs a null safe-to-drop value of 'z_owned_reply_ring_channel_t' type */ ZENOHC_API struct z_owned_reply_ring_channel_t z_reply_ring_channel_null(void); +/** + * Returns `false` if `this` is in a gravestone state, `true` otherwise. + * + * This function may not be called with the null pointer, but can be called with the gravestone value. + */ +ZENOHC_API +bool z_sample_check(const struct z_owned_sample_t *this_); +/** + * Clones the sample, allowing to keep it in an "open" state past the callback's return. + * + * This operation is infallible, but may return a gravestone value if `sample` itself was a gravestone value (which cannot be the case in a callback). + */ +ZENOHC_API +struct z_owned_sample_t z_sample_clone(const struct z_sample_t *sample); +/** + * Destroys the sample, setting `this` to its gravestone value to prevent double-frees. + * + * This function may not be called with the null pointer, but can be called with the gravestone value. + */ +ZENOHC_API +void z_sample_drop(struct z_owned_sample_t *this_); +ZENOHC_API void z_sample_fifo_channel_drop(struct z_owned_sample_fifo_channel_t *channel); +/** + * Creates a new blocking fifo channel, returned as a pair of closures. + * + * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. + * + * The `send` end should be passed as callback to a `z_get` call. + * + * The `recv` end is a synchronous closure that will block until either a `z_owned_sample_t` is available, + * which it will then return; or until the `send` closure is dropped and all replies have been consumed, + * at which point it will return an invalidated `z_owned_sample_t`, and so will further calls. + */ +ZENOHC_API +struct z_owned_sample_fifo_channel_t z_sample_fifo_channel_new(size_t bound); +/** + * Constructs a null safe-to-drop value of 'z_owned_sample_fifo_channel_t' type + */ +ZENOHC_API struct z_owned_sample_fifo_channel_t z_sample_fifo_channel_null(void); +/** + * Aliases the sample. + * + * This function may not be called with the null pointer, but can be called with the gravestone value. + */ +ZENOHC_API +struct z_sample_t z_sample_loan(const struct z_owned_sample_t *this_); +/** + * The gravestone value of `z_owned_sample_t`. + */ +ZENOHC_API struct z_owned_sample_t z_sample_null(void); +ZENOHC_API void z_sample_ring_channel_drop(struct z_owned_sample_ring_channel_t *channel); +/** + * Creates a new blocking ring channel, returned as a pair of closures. + * + * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. + * + * The `send` end should be passed as callback to a `z_get` call. + * + * The `recv` end is a synchronous closure that will block until either a `z_owned_sample_t` is available, + * which it will then return; or until the `send` closure is dropped and all replies have been consumed, + * at which point it will return an invalidated `z_owned_sample_t`, and so will further calls. + */ +ZENOHC_API +struct z_owned_sample_ring_channel_t z_sample_ring_channel_new(size_t bound); +/** + * Constructs a null safe-to-drop value of 'z_owned_sample_ring_channel_t' type + */ +ZENOHC_API struct z_owned_sample_ring_channel_t z_sample_ring_channel_null(void); /** * Scout for routers and/or peers. * diff --git a/src/commons.rs b/src/commons.rs index 8ac21cb11..2969d3913 100644 --- a/src/commons.rs +++ b/src/commons.rs @@ -20,6 +20,7 @@ use crate::z_priority_t; use crate::{impl_guarded_transmute, GuardedTransmute}; use libc::c_void; use libc::{c_char, c_ulong}; +use std::ops::{Deref, DerefMut}; use zenoh::buffers::ZBuf; use zenoh::prelude::SampleKind; use zenoh::prelude::SplitBuffer; @@ -280,6 +281,82 @@ impl<'a> z_sample_t<'a> { } } +/// Owned variant of a z_sample_t. +/// +/// You may construct it by `z_sample_clone`-ing a loaned sample. +/// When the last `z_owned_sample_t` corresponding to a sample is destroyed, or the callback that produced the sample cloned to build them returns, +/// the sample will receive its termination signal. +#[cfg(not(target_arch = "arm"))] +#[repr(C, align(8))] +pub struct z_owned_sample_t([u64; 17]); + +#[cfg(target_arch = "arm")] +#[repr(C, align(8))] +pub struct z_owned_sample_t([u64; 15]); + +impl From>> for z_owned_sample_t { + fn from(value: Option) -> Self { + unsafe { core::mem::transmute(value) } + } +} +impl From> for z_owned_sample_t { + fn from(value: z_sample_t<'_>) -> Self { + Some(value).into() + } +} +/* +impl Deref for z_owned_sample_t { + type Target = Option; + fn deref(&self) -> &Self::Target { + unsafe { core::mem::transmute(self) } + } +} +impl DerefMut for z_owned_sample_t { + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { core::mem::transmute(self) } + } +} +*/ +impl Drop for z_owned_sample_t { + fn drop(&mut self) { + let _: Option = self.take(); + } +} + +/// The gravestone value of `z_owned_sample_t`. +#[no_mangle] +pub extern "C" fn z_sample_null() -> z_owned_sample_t { + unsafe { core::mem::transmute(None::) } +} +/// Returns `false` if `this` is in a gravestone state, `true` otherwise. +/// +/// This function may not be called with the null pointer, but can be called with the gravestone value. +#[no_mangle] +pub extern "C" fn z_sample_check(this: &z_owned_sample_t) -> bool { + this.is_some() +} +/// Aliases the sample. +/// +/// This function may not be called with the null pointer, but can be called with the gravestone value. +#[no_mangle] +pub extern "C" fn z_sample_loan(this: &z_owned_sample_t) -> z_sample_t { + this.as_ref().into() +} +/// Destroys the sample, setting `this` to its gravestone value to prevent double-frees. +/// +/// This function may not be called with the null pointer, but can be called with the gravestone value. +#[no_mangle] +pub extern "C" fn z_sample_drop(this: &mut z_owned_sample_t) { + let _: Option = this.take(); +} +/// Clones the sample, allowing to keep it in an "open" state past the callback's return. +/// +/// This operation is infallible, but may return a gravestone value if `sample` itself was a gravestone value (which cannot be the case in a callback). +#[no_mangle] +pub extern "C" fn z_sample_clone(sample: Option<&z_sample_t>) -> z_owned_sample_t { + sample.and_then(|q| q.cloned()).into() +} + /// Clones the sample's payload by incrementing its backing refcount (this doesn't imply any copies). #[no_mangle] pub extern "C" fn zc_sample_payload_rcinc(sample: Option<&z_sample_t>) -> zc_owned_payload_t {