Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework channels for sample/query/reply #329

Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Rework reply fifo channel
  • Loading branch information
sashacmc committed Apr 11, 2024
commit 871b5e83335ecd61702d6f0dd62c08b5d65bcced
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ libc = "0.2.139"
log = "0.4.17"
rand = "0.8.5"
spin = "0.9.5"
crossbeam-channel = "0.5"
# shared-memory enabled for zenoh even if zenoh-c "shared-memory" feature is disabled. This is to make "std::mem::transmute" work for `ZSLice`
zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory", "unstable"], default-features = false }
zenoh-protocol = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory"] }
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml.in
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ libc = "0.2.139"
log = "0.4.17"
rand = "0.8.5"
spin = "0.9.5"
crossbeam-channel = "0.5"
# shared-memory enabled for zenoh even if zenoh-c "shared-memory" feature is disabled. This is to make "std::mem::transmute" work for `ZSLice`
zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory", "unstable"], default-features = false }
zenoh-protocol = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory"] }
Expand Down
4 changes: 2 additions & 2 deletions examples/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ int main(int argc, char **argv) {
}

printf("Sending Query '%s'...\n", expr);
z_owned_reply_channel_t channel = zc_reply_fifo_new(16);
z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16);
z_get_options_t opts = z_get_options_default();
if (value != NULL) {
opts.value.payload = z_bytes_from_str(value);
Expand All @@ -76,4 +76,4 @@ int main(int argc, char **argv) {
z_drop(z_move(channel));
z_close(z_move(s));
return 0;
}
}
2 changes: 1 addition & 1 deletion examples/z_get_liveliness.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ int main(int argc, char **argv) {
}

