From a2209b729cdab99e9d1e2c7722693bc7907d1d59 Mon Sep 17 00:00:00 2001 From: Pierre Avital Date: Mon, 18 Dec 2023 15:33:46 +0100 Subject: [PATCH] introduce query channels (#222) --- examples/z_queryable_with_channels.c | 95 ++++++++++++ include/zenoh_commons.h | 123 ++++++++++++++-- include/zenoh_macros.h | 6 +- src/closures/mod.rs | 3 + src/closures/query_channel.rs | 213 +++++++++++++++++++++++++++ src/closures/query_closure.rs | 83 ++++++++++- src/closures/response_channel.rs | 111 ++++++-------- src/queryable.rs | 50 +++++-- 8 files changed, 597 insertions(+), 87 deletions(-) create mode 100644 examples/z_queryable_with_channels.c create mode 100644 src/closures/query_channel.rs diff --git a/examples/z_queryable_with_channels.c b/examples/z_queryable_with_channels.c new file mode 100644 index 000000000..80c98d689 --- /dev/null +++ b/examples/z_queryable_with_channels.c @@ -0,0 +1,95 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include +#include +#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__) +#include +#define sleep(x) Sleep(x * 1000) +#else +#include +#endif +#include "zenoh.h" + +const char *expr = "demo/example/zenoh-c-queryable"; +const char *value = "Queryable from C!"; +z_keyexpr_t keyexpr; + +void query_handler(const z_query_t *query, void *context) { + z_owned_closure_owned_query_t *channel = (z_owned_closure_owned_query_t *)context; + z_owned_query_t oquery = z_query_clone(query); + z_call(*channel, &oquery); +} + +int main(int argc, char **argv) { + if (argc > 1) { + expr = argv[1]; + } + z_owned_config_t config = z_config_default(); + if (argc > 2) { + if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[2]) < 0) { + printf( + "Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a " + "JSON-serialized list of strings\n", + argv[2], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY); + exit(-1); + } + } + + printf("Opening session...\n"); + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + keyexpr = z_keyexpr(expr); + if (!z_check(keyexpr)) { + printf("%s is not a valid key expression", expr); + exit(-1); + } + + printf("Declaring Queryable on '%s'...\n", expr); + z_owned_query_channel_t channel = zc_query_fifo_new(16); + z_owned_closure_query_t callback = z_closure(query_handler, NULL, &channel.send); + z_owned_queryable_t qable = z_declare_queryable(z_loan(s), keyexpr, z_move(callback), NULL); + if (!z_check(qable)) { + printf("Unable to create queryable.\n"); + exit(-1); + } + + printf("^C to quit...\n"); + z_owned_query_t oquery = z_query_null(); + for (z_call(channel.recv, &oquery); z_check(oquery); z_call(channel.recv, &oquery)) { + z_query_t query = z_loan(oquery); + z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(&query)); + z_bytes_t pred = z_query_parameters(&query); + z_value_t payload_value = z_query_value(&query); + if (payload_value.payload.len > 0) { + printf(">> [Queryable ] Received Query '%s?%.*s' with value '%.*s'\n", z_loan(keystr), (int)pred.len, + pred.start, (int)payload_value.payload.len, payload_value.payload.start); + } else { + printf(">> [Queryable ] Received Query '%s?%.*s'\n", z_loan(keystr), (int)pred.len, pred.start); + } + z_query_reply_options_t options = z_query_reply_options_default(); + options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL); + z_query_reply(&query, keyexpr, (const unsigned char *)value, strlen(value), &options); + z_drop(z_move(keystr)); + z_drop(z_move(oquery)); + } + + z_drop(z_move(qable)); + z_drop(z_move(channel)); + z_drop(z_move(s)); + return 0; +} diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index a0d31803d..2d0a80796 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -259,6 +259,40 @@ typedef struct z_owned_closure_hello_t { void (*call)(struct z_owned_hello_t*, void*); void (*drop)(void*); } z_owned_closure_hello_t; +/** + * Owned variant of a Query received by a Queryable. + * + * You may construct it by `z_query_clone`-ing a loaned query. + * When the last `z_owned_query_t` corresponding to a query is destroyed, or the callback that produced the query cloned to build them returns, + * the query will receive its termination signal. + * + * Holding onto an `z_owned_query_t` for too long (10s by default, can be set in `z_get`'s options) will trigger a timeout error + * to be sent to the querier by the infrastructure, and new responses to the outdated query will be silently dropped. + */ +typedef struct z_owned_query_t { + void *_0; +} z_owned_query_t; +/** + * A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: + * + * Members: + * void *context: a pointer to an arbitrary state. + * void *call(const struct z_query_t*, const void *context): the typical callback function. `context` will be passed as its last argument. + * void *drop(void*): allows the callback's state to be freed. + * + * Closures are not guaranteed not to be called concurrently. + * + * It is guaranteed that: + * + * - `call` will never be called once `drop` has started. + * - `drop` will only be called **once**, and **after every** `call` has ended. + * - The two previous guarantees imply that `call` and `drop` are never called concurrently. + */ +typedef struct z_owned_closure_owned_query_t { + void *context; + void (*call)(struct z_owned_query_t*, void *context); + void (*drop)(void*); +} z_owned_closure_owned_query_t; /** * A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: * @@ -682,18 +716,30 @@ typedef struct z_put_options_t { struct z_attachment_t attachment; } z_put_options_t; /** - * Owned variant of a Query received by a Queryable. + * A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: + * - `this` is a pointer to an arbitrary state. + * - `call` is the typical callback function. `this` will be passed as its last argument. + * - `drop` allows the callback's state to be freed. * - * You may construct it by `z_query_clone`-ing a loaned query. - * When the last `z_owned_query_t` corresponding to a query is destroyed, or the callback that produced the query cloned to build them returns, - * the query will receive its termination signal. + * Closures are not guaranteed not to be called concurrently. * - * Holding onto an `z_owned_query_t` for too long (10s by default, can be set in `z_get`'s options) will trigger a timeout error - * to be sent to the querier by the infrastructure, and new responses to the outdated query will be silently dropped. + * We guarantee that: + * - `call` will never be called once `drop` has started. + * - `drop` will only be called ONCE, and AFTER EVERY `call` has ended. + * - The two previous guarantees imply that `call` and `drop` are never called concurrently. */ -typedef struct z_owned_query_t { - void *_0; -} z_owned_query_t; +typedef struct z_owned_query_channel_closure_t { + void *context; + bool (*call)(struct z_owned_query_t*, void*); + void (*drop)(void*); +} z_owned_query_channel_closure_t; +/** + * A pair of closures + */ +typedef struct z_owned_query_channel_t { + struct z_owned_closure_owned_query_t send; + struct z_owned_query_channel_closure_t recv; +} z_owned_query_channel_t; /** * Represents the set of options that can be applied to a query reply, * sent via :c:func:`z_query_reply`. @@ -1020,6 +1066,20 @@ ZENOHC_API struct z_owned_closure_hello_t z_closure_hello_null(void); * Calls the closure. Calling an uninitialized closure is a no-op. */ ZENOHC_API +void z_closure_owned_query_call(const struct z_owned_closure_owned_query_t *closure, + struct z_owned_query_t *query); +/** + * Drops the closure. Droping an uninitialized closure is a no-op. + */ +ZENOHC_API void z_closure_owned_query_drop(struct z_owned_closure_owned_query_t *closure); +/** + * Constructs a null safe-to-drop value of 'z_owned_closure_query_t' type + */ +ZENOHC_API struct z_owned_closure_owned_query_t z_closure_owned_query_null(void); +/** + * Calls the closure. Calling an uninitialized closure is a no-op. + */ +ZENOHC_API void z_closure_query_call(const struct z_owned_closure_query_t *closure, const struct z_query_t *query); /** @@ -1599,6 +1659,25 @@ ZENOHC_API struct z_put_options_t z_put_options_default(void); * `z_check(return_value) == false` if there was no attachment to the query. */ ZENOHC_API struct z_attachment_t z_query_attachment(const struct z_query_t *query); +/** + * Calls the closure. Calling an uninitialized closure is a no-op. + */ +ZENOHC_API +bool z_query_channel_closure_call(const struct z_owned_query_channel_closure_t *closure, + struct z_owned_query_t *sample); +/** + * Drops the closure. Droping an uninitialized closure is a no-op. + */ +ZENOHC_API void z_query_channel_closure_drop(struct z_owned_query_channel_closure_t *closure); +/** + * Constructs a null safe-to-drop value of 'z_owned_query_channel_closure_t' type + */ +ZENOHC_API struct z_owned_query_channel_closure_t z_query_channel_closure_null(void); +ZENOHC_API void z_query_channel_drop(struct z_owned_query_channel_t *channel); +/** + * Constructs a null safe-to-drop value of 'z_owned_query_channel_t' type + */ +ZENOHC_API struct z_owned_query_channel_t z_query_channel_null(void); /** * Returns `false` if `this` is in a gravestone state, `true` otherwise. * @@ -2118,6 +2197,32 @@ int8_t zc_put_owned(struct z_session_t session, struct z_keyexpr_t keyexpr, struct zc_owned_payload_t *payload, const struct z_put_options_t *opts); +/** + * Creates a new blocking fifo channel, returned as a pair of closures. + * + * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. + * + * The `send` end should be passed as callback to a `z_get` call. + * + * The `recv` end is a synchronous closure that will block until either a `z_owned_query_t` is available, + * which it will then return; or until the `send` closure is dropped and all queries have been consumed, + * at which point it will return an invalidated `z_owned_query_t`, and so will further calls. + */ +ZENOHC_API +struct z_owned_query_channel_t zc_query_fifo_new(size_t bound); +/** + * Creates a new non-blocking fifo channel, returned as a pair of closures. + * + * If `bound` is different from 0, that channel will be bound and apply back-pressure when full. + * + * The `send` end should be passed as callback to a `z_get` call. + * + * The `recv` end is a synchronous closure that will block until either a `z_owned_query_t` is available, + * which it will then return; or until the `send` closure is dropped and all queries have been consumed, + * at which point it will return an invalidated `z_owned_query_t`, and so will further calls. + */ +ZENOHC_API +struct z_owned_query_channel_t zc_query_non_blocking_fifo_new(size_t bound); /** * Creates a new blocking fifo channel, returned as a pair of closures. * diff --git a/include/zenoh_macros.h b/include/zenoh_macros.h index 106040271..5c93264b8 100644 --- a/include/zenoh_macros.h +++ b/include/zenoh_macros.h @@ -37,7 +37,9 @@ z_owned_closure_hello_t * : z_closure_hello_drop, \ z_owned_closure_zid_t * : z_closure_zid_drop, \ z_owned_reply_channel_closure_t * : z_reply_channel_closure_drop, \ + z_owned_query_channel_closure_t * : z_query_channel_closure_drop, \ z_owned_reply_channel_t * : z_reply_channel_drop, \ + z_owned_query_channel_t * : z_query_channel_drop, \ z_owned_bytes_map_t * : z_bytes_map_drop, \ zc_owned_payload_t * : zc_payload_drop, \ zc_owned_shmbuf_t * : zc_shmbuf_drop, \ @@ -106,10 +108,12 @@ #define z_call(x, ...) \ _Generic((x), z_owned_closure_sample_t : z_closure_sample_call, \ z_owned_closure_query_t : z_closure_query_call, \ + z_owned_closure_owned_query_t : z_closure_owned_query_call, \ z_owned_closure_reply_t : z_closure_reply_call, \ z_owned_closure_hello_t : z_closure_hello_call, \ z_owned_closure_zid_t : z_closure_zid_call, \ - z_owned_reply_channel_closure_t : z_reply_channel_closure_call \ + z_owned_reply_channel_closure_t : z_reply_channel_closure_call,\ + z_owned_query_channel_closure_t : z_query_channel_closure_call \ ) (&x, __VA_ARGS__) // clang-format on diff --git a/src/closures/mod.rs b/src/closures/mod.rs index bac893909..072f8a66e 100644 --- a/src/closures/mod.rs +++ b/src/closures/mod.rs @@ -26,5 +26,8 @@ mod zenohid_closure; pub use response_channel::*; mod response_channel; +pub use query_channel::*; +mod query_channel; + pub use hello_closure::*; mod hello_closure; diff --git a/src/closures/query_channel.rs b/src/closures/query_channel.rs new file mode 100644 index 000000000..b7a51c9c0 --- /dev/null +++ b/src/closures/query_channel.rs @@ -0,0 +1,213 @@ +use crate::{z_closure_owned_query_drop, z_owned_closure_owned_query_t, z_owned_query_t}; +use libc::c_void; +use std::sync::mpsc::TryRecvError; + +/// A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: +/// - `this` is a pointer to an arbitrary state. +/// - `call` is the typical callback function. `this` will be passed as its last argument. +/// - `drop` allows the callback's state to be freed. +/// +/// Closures are not guaranteed not to be called concurrently. +/// +/// We guarantee that: +/// - `call` will never be called once `drop` has started. +/// - `drop` will only be called ONCE, and AFTER EVERY `call` has ended. +/// - The two previous guarantees imply that `call` and `drop` are never called concurrently. +#[repr(C)] +pub struct z_owned_query_channel_closure_t { + context: *mut c_void, + call: Option bool>, + drop: Option, +} + +/// A pair of closures +#[repr(C)] +pub struct z_owned_query_channel_t { + pub send: z_owned_closure_owned_query_t, + pub recv: z_owned_query_channel_closure_t, +} +#[no_mangle] +pub extern "C" fn z_query_channel_drop(channel: &mut z_owned_query_channel_t) { + z_closure_owned_query_drop(&mut channel.send); + z_query_channel_closure_drop(&mut channel.recv); +} +/// Constructs a null safe-to-drop value of 'z_owned_query_channel_t' type +#[no_mangle] +pub extern "C" fn z_query_channel_null() -> z_owned_query_channel_t { + z_owned_query_channel_t { + send: z_owned_closure_owned_query_t::empty(), + recv: z_owned_query_channel_closure_t::empty(), + } +} + +/// Creates a new blocking fifo channel, returned as a pair of closures. +/// +/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full. +/// +/// The `send` end should be passed as callback to a `z_get` call. +/// +/// The `recv` end is a synchronous closure that will block until either a `z_owned_query_t` is available, +/// which it will then return; or until the `send` closure is dropped and all queries have been consumed, +/// at which point it will return an invalidated `z_owned_query_t`, and so will further calls. +#[no_mangle] +pub extern "C" fn zc_query_fifo_new(bound: usize) -> z_owned_query_channel_t { + let (send, rx) = if bound == 0 { + let (tx, rx) = std::sync::mpsc::channel(); + ( + From::from(move |query: &mut z_owned_query_t| { + if let Some(query) = query.take() { + if let Err(e) = tx.send(query) { + log::error!("Attempted to push onto a closed query_fifo: {}", e) + } + } + }), + rx, + ) + } else { + let (tx, rx) = std::sync::mpsc::sync_channel(bound); + ( + From::from(move |query: &mut z_owned_query_t| { + if let Some(query) = query.take() { + if let Err(e) = tx.send(query) { + log::error!("Attempted to push onto a closed query_fifo: {}", e) + } + } + }), + rx, + ) + }; + z_owned_query_channel_t { + send, + recv: From::from(move |receptacle: &mut z_owned_query_t| { + *receptacle = match rx.recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + true + }), + } +} + +/// Creates a new non-blocking fifo channel, returned as a pair of closures. +/// +/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full. +/// +/// The `send` end should be passed as callback to a `z_get` call. +/// +/// The `recv` end is a synchronous closure that will block until either a `z_owned_query_t` is available, +/// which it will then return; or until the `send` closure is dropped and all queries have been consumed, +/// at which point it will return an invalidated `z_owned_query_t`, and so will further calls. +#[no_mangle] +pub extern "C" fn zc_query_non_blocking_fifo_new(bound: usize) -> z_owned_query_channel_t { + let (send, rx) = if bound == 0 { + let (tx, rx) = std::sync::mpsc::channel(); + ( + From::from(move |query: &mut z_owned_query_t| { + if let Some(query) = query.take() { + if let Err(e) = tx.send(query) { + log::error!("Attempted to push onto a closed query_fifo: {}", e) + } + } + }), + rx, + ) + } else { + let (tx, rx) = std::sync::mpsc::sync_channel(bound); + ( + From::from(move |query: &mut z_owned_query_t| { + if let Some(query) = query.take() { + if let Err(e) = tx.send(query) { + log::error!("Attempted to push onto a closed query_fifo: {}", e) + } + } + }), + rx, + ) + }; + z_owned_query_channel_t { + send, + recv: From::from( + move |receptacle: &mut z_owned_query_t| match rx.try_recv() { + Ok(val) => { + let mut tmp = z_owned_query_t::from(val); + std::mem::swap(&mut tmp, receptacle); + true + } + Err(TryRecvError::Disconnected) => { + receptacle.take(); + true + } + Err(TryRecvError::Empty) => { + receptacle.take(); + false + } + }, + ), + } +} + +impl z_owned_query_channel_closure_t { + pub fn empty() -> Self { + z_owned_query_channel_closure_t { + context: std::ptr::null_mut(), + call: None, + drop: None, + } + } +} +unsafe impl Send for z_owned_query_channel_closure_t {} +unsafe impl Sync for z_owned_query_channel_closure_t {} +impl Drop for z_owned_query_channel_closure_t { + fn drop(&mut self) { + if let Some(drop) = self.drop { + drop(self.context) + } + } +} + +/// Constructs a null safe-to-drop value of 'z_owned_query_channel_closure_t' type +#[no_mangle] +pub extern "C" fn z_query_channel_closure_null() -> z_owned_query_channel_closure_t { + z_owned_query_channel_closure_t::empty() +} + +/// Calls the closure. Calling an uninitialized closure is a no-op. +#[no_mangle] +pub extern "C" fn z_query_channel_closure_call( + closure: &z_owned_query_channel_closure_t, + sample: &mut z_owned_query_t, +) -> bool { + match closure.call { + Some(call) => call(sample, closure.context), + None => { + log::error!("Attempted to call an uninitialized closure!"); + true + } + } +} +/// Drops the closure. Droping an uninitialized closure is a no-op. +#[no_mangle] +pub extern "C" fn z_query_channel_closure_drop(closure: &mut z_owned_query_channel_closure_t) { + let mut empty_closure = z_owned_query_channel_closure_t::empty(); + std::mem::swap(&mut empty_closure, closure); +} +impl bool> From for z_owned_query_channel_closure_t { + fn from(f: F) -> Self { + let this = Box::into_raw(Box::new(f)) as _; + extern "C" fn call bool>( + query: &mut z_owned_query_t, + this: *mut c_void, + ) -> bool { + let this = unsafe { &*(this as *const F) }; + this(query) + } + extern "C" fn drop(this: *mut c_void) { + std::mem::drop(unsafe { Box::from_raw(this as *mut F) }) + } + z_owned_query_channel_closure_t { + context: this, + call: Some(call::), + drop: Some(drop::), + } + } +} diff --git a/src/closures/query_closure.rs b/src/closures/query_closure.rs index ba822f8a5..22ad4bc28 100644 --- a/src/closures/query_closure.rs +++ b/src/closures/query_closure.rs @@ -1,4 +1,4 @@ -use crate::z_query_t; +use crate::{z_owned_query_t, z_query_t}; use libc::c_void; /// A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: /// @@ -74,3 +74,84 @@ impl From for z_owned_closure_query_t { } } } + +/// A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks: +/// +/// Members: +/// void *context: a pointer to an arbitrary state. +/// void *call(const struct z_query_t*, const void *context): the typical callback function. `context` will be passed as its last argument. +/// void *drop(void*): allows the callback's state to be freed. +/// +/// Closures are not guaranteed not to be called concurrently. +/// +/// It is guaranteed that: +/// +/// - `call` will never be called once `drop` has started. +/// - `drop` will only be called **once**, and **after every** `call` has ended. +/// - The two previous guarantees imply that `call` and `drop` are never called concurrently. +#[repr(C)] +pub struct z_owned_closure_owned_query_t { + context: *mut c_void, + call: Option, + drop: Option, +} +impl z_owned_closure_owned_query_t { + pub fn empty() -> Self { + z_owned_closure_owned_query_t { + context: std::ptr::null_mut(), + call: None, + drop: None, + } + } +} +unsafe impl Send for z_owned_closure_owned_query_t {} +unsafe impl Sync for z_owned_closure_owned_query_t {} +impl Drop for z_owned_closure_owned_query_t { + fn drop(&mut self) { + if let Some(drop) = self.drop { + drop(self.context) + } + } +} +/// Constructs a null safe-to-drop value of 'z_owned_closure_query_t' type +#[no_mangle] +pub extern "C" fn z_closure_owned_query_null() -> z_owned_closure_owned_query_t { + z_owned_closure_owned_query_t::empty() +} +/// Calls the closure. Calling an uninitialized closure is a no-op. +#[no_mangle] +pub extern "C" fn z_closure_owned_query_call( + closure: &z_owned_closure_owned_query_t, + query: &mut z_owned_query_t, +) { + match closure.call { + Some(call) => call(query, closure.context), + None => log::error!("Attempted to call an uninitialized closure!"), + } +} +/// Drops the closure. Droping an uninitialized closure is a no-op. +#[no_mangle] +pub extern "C" fn z_closure_owned_query_drop(closure: &mut z_owned_closure_owned_query_t) { + let mut empty_closure = z_owned_closure_owned_query_t::empty(); + std::mem::swap(&mut empty_closure, closure); +} +impl From for z_owned_closure_owned_query_t { + fn from(f: F) -> Self { + let this = Box::into_raw(Box::new(f)) as _; + extern "C" fn call( + sample: &mut z_owned_query_t, + this: *mut c_void, + ) { + let this = unsafe { &*(this as *const F) }; + this(sample) + } + extern "C" fn drop(this: *mut c_void) { + std::mem::drop(unsafe { Box::from_raw(this as *mut F) }) + } + z_owned_closure_owned_query_t { + context: this, + call: Some(call::), + drop: Some(drop::), + } + } +} diff --git a/src/closures/response_channel.rs b/src/closures/response_channel.rs index a9ebbfdb7..0ea046d5d 100644 --- a/src/closures/response_channel.rs +++ b/src/closures/response_channel.rs @@ -50,42 +50,40 @@ pub extern "C" fn z_reply_channel_null() -> z_owned_reply_channel_t { /// at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. #[no_mangle] pub extern "C" fn zc_reply_fifo_new(bound: usize) -> z_owned_reply_channel_t { - if bound == 0 { + let (send, rx) = if bound == 0 { let (tx, rx) = std::sync::mpsc::channel(); - z_owned_reply_channel_t { - send: From::from(move |reply: &mut z_owned_reply_t| { + ( + From::from(move |reply: &mut z_owned_reply_t| { if let Some(reply) = reply.take() { if let Err(e) = tx.send(reply) { log::error!("Attempted to push onto a closed reply_fifo: {}", e) } } }), - recv: From::from(move |receptacle: &mut z_owned_reply_t| { - *receptacle = match rx.recv() { - Ok(val) => val.into(), - Err(_) => None.into(), - }; - true - }), - } + rx, + ) } else { let (tx, rx) = std::sync::mpsc::sync_channel(bound); - z_owned_reply_channel_t { - send: From::from(move |reply: &mut z_owned_reply_t| { + ( + From::from(move |reply: &mut z_owned_reply_t| { if let Some(reply) = reply.take() { if let Err(e) = tx.send(reply) { log::error!("Attempted to push onto a closed reply_fifo: {}", e) } } }), - recv: From::from(move |receptacle: &mut z_owned_reply_t| { - *receptacle = match rx.recv() { - Ok(val) => val.into(), - Err(_) => None.into(), - }; - true - }), - } + rx, + ) + }; + z_owned_reply_channel_t { + send, + recv: From::from(move |receptacle: &mut z_owned_reply_t| { + *receptacle = match rx.recv() { + Ok(val) => val.into(), + Err(_) => None.into(), + }; + true + }), } } @@ -100,62 +98,51 @@ pub extern "C" fn zc_reply_fifo_new(bound: usize) -> z_owned_reply_channel_t { /// at which point it will return an invalidated `z_owned_reply_t`, and so will further calls. #[no_mangle] pub extern "C" fn zc_reply_non_blocking_fifo_new(bound: usize) -> z_owned_reply_channel_t { - if bound == 0 { + let (send, rx) = if bound == 0 { let (tx, rx) = std::sync::mpsc::channel(); - z_owned_reply_channel_t { - send: From::from(move |reply: &mut z_owned_reply_t| { + ( + From::from(move |reply: &mut z_owned_reply_t| { if let Some(reply) = reply.take() { if let Err(e) = tx.send(reply) { log::error!("Attempted to push onto a closed reply_fifo: {}", e) } } }), - recv: From::from( - move |receptacle: &mut z_owned_reply_t| match rx.try_recv() { - Ok(val) => { - let mut tmp = z_owned_reply_t::from(val); - std::mem::swap(&mut tmp, receptacle); - true - } - Err(TryRecvError::Disconnected) => { - receptacle.take(); - true - } - Err(TryRecvError::Empty) => { - receptacle.take(); - false - } - }, - ), - } + rx, + ) } else { let (tx, rx) = std::sync::mpsc::sync_channel(bound); - z_owned_reply_channel_t { - send: From::from(move |reply: &mut z_owned_reply_t| { + ( + From::from(move |reply: &mut z_owned_reply_t| { if let Some(reply) = reply.take() { if let Err(e) = tx.send(reply) { log::error!("Attempted to push onto a closed reply_fifo: {}", e) } } }), - recv: From::from( - move |receptacle: &mut z_owned_reply_t| match rx.try_recv() { - Ok(val) => { - let mut tmp = z_owned_reply_t::from(val); - std::mem::swap(&mut tmp, receptacle); - true - } - Err(TryRecvError::Disconnected) => { - receptacle.take(); - true - } - Err(TryRecvError::Empty) => { - receptacle.take(); - false - } - }, - ), - } + rx, + ) + }; + + z_owned_reply_channel_t { + send, + recv: From::from( + move |receptacle: &mut z_owned_reply_t| match rx.try_recv() { + Ok(val) => { + let mut tmp = z_owned_reply_t::from(val); + std::mem::swap(&mut tmp, receptacle); + true + } + Err(TryRecvError::Disconnected) => { + receptacle.take(); + true + } + Err(TryRecvError::Empty) => { + receptacle.take(); + false + } + }, + ), } } diff --git a/src/queryable.rs b/src/queryable.rs index 20d7bf159..5a7112afc 100644 --- a/src/queryable.rs +++ b/src/queryable.rs @@ -88,6 +88,23 @@ pub extern "C" fn z_queryable_null() -> z_owned_queryable_t { #[allow(non_camel_case_types)] #[repr(C)] pub struct z_query_t(*mut c_void); +impl From<&Query> for z_query_t { + fn from(value: &Query) -> Self { + z_query_t(value as *const _ as *mut _) + } +} +impl From> for z_query_t { + fn from(value: Option<&Query>) -> Self { + value.map_or(Self(core::ptr::null_mut()), Into::into) + } +} +impl Deref for z_query_t { + type Target = Option<&'static Query>; + fn deref(&self) -> &Self::Target { + unsafe { core::mem::transmute(self) } + } +} + /// Owned variant of a Query received by a Queryable. /// /// You may construct it by `z_query_clone`-ing a loaned query. @@ -99,21 +116,31 @@ pub struct z_query_t(*mut c_void); #[allow(non_camel_case_types)] #[repr(C)] pub struct z_owned_query_t(*mut c_void); -impl Deref for z_query_t { - type Target = Option; - fn deref(&self) -> &Self::Target { - unsafe { &*(self.0 as *const _) } + +impl From> for z_owned_query_t { + fn from(value: Option) -> Self { + unsafe { core::mem::transmute(value) } + } +} +impl From for z_owned_query_t { + fn from(value: Query) -> Self { + Some(value).into() } } impl Deref for z_owned_query_t { type Target = Option; fn deref(&self) -> &Self::Target { - unsafe { &*(self.0 as *const _) } + unsafe { core::mem::transmute(self) } } } impl DerefMut for z_owned_query_t { fn deref_mut(&mut self) -> &mut Self::Target { - unsafe { &mut *(self.0 as *mut _) } + unsafe { core::mem::transmute(self) } + } +} +impl Drop for z_owned_query_t { + fn drop(&mut self) { + let _: Option = self.take(); } } /// The gravestone value of `z_owned_query_t`. @@ -133,7 +160,7 @@ pub extern "C" fn z_query_check(this: &z_owned_query_t) -> bool { /// This function may not be called with the null pointer, but can be called with the gravestone value. #[no_mangle] pub extern "C" fn z_query_loan(this: &z_owned_query_t) -> z_query_t { - unsafe { core::mem::transmute_copy(this) } + this.as_ref().into() } /// Destroys the query, setting `this` to its gravestone value to prevent double-frees. /// @@ -147,10 +174,7 @@ pub extern "C" fn z_query_drop(this: &mut z_owned_query_t) { /// This operation is infallible, but may return a gravestone value if `query` itself was a gravestone value (which cannot be the case in a callback). #[no_mangle] pub extern "C" fn z_query_clone(query: Option<&z_query_t>) -> z_owned_query_t { - match query { - Some(query) => unsafe { core::mem::transmute(query.as_ref().cloned()) }, - None => z_query_null(), - } + query.and_then(|q| q.cloned()).into() } /// Options passed to the :c:func:`z_declare_queryable` function. @@ -225,9 +249,7 @@ pub extern "C" fn z_declare_queryable( builder = builder.complete(options.complete); } builder - .callback(move |query| { - z_closure_query_call(&closure, &z_query_t(&query as *const _ as *mut c_void)) - }) + .callback(move |query| z_closure_query_call(&closure, &z_query_t::from(&query))) .res_sync() .map_err(|e| log::error!("{}", e)) .ok()