From a8e259a370263ce9fd6ede373ed7805443af4811 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 2 Jul 2024 10:41:55 +0200 Subject: [PATCH] feat: re-add rc to queries --- include/zenoh-pico/api/handlers.h | 95 +++++++++++++++++++++++++++- include/zenoh-pico/api/types.h | 4 +- include/zenoh-pico/net/query.h | 4 +- include/zenoh-pico/session/session.h | 4 +- src/api/api.c | 17 ++--- src/session/queryable.c | 6 +- 6 files changed, 112 insertions(+), 18 deletions(-) diff --git a/include/zenoh-pico/api/handlers.h b/include/zenoh-pico/api/handlers.h index 1d6653b6e..0a803635b 100644 --- a/include/zenoh-pico/api/handlers.h +++ b/include/zenoh-pico/api/handlers.h @@ -121,6 +121,97 @@ /* elem_copy_f */ _z_##item_name##_copy, \ /* elem_drop_f */ z_##item_name##_drop) +#define _Z_CHANNEL_RC_DEFINE_IMPL(handler_type, handler_name, handler_new_f_name, callback_type, callback_new_f, \ + collection_type, collection_new_f, collection_free_f, collection_push_f, \ + collection_pull_f, collection_try_pull_f, elem_owned_type, elem_loaned_type, \ + elem_copy_f, elem_drop_f) \ + typedef struct { \ + collection_type *collection; \ + } handler_type; \ + \ + _Z_OWNED_TYPE_PTR(handler_type, handler_name) \ + _Z_LOANED_TYPE(handler_type, handler_name) \ + \ + static inline void _z_##handler_name##_elem_free(void **elem) { \ + elem_drop_f((elem_owned_type *)*elem); \ + z_free(*elem); \ + *elem = NULL; \ + } \ + static inline void _z_##handler_name##_elem_move(void *dst, void *src) { \ + memcpy(dst, src, sizeof(elem_owned_type)); \ + z_free(src); \ + } \ + static inline void _z_##handler_name##_send(const elem_loaned_type *elem, void *context) { \ + elem_owned_type *internal_elem = (elem_owned_type *)z_malloc(sizeof(elem_owned_type)); \ + if (internal_elem == NULL) { \ + _Z_ERROR("Out of memory"); \ + return; \ + } \ + if (elem == NULL) { \ + internal_elem->_rc.in = NULL; \ + } else { \ + elem_copy_f(&internal_elem->_rc, elem); \ + } \ + int8_t ret = collection_push_f(internal_elem, context, _z_##handler_name##_elem_free); \ + if (ret != _Z_RES_OK) { \ + _Z_ERROR("%s failed: %i", #collection_push_f, ret); \ + } \ + } \ + static inline void z_##handler_name##_recv(const z_loaned_##handler_name##_t *handler, elem_owned_type *elem) { \ + int8_t ret = collection_pull_f(elem, (collection_type *)handler->collection, _z_##handler_name##_elem_move); \ + if (ret != _Z_RES_OK) { \ + _Z_ERROR("%s failed: %i", #collection_pull_f, ret); \ + } \ + } \ + static inline void z_##handler_name##_try_recv(const z_loaned_##handler_name##_t *handler, \ + elem_owned_type *elem) { \ + int8_t ret = \ + collection_try_pull_f(elem, (collection_type *)handler->collection, _z_##handler_name##_elem_move); \ + if (ret != _Z_RES_OK) { \ + _Z_ERROR("%s failed: %i", #collection_try_pull_f, ret); \ + } \ + } \ + \ + static inline void _z_##handler_name##_free(handler_type **handler) { \ + handler_type *ptr = *handler; \ + if (ptr != NULL) { \ + collection_free_f(ptr->collection, _z_##handler_name##_elem_free); \ + z_free(ptr); \ + *handler = NULL; \ + } \ + } \ + static inline void _z_##handler_name##_copy(void *dst, const void *src) { \ + (void)(dst); \ + (void)(src); \ + } \ + \ + _Z_OWNED_FUNCTIONS_PTR_IMPL(handler_type, handler_name, _z_##handler_name##_copy, _z_##handler_name##_free) \ + \ + static inline int8_t handler_new_f_name(callback_type *callback, z_owned_##handler_name##_t *handler, \ + size_t capacity) { \ + handler->_val = (handler_type *)z_malloc(sizeof(handler_type)); \ + handler->_val->collection = collection_new_f(capacity); \ + callback_new_f(callback, _z_##handler_name##_send, NULL, handler->_val->collection); \ + return _Z_RES_OK; \ + } + +#define _Z_CHANNEL_RC_DEFINE(item_name, kind_name) \ + _Z_CHANNEL_RC_DEFINE_IMPL(/* handler_type */ _z_##kind_name##_handler_##item_name##_t, \ + /* handler_name */ kind_name##_handler_##item_name, \ + /* handler_new_f_name */ z_##kind_name##_channel_##item_name##_new, \ + /* callback_type */ z_owned_closure_##item_name##_t, \ + /* callback_new_f */ z_closure_##item_name, \ + /* collection_type */ _z_##kind_name##_mt_t, \ + /* collection_new_f */ _z_##kind_name##_mt_new, \ + /* collection_free_f */ _z_##kind_name##_mt_free, \ + /* collection_push_f */ _z_##kind_name##_mt_push, \ + /* collection_pull_f */ _z_##kind_name##_mt_pull, \ + /* collection_try_pull_f */ _z_##kind_name##_mt_try_pull, \ + /* elem_owned_type */ z_owned_##item_name##_t, \ + /* elem_loaned_type */ z_loaned_##item_name##_t, \ + /* elem_copy_f */ _z_##item_name##_rc_copy, \ + /* elem_drop_f */ z_##item_name##_drop) + #define _Z_CHANNEL_DEFINE_DUMMY(item_name, kind_name) \ typedef struct { \ uint8_t _foo; \ @@ -148,12 +239,12 @@ _Z_CHANNEL_DEFINE(sample, fifo) // This macro defines: // z_ring_channel_query_new() // z_owned_ring_handler_query_t/z_loaned_ring_handler_query_t -_Z_CHANNEL_DEFINE(query, ring) +_Z_CHANNEL_RC_DEFINE(query, ring) // This macro defines: // z_fifo_channel_query_new() // z_owned_fifo_handler_query_t/z_loaned_fifo_handler_query_t -_Z_CHANNEL_DEFINE(query, fifo) +_Z_CHANNEL_RC_DEFINE(query, fifo) #else // Z_FEATURE_QUERYABLE _Z_CHANNEL_DEFINE_DUMMY(query, ring) _Z_CHANNEL_DEFINE_DUMMY(query, fifo) diff --git a/include/zenoh-pico/api/types.h b/include/zenoh-pico/api/types.h index 80fa04f9c..324e86d2a 100644 --- a/include/zenoh-pico/api/types.h +++ b/include/zenoh-pico/api/types.h @@ -171,8 +171,8 @@ _Z_LOANED_TYPE(_z_queryable_t, queryable) * Represents a Zenoh Query entity, received by Zenoh Queryable entities. * */ -_Z_OWNED_TYPE_PTR(_z_query_t, query) -_Z_LOANED_TYPE(_z_query_t, query) +_Z_OWNED_TYPE_RC(_z_query_rc_t, query) +_Z_LOANED_TYPE(_z_query_rc_t, query) /** * Represents the encoding of a payload, in a MIME-like format. diff --git a/include/zenoh-pico/net/query.h b/include/zenoh-pico/net/query.h index 7812bd1ec..670dfed74 100644 --- a/include/zenoh-pico/net/query.h +++ b/include/zenoh-pico/net/query.h @@ -28,7 +28,7 @@ typedef struct _z_query_t { _z_value_t _value; _z_keyexpr_t _key; uint32_t _request_id; - _z_session_t *_zn; // FIXME: Switch to session rc, Issue #476 + _z_session_t *_zn; // FIXME: Potential UB source, Issue #476 _z_bytes_t attachment; char *_parameters; _Bool _anyke; @@ -39,6 +39,8 @@ void _z_query_clear(_z_query_t *q); void _z_query_copy(_z_query_t *dst, const _z_query_t *src); void _z_query_free(_z_query_t **query); +_Z_REFCOUNT_DEFINE(_z_query, _z_query) + /** * Return type when declaring a queryable. */ diff --git a/include/zenoh-pico/session/session.h b/include/zenoh-pico/session/session.h index 61f78f8b2..8dd99e977 100644 --- a/include/zenoh-pico/session/session.h +++ b/include/zenoh-pico/session/session.h @@ -81,12 +81,12 @@ typedef struct { } _z_publication_t; // Forward type declaration to avoid cyclical include -typedef struct _z_query_t _z_query_t; +typedef struct _z_query_rc_t _z_query_rc_t; /** * The callback signature of the functions handling query messages. */ -typedef void (*_z_queryable_handler_t)(const _z_query_t *query, void *arg); +typedef void (*_z_queryable_handler_t)(const _z_query_rc_t *query, void *arg); typedef struct { _z_keyexpr_t _key; diff --git a/src/api/api.c b/src/api/api.c index 683d4def7..ab8a33a72 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -559,16 +559,16 @@ z_query_consolidation_t z_query_consolidation_none(void) { z_query_consolidation_t z_query_consolidation_default(void) { return z_query_consolidation_auto(); } void z_query_parameters(const z_loaned_query_t *query, z_view_string_t *parameters) { - parameters->_val.val = query->_parameters; - parameters->_val.len = strlen(query->_parameters); + parameters->_val.val = query->in->val._parameters; + parameters->_val.len = strlen(query->in->val._parameters); } -const z_loaned_bytes_t *z_query_attachment(const z_loaned_query_t *query) { return &query->attachment; } +const z_loaned_bytes_t *z_query_attachment(const z_loaned_query_t *query) { return &query->in->val.attachment; } -const z_loaned_keyexpr_t *z_query_keyexpr(const z_loaned_query_t *query) { return &query->_key; } +const z_loaned_keyexpr_t *z_query_keyexpr(const z_loaned_query_t *query) { return &query->in->val._key; } -const z_loaned_bytes_t *z_query_payload(const z_loaned_query_t *query) { return &query->_value.payload; } -const z_loaned_encoding_t *z_query_encoding(const z_loaned_query_t *query) { return &query->_value.encoding; } +const z_loaned_bytes_t *z_query_payload(const z_loaned_query_t *query) { return &query->in->val._value.payload; } +const z_loaned_encoding_t *z_query_encoding(const z_loaned_query_t *query) { return &query->in->val._value.encoding; } void z_closure_sample_call(const z_owned_closure_sample_t *closure, const z_loaned_sample_t *sample) { if (closure->call != NULL) { @@ -1060,7 +1060,7 @@ int8_t _z_queryable_drop(_z_queryable_t **queryable) { return ret; } -_Z_OWNED_FUNCTIONS_PTR_IMPL(_z_query_t, query, _z_query_copy, _z_query_free) +_Z_OWNED_FUNCTIONS_RC_IMPL(query) _Z_OWNED_FUNCTIONS_PTR_IMPL(_z_queryable_t, queryable, _z_owner_noop_copy, _z_queryable_drop) void z_queryable_options_default(z_queryable_options_t *options) { options->complete = _Z_QUERYABLE_COMPLETE_DEFAULT; } @@ -1114,7 +1114,8 @@ int8_t z_query_reply(const z_loaned_query_t *query, const z_loaned_keyexpr_t *ke _z_value_t value = {.payload = _z_bytes_from_owned_bytes(payload), .encoding = _z_encoding_from_owned(opts.encoding)}; - int8_t ret = _z_send_reply(query, *keyexpr, value, Z_SAMPLE_KIND_PUT, _z_bytes_from_owned_bytes(opts.attachment)); + int8_t ret = + _z_send_reply(&query->in->val, *keyexpr, value, Z_SAMPLE_KIND_PUT, _z_bytes_from_owned_bytes(opts.attachment)); if (payload != NULL) { z_bytes_drop(payload); } diff --git a/src/session/queryable.c b/src/session/queryable.c index a2ce2ff76..9ca478151 100644 --- a/src/session/queryable.c +++ b/src/session/queryable.c @@ -145,8 +145,8 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const _zp_session_unlock_mutex(zn); // Build the z_query - _z_query_t query = _z_query_null(); - query = _z_query_create(&msgq->_ext_value, &key, &msgq->_parameters, zn, qid, attachment); + _z_query_rc_t query = _z_query_rc_new(); + query.in->val = _z_query_create(&msgq->_ext_value, &key, &msgq->_parameters, zn, qid, attachment); // Parse session_queryable list _z_session_queryable_rc_list_t *xs = qles; while (xs != NULL) { @@ -155,7 +155,7 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const xs = _z_session_queryable_rc_list_tail(xs); } // Clean up - _z_query_clear(&query); + _z_query_rc_drop(&query); _z_session_queryable_rc_list_free(&qles); } else { _zp_session_unlock_mutex(zn);