printf("Sending liveliness query '%s'...\n", expr);
z_owned_reply_channel_t channel = zc_reply_fifo_new(16);
z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16);
zc_liveliness_get(z_loan(s), keyexpr, z_move(channel.send), NULL);
z_owned_reply_t reply = z_reply_null();
for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) {
Expand Down
29 changes: 15 additions & 14 deletions examples/z_non_blocking_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,27 @@ int main(int argc, char **argv) {
printf("Sending Query '%s'...\n", expr);
z_get_options_t opts = z_get_options_default();
opts.target = Z_QUERY_TARGET_ALL;
z_owned_reply_channel_t channel = zc_reply_non_blocking_fifo_new(16);
z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16);
z_get(z_loan(s), keyexpr, "", z_move(channel.send),
&opts); // here, the send is moved and will be dropped by zenoh when adequate
z_owned_reply_t reply = z_reply_null();
for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply);
call_success = z_call(channel.recv, &reply)) {
if (!call_success) {
continue;
}
if (z_reply_is_ok(&reply)) {
z_sample_t sample = z_reply_ok(&reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
z_drop(z_move(keystr));
} else {
printf("Received an error\n");
while (true) {
for (z_call(channel.try_recv, &reply); z_check(reply); z_call(channel.try_recv, &reply)) {
if (z_reply_is_ok(&reply)) {
z_sample_t sample = z_reply_ok(&reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
z_drop(z_move(keystr));
} else {
printf(">> Received an error\n");
}
}
printf(">> Nothing to get... sleep for 5 s\n");
z_sleep_s(5);
}

z_drop(z_move(reply));
z_drop(z_move(channel));
z_close(z_move(s));
return 0;
}
}
88 changes: 22 additions & 66 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -778,31 +778,14 @@ typedef struct z_query_reply_options_t {
struct z_encoding_t encoding;
struct z_attachment_t attachment;
} z_query_reply_options_t;
/**
* A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks:
* - `this` is a pointer to an arbitrary state.
* - `call` is the typical callback function. `this` will be passed as its last argument.
* - `drop` allows the callback's state to be freed.
*
* Closures are not guaranteed not to be called concurrently.
*
* We guarantee that:
* - `call` will never be called once `drop` has started.
* - `drop` will only be called ONCE, and AFTER EVERY `call` has ended.
* - The two previous guarantees imply that `call` and `drop` are never called concurrently.
*/
typedef struct z_owned_reply_channel_closure_t {
void *context;
bool (*call)(struct z_owned_reply_t*, void*);
void (*drop)(void*);
} z_owned_reply_channel_closure_t;
/**
* A pair of closures, the `send` one accepting
*/
typedef struct z_owned_reply_channel_t {
typedef struct z_owned_reply_fifo_channel_t {
struct z_owned_closure_reply_t send;
struct z_owned_reply_channel_closure_t recv;
} z_owned_reply_channel_t;
struct z_owned_closure_reply_t recv;
struct z_owned_closure_reply_t try_recv;
} z_owned_reply_fifo_channel_t;
typedef struct z_owned_scouting_config_t {
struct z_owned_config_t _config;
unsigned long zc_timeout_ms;
Expand Down Expand Up @@ -1913,25 +1896,6 @@ ZENOHC_API uint16_t z_random_u16(void);
ZENOHC_API uint32_t z_random_u32(void);
ZENOHC_API uint64_t z_random_u64(void);
ZENOHC_API uint8_t z_random_u8(void);
/**
* Calls the closure. Calling an uninitialized closure is a no-op.
*/
ZENOHC_API
bool z_reply_channel_closure_call(const struct z_owned_reply_channel_closure_t *closure,
struct z_owned_reply_t *sample);
/**
* Drops the closure. Droping an uninitialized closure is a no-op.
*/
ZENOHC_API void z_reply_channel_closure_drop(struct z_owned_reply_channel_closure_t *closure);
/**
* Constructs a null safe-to-drop value of 'z_owned_reply_channel_closure_t' type
*/
ZENOHC_API struct z_owned_reply_channel_closure_t z_reply_channel_closure_null(void);
ZENOHC_API void z_reply_channel_drop(struct z_owned_reply_channel_t *channel);
/**
* Constructs a null safe-to-drop value of 'z_owned_reply_channel_t' type
*/
ZENOHC_API struct z_owned_reply_channel_t z_reply_channel_null(void);
/**
* Returns ``true`` if `reply_data` is valid.
*/
Expand All @@ -1947,6 +1911,24 @@ ZENOHC_API void z_reply_drop(struct z_owned_reply_t *reply_data);
*/
ZENOHC_API
struct z_value_t z_reply_err(const struct z_owned_reply_t *reply);
ZENOHC_API void z_reply_fifo_channel_drop(struct z_owned_reply_fifo_channel_t *channel);
/**
* Creates a new blocking fifo channel, returned as a pair of closures.
*
* If `bound` is different from 0, that channel will be bound and apply back-pressure when full.
*
* The `send` end should be passed as callback to a `z_get` call.
*
* The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available,
* which it will then return; or until the `send` closure is dropped and all replies have been consumed,
* at which point it will return an invalidated `z_owned_reply_t`, and so will further calls.
*/
ZENOHC_API
struct z_owned_reply_fifo_channel_t z_reply_fifo_channel_new(size_t bound);
/**
* Constructs a null safe-to-drop value of 'z_owned_reply_fifo_channel_t' type
*/
ZENOHC_API struct z_owned_reply_fifo_channel_t z_reply_fifo_channel_null(void);
/**
* Returns ``true`` if the queryable answered with an OK, which allows this value to be treated as a sample.
*
Expand Down Expand Up @@ -2363,32 +2345,6 @@ struct z_owned_query_channel_t zc_query_fifo_new(size_t bound);
*/
ZENOHC_API
struct z_owned_query_channel_t zc_query_non_blocking_fifo_new(size_t bound);
/**
* Creates a new blocking fifo channel, returned as a pair of closures.
*
* If `bound` is different from 0, that channel will be bound and apply back-pressure when full.
*
* The `send` end should be passed as callback to a `z_get` call.
*
* The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available,
* which it will then return; or until the `send` closure is dropped and all replies have been consumed,
* at which point it will return an invalidated `z_owned_reply_t`, and so will further calls.
*/
ZENOHC_API
struct z_owned_reply_channel_t zc_reply_fifo_new(size_t bound);
/**
* Creates a new non-blocking fifo channel, returned as a pair of closures.
*
* If `bound` is different from 0, that channel will be bound and apply back-pressure when full.
*
* The `send` end should be passed as callback to a `z_get` call.
*
* The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available,
* which it will then return; or until the `send` closure is dropped and all replies have been consumed,
* at which point it will return an invalidated `z_owned_reply_t`, and so will further calls.
*/
ZENOHC_API
struct z_owned_reply_channel_t zc_reply_non_blocking_fifo_new(size_t bound);
/**
* Clones the sample's payload by incrementing its backing refcount (this doesn't imply any copies).
*/
Expand Down
12 changes: 2 additions & 10 deletions include/zenoh_macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@
z_owned_closure_hello_t * : z_closure_hello_drop, \
z_owned_closure_zid_t * : z_closure_zid_drop, \
zcu_owned_closure_matching_status_t * : zcu_closure_matching_status_drop, \
z_owned_reply_channel_closure_t * : z_reply_channel_closure_drop, \
z_owned_query_channel_closure_t * : z_query_channel_closure_drop, \
z_owned_reply_channel_t * : z_reply_channel_drop, \
z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_drop, \
z_owned_query_channel_t * : z_query_channel_drop, \
z_owned_bytes_map_t * : z_bytes_map_drop, \
zc_owned_payload_t * : zc_payload_drop, \
Expand Down Expand Up @@ -67,8 +66,7 @@
z_owned_closure_hello_t * : z_closure_hello_null, \
z_owned_closure_zid_t * : z_closure_zid_null, \
zcu_owned_closure_matching_status_t * : zcu_closure_matching_status_null, \
z_owned_reply_channel_closure_t * : z_reply_channel_closure_null, \
z_owned_reply_channel_t * : z_reply_channel_null, \
z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_null, \
z_owned_bytes_map_t * : z_bytes_map_null, \
z_attachment_t * : z_attachment_null, \
zc_owned_payload_t * : zc_payload_null, \
Expand Down Expand Up @@ -111,7 +109,6 @@
z_owned_closure_hello_t : z_closure_hello_call, \
z_owned_closure_zid_t : z_closure_zid_call, \
zcu_owned_closure_matching_status_t : zcu_closure_matching_status_call, \
z_owned_reply_channel_closure_t : z_reply_channel_closure_call, \
z_owned_query_channel_closure_t : z_query_channel_closure_call \
) (&x, __VA_ARGS__)
// clang-format on
Expand Down Expand Up @@ -173,7 +170,6 @@ template<> struct zenoh_drop_type<z_owned_closure_reply_t> { typedef void type;
template<> struct zenoh_drop_type<z_owned_closure_hello_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_closure_zid_t> { typedef void type; };
template<> struct zenoh_drop_type<zcu_owned_closure_matching_status_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_reply_channel_closure_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_reply_channel_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_bytes_map_t> { typedef void type; };
template<> struct zenoh_drop_type<zc_owned_liveliness_token_t> { typedef void type; };
Expand Down Expand Up @@ -201,7 +197,6 @@ template<> inline void z_drop(z_owned_closure_reply_t* v) { z_closure_reply_drop
template<> inline void z_drop(z_owned_closure_hello_t* v) { z_closure_hello_drop(v); }
template<> inline void z_drop(z_owned_closure_zid_t* v) { z_closure_zid_drop(v); }
template<> inline void z_drop(zcu_owned_closure_matching_status_t* v) { zcu_closure_matching_status_drop(v); }
template<> inline void z_drop(z_owned_reply_channel_closure_t* v) { z_reply_channel_closure_drop(v); }
template<> inline void z_drop(z_owned_reply_channel_t* v) { z_reply_channel_drop(v); }
template<> inline void z_drop(z_owned_bytes_map_t* v) { z_bytes_map_drop(v); }
template<> inline void z_drop(zc_owned_liveliness_token_t* v) { zc_liveliness_undeclare_token(v); }
Expand Down Expand Up @@ -229,7 +224,6 @@ inline void z_null(z_owned_closure_reply_t& v) { v = z_closure_reply_null(); }
inline void z_null(z_owned_closure_hello_t& v) { v = z_closure_hello_null(); }
inline void z_null(z_owned_closure_zid_t& v) { v = z_closure_zid_null(); }
inline void z_null(zcu_owned_closure_matching_status_t& v) { v = zcu_closure_matching_status_null(); }
inline void z_null(z_owned_reply_channel_closure_t& v) { v = z_reply_channel_closure_null(); }
inline void z_null(z_owned_reply_channel_t& v) { v = z_reply_channel_null(); }
inline void z_null(z_owned_bytes_map_t& v) { v = z_bytes_map_null(); }
inline void z_null(zc_owned_liveliness_token_t& v) { v = zc_liveliness_token_null(); }
Expand Down Expand Up @@ -271,8 +265,6 @@ inline void z_call(const struct z_owned_closure_zid_t &closure, const struct z_i
{ z_closure_zid_call(&closure, zid); }
inline void z_call(const struct zcu_owned_closure_matching_status_t &closure, const struct zcu_matching_status_t *matching_status)
{ zcu_closure_matching_status_call(&closure, matching_status); }
inline bool z_call(const struct z_owned_reply_channel_closure_t &closure, struct z_owned_reply_t *sample)
{ return z_reply_channel_closure_call(&closure, sample); }
// clang-format on

#define _z_closure_overloader(callback, droper, ctx, ...) \
Expand Down
4 changes: 2 additions & 2 deletions src/closures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ mod reply_closure;
pub use zenohid_closure::*;
mod zenohid_closure;

pub use response_channel::*;
mod response_channel;
pub use reply_channel::*;
mod reply_channel;

pub use query_channel::*;
mod query_channel;
Expand Down
64 changes: 64 additions & 0 deletions src/closures/reply_channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use crate::{z_closure_reply_drop, z_owned_closure_reply_t, z_owned_reply_t};

/// A pair of closures, the `send` one accepting
#[repr(C)]
pub struct z_owned_reply_fifo_channel_t {
pub send: z_owned_closure_reply_t,
pub recv: z_owned_closure_reply_t,
pub try_recv: z_owned_closure_reply_t,
}
#[no_mangle]
pub extern "C" fn z_reply_fifo_channel_drop(channel: &mut z_owned_reply_fifo_channel_t) {
z_closure_reply_drop(&mut channel.send);
z_closure_reply_drop(&mut channel.recv);
z_closure_reply_drop(&mut channel.try_recv);
}
/// Constructs a null safe-to-drop value of 'z_owned_reply_fifo_channel_t' type
#[no_mangle]
pub extern "C" fn z_reply_fifo_channel_null() -> z_owned_reply_fifo_channel_t {
z_owned_reply_fifo_channel_t {
send: z_owned_closure_reply_t::empty(),
recv: z_owned_closure_reply_t::empty(),
try_recv: z_owned_closure_reply_t::empty(),
}
}

/// Creates a new blocking fifo channel, returned as a pair of closures.
///
/// If `bound` is different from 0, that channel will be bound and apply back-pressure when full.
///
/// The `send` end should be passed as callback to a `z_get` call.
///
/// The `recv` end is a synchronous closure that will block until either a `z_owned_reply_t` is available,
/// which it will then return; or until the `send` closure is dropped and all replies have been consumed,
/// at which point it will return an invalidated `z_owned_reply_t`, and so will further calls.
#[no_mangle]
pub extern "C" fn z_reply_fifo_channel_new(bound: usize) -> z_owned_reply_fifo_channel_t {
let (tx, rx) = if bound == 0 {
crossbeam_channel::unbounded()
} else {
crossbeam_channel::bounded(bound)
};
let rx_clone = rx.clone();
z_owned_reply_fifo_channel_t {
send: From::from(move |reply: &mut z_owned_reply_t| {
if let Some(reply) = reply.take() {
if let Err(e) = tx.send(reply) {
log::error!("Attempted to push onto a closed reply_fifo: {}", e)
}
}
}),
recv: From::from(move |receptacle: &mut z_owned_reply_t| {
*receptacle = match rx.recv() {
Ok(val) => val.into(),
Err(_) => None.into(),
};
}),
try_recv: From::from(move |receptacle: &mut z_owned_reply_t| {
*receptacle = match rx_clone.try_recv() {
Ok(val) => val.into(),
Err(_) => None.into(),
};
}),
}
}
Loading
Loading