Skip to content

Commit

Permalink
introduce query channels (#222)
Browse files Browse the repository at this point in the history
  • Loading branch information
p-avital authored Dec 18, 2023
1 parent 0bcb78c commit a2209b7
Show file tree
Hide file tree
Showing 8 changed files with 597 additions and 87 deletions.
95 changes: 95 additions & 0 deletions examples/z_queryable_with_channels.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#include <stdio.h>
#include <string.h>
#include <zenoh_macros.h>
#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__)
#include <windows.h>
#define sleep(x) Sleep(x * 1000)
#else
#include <unistd.h>
#endif
#include "zenoh.h"

const char *expr = "demo/example/zenoh-c-queryable";
const char *value = "Queryable from C!";
z_keyexpr_t keyexpr;

void query_handler(const z_query_t *query, void *context) {
z_owned_closure_owned_query_t *channel = (z_owned_closure_owned_query_t *)context;
z_owned_query_t oquery = z_query_clone(query);
z_call(*channel, &oquery);
}

int main(int argc, char **argv) {
if (argc > 1) {
expr = argv[1];
}
z_owned_config_t config = z_config_default();
if (argc > 2) {
if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[2]) < 0) {
printf(
"Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a "
"JSON-serialized list of strings\n",
argv[2], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY);
exit(-1);
}
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
exit(-1);
}
keyexpr = z_keyexpr(expr);
if (!z_check(keyexpr)) {
printf("%s is not a valid key expression", expr);
exit(-1);
}

printf("Declaring Queryable on '%s'...\n", expr);
z_owned_query_channel_t channel = zc_query_fifo_new(16);
z_owned_closure_query_t callback = z_closure(query_handler, NULL, &channel.send);
z_owned_queryable_t qable = z_declare_queryable(z_loan(s), keyexpr, z_move(callback), NULL);
if (!z_check(qable)) {
printf("Unable to create queryable.\n");
exit(-1);
}

printf("^C to quit...\n");
z_owned_query_t oquery = z_query_null();
for (z_call(channel.recv, &oquery); z_check(oquery); z_call(channel.recv, &oquery)) {
z_query_t query = z_loan(oquery);
z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(&query));
z_bytes_t pred = z_query_parameters(&query);
z_value_t payload_value = z_query_value(&query);
if (payload_value.payload.len > 0) {
printf(">> [Queryable ] Received Query '%s?%.*s' with value '%.*s'\n", z_loan(keystr), (int)pred.len,
pred.start, (int)payload_value.payload.len, payload_value.payload.start);
} else {
printf(">> [Queryable ] Received Query '%s?%.*s'\n", z_loan(keystr), (int)pred.len, pred.start);
}
z_query_reply_options_t options = z_query_reply_options_default();
options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL);
z_query_reply(&query, keyexpr, (const unsigned char *)value, strlen(value), &options);
z_drop(z_move(keystr));
z_drop(z_move(oquery));
}

z_drop(z_move(qable));
z_drop(z_move(channel));
z_drop(z_move(s));
return 0;
}
123 changes: 114 additions & 9 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,40 @@ typedef struct z_owned_closure_hello_t {
void (*call)(struct z_owned_hello_t*, void*);
void (*drop)(void*);
} z_owned_closure_hello_t;
/**
* Owned variant of a Query received by a Queryable.
*
* You may construct it by `z_query_clone`-ing a loaned query.
* When the last `z_owned_query_t` corresponding to a query is destroyed, or the callback that produced the query cloned to build them returns,
* the query will receive its termination signal.
*
* Holding onto an `z_owned_query_t` for too long (10s by default, can be set in `z_get`'s options) will trigger a timeout error
* to be sent to the querier by the infrastructure, and new responses to the outdated query will be silently dropped.
*/
typedef struct z_owned_query_t {
void *_0;
} z_owned_query_t;
/**
* A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks:
*
* Members:
* void *context: a pointer to an arbitrary state.
* void *call(const struct z_query_t*, const void *context): the typical callback function. `context` will be passed as its last argument.
* void *drop(void*): allows the callback's state to be freed.
*
* Closures are not guaranteed not to be called concurrently.
*
* It is guaranteed that:
*
* - `call` will never be called once `drop` has started.
* - `drop` will only be called **once**, and **after every** `call` has ended.
* - The two previous guarantees imply that `call` and `drop` are never called concurrently.
*/
typedef struct z_owned_closure_owned_query_t {
void *context;
void (*call)(struct z_owned_query_t*, void *context);
void (*drop)(void*);
} z_owned_closure_owned_query_t;
/**
* A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks:
*
Expand Down Expand Up @@ -682,18 +716,30 @@ typedef struct z_put_options_t {
struct z_attachment_t attachment;
} z_put_options_t;
/**
* Owned variant of a Query received by a Queryable.
* A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks:
* - `this` is a pointer to an arbitrary state.
* - `call` is the typical callback function. `this` will be passed as its last argument.
* - `drop` allows the callback's state to be freed.
*
* You may construct it by `z_query_clone`-ing a loaned query.
* When the last `z_owned_query_t` corresponding to a query is destroyed, or the callback that produced the query cloned to build them returns,
* the query will receive its termination signal.
* Closures are not guaranteed not to be called concurrently.
*
* Holding onto an `z_owned_query_t` for too long (10s by default, can be set in `z_get`'s options) will trigger a timeout error
* to be sent to the querier by the infrastructure, and new responses to the outdated query will be silently dropped.
* We guarantee that:
* - `call` will never be called once `drop` has started.
* - `drop` will only be called ONCE, and AFTER EVERY `call` has ended.
* - The two previous guarantees imply that `call` and `drop` are never called concurrently.
*/
typedef struct z_owned_query_t {
void *_0;
} z_owned_query_t;
typedef struct z_owned_query_channel_closure_t {
void *context;
bool (*call)(struct z_owned_query_t*, void*);
void (*drop)(void*);
} z_owned_query_channel_closure_t;
/**
* A pair of closures
*/
typedef struct z_owned_query_channel_t {
struct z_owned_closure_owned_query_t send;
struct z_owned_query_channel_closure_t recv;
} z_owned_query_channel_t;
/**
* Represents the set of options that can be applied to a query reply,
* sent via :c:func:`z_query_reply`.
Expand Down Expand Up @@ -1020,6 +1066,20 @@ ZENOHC_API struct z_owned_closure_hello_t z_closure_hello_null(void);
* Calls the closure. Calling an uninitialized closure is a no-op.
*/
ZENOHC_API
void z_closure_owned_query_call(const struct z_owned_closure_owned_query_t *closure,
struct z_owned_query_t *query);
/**
* Drops the closure. Droping an uninitialized closure is a no-op.
*/
ZENOHC_API void z_closure_owned_query_drop(struct z_owned_closure_owned_query_t *closure);
/**
* Constructs a null safe-to-drop value of 'z_owned_closure_query_t' type
*/
ZENOHC_API struct z_owned_closure_owned_query_t z_closure_owned_query_null(void);
/**
* Calls the closure. Calling an uninitialized closure is a no-op.
*/
ZENOHC_API
void z_closure_query_call(const struct z_owned_closure_query_t *closure,
const struct z_query_t *query);
/**
Expand Down Expand Up @@ -1599,6 +1659,25 @@ ZENOHC_API struct z_put_options_t z_put_options_default(void);
* `z_check(return_value) == false` if there was no attachment to the query.
*/
ZENOHC_API struct z_attachment_t z_query_attachment(const struct z_query_t *query);
/**
* Calls the closure. Calling an uninitialized closure is a no-op.
*/
ZENOHC_API
bool z_query_channel_closure_call(const struct z_owned_query_channel_closure_t *closure,
struct z_owned_query_t *sample);
/**
* Drops the closure. Droping an uninitialized closure is a no-op.
*/
ZENOHC_API void z_query_channel_closure_drop(struct z_owned_query_channel_closure_t *closure);
/**
* Constructs a null safe-to-drop value of 'z_owned_query_channel_closure_t' type
*/
ZENOHC_API struct z_owned_query_channel_closure_t z_query_channel_closure_null(void);
ZENOHC_API void z_query_channel_drop(struct z_owned_query_channel_t *channel);
/**
* Constructs a null safe-to-drop value of 'z_owned_query_channel_t' type
*/
ZENOHC_API struct z_owned_query_channel_t z_query_channel_null(void);
/**
* Returns `false` if `this` is in a gravestone state, `true` otherwise.
*
Expand Down Expand Up @@ -2118,6 +2197,32 @@ int8_t zc_put_owned(struct z_session_t session,
struct z_keyexpr_t keyexpr,
struct zc_owned_payload_t *payload,
const struct z_put_options_t *opts);
/**
* Creates a new blocking fifo channel, returned as a pair of closures.
*
* If `bound` is different from 0, that channel will be bound and apply back-pressure when full.
*
* The `send` end should be passed as callback to a `z_get` call.
*
* The `recv` end is a synchronous closure that will block until either a `z_owned_query_t` is available,
* which it will then return; or until the `send` closure is dropped and all queries have been consumed,
* at which point it will return an invalidated `z_owned_query_t`, and so will further calls.
*/
ZENOHC_API
struct z_owned_query_channel_t zc_query_fifo_new(size_t bound);
/**
* Creates a new non-blocking fifo channel, returned as a pair of closures.
*
* If `bound` is different from 0, that channel will be bound and apply back-pressure when full.
*
* The `send` end should be passed as callback to a `z_get` call.
*
* The `recv` end is a synchronous closure that will block until either a `z_owned_query_t` is available,
* which it will then return; or until the `send` closure is dropped and all queries have been consumed,
* at which point it will return an invalidated `z_owned_query_t`, and so will further calls.
*/
ZENOHC_API
struct z_owned_query_channel_t zc_query_non_blocking_fifo_new(size_t bound);
/**
* Creates a new blocking fifo channel, returned as a pair of closures.
*
Expand Down
6 changes: 5 additions & 1 deletion include/zenoh_macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
z_owned_closure_hello_t * : z_closure_hello_drop, \
z_owned_closure_zid_t * : z_closure_zid_drop, \
z_owned_reply_channel_closure_t * : z_reply_channel_closure_drop, \
z_owned_query_channel_closure_t * : z_query_channel_closure_drop, \
z_owned_reply_channel_t * : z_reply_channel_drop, \
z_owned_query_channel_t * : z_query_channel_drop, \
z_owned_bytes_map_t * : z_bytes_map_drop, \
zc_owned_payload_t * : zc_payload_drop, \
zc_owned_shmbuf_t * : zc_shmbuf_drop, \
Expand Down Expand Up @@ -106,10 +108,12 @@
#define z_call(x, ...) \
_Generic((x), z_owned_closure_sample_t : z_closure_sample_call, \
z_owned_closure_query_t : z_closure_query_call, \
z_owned_closure_owned_query_t : z_closure_owned_query_call, \
z_owned_closure_reply_t : z_closure_reply_call, \
z_owned_closure_hello_t : z_closure_hello_call, \
z_owned_closure_zid_t : z_closure_zid_call, \
z_owned_reply_channel_closure_t : z_reply_channel_closure_call \
z_owned_reply_channel_closure_t : z_reply_channel_closure_call,\
z_owned_query_channel_closure_t : z_query_channel_closure_call \
) (&x, __VA_ARGS__)
// clang-format on

Expand Down
3 changes: 3 additions & 0 deletions src/closures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,8 @@ mod zenohid_closure;
pub use response_channel::*;
mod response_channel;

pub use query_channel::*;
mod query_channel;

pub use hello_closure::*;
mod hello_closure;
Loading

0 comments on commit a2209b7

Please sign in to comment.