From 27fb58e2fb40f41098b930e47dd5b2af255c24b6 Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Tue, 2 Jul 2024 15:43:55 +0200 Subject: [PATCH] Remove refcounting from reply/sample. (#440) * feat: create reply sample union type * feat: remove rc from sample/query/reply * fix: -Wconversion errors * fix: strncat jump condition based on uninitialized data * fix: move attachments data * feat: add encoding to attachment examples * feat: add _z_string_steal and _z_encoding_steal * fix: remove const from encoding as function arg * feat: copy payload in z_write and steal encoding * fix: attachment double drop issue * fix: update attachment test * fix: decode attachment memory leak * fix: copy attachment instead of move * fix: revert previous z_write changes * chore: format code * fix: revert const attachment changes * feat: re-add rc to queries * fix: double suffix allocation * feat: use elem_clone to avoid duplicating channel code * fix: remove double sample create init --- examples/unix/c11/z_get_attachment.c | 12 ++++++++- examples/unix/c11/z_pub_attachment.c | 2 +- examples/unix/c11/z_queryable_attachment.c | 24 ++++++++++++----- examples/unix/c11/z_sub_attachment.c | 2 +- include/zenoh-pico/api/handlers.h | 11 +++----- include/zenoh-pico/api/primitives.h | 2 +- include/zenoh-pico/api/types.h | 8 +++--- include/zenoh-pico/collections/string.h | 1 + include/zenoh-pico/net/encoding.h | 1 + include/zenoh-pico/net/query.h | 9 ++++--- include/zenoh-pico/net/reply.h | 8 +++--- include/zenoh-pico/net/sample.h | 6 ++--- include/zenoh-pico/session/session.h | 14 +++++----- src/api/api.c | 25 +++++++++-------- src/collections/bytes.c | 10 +++---- src/collections/string.c | 10 +++++++ src/net/encoding.c | 9 +++++++ src/net/query.c | 31 +++++++++++++++++----- src/net/reply.c | 29 +++++++++----------- src/net/sample.c | 14 +++++----- src/protocol/definitions/message.c | 4 ++- src/protocol/keyexpr.c | 1 - src/session/query.c | 18 ++++++------- src/session/queryable.c | 1 - src/session/subscription.c | 6 ++--- tests/attachment.py | 26 +++++++++--------- tests/z_channels_test.c | 16 ++++++----- 27 files changed, 175 insertions(+), 125 deletions(-) diff --git a/examples/unix/c11/z_get_attachment.c b/examples/unix/c11/z_get_attachment.c index 4fb379950..b45b12854 100644 --- a/examples/unix/c11/z_get_attachment.c +++ b/examples/unix/c11/z_get_attachment.c @@ -70,12 +70,13 @@ void parse_attachment(kv_pairs_rx_t *kvp, const z_loaned_bytes_t *attachment) { z_bytes_deserialize_into_string(z_loan(second), &kvp->data[kvp->current_idx].value); z_bytes_drop(&first); z_bytes_drop(&second); + z_bytes_drop(&kv); kvp->current_idx++; } } void print_attachment(kv_pairs_rx_t *kvp) { - printf(" with attachment:\n"); + printf(" with attachment:\n"); for (uint32_t i = 0; i < kvp->current_idx; i++) { printf(" %d: %s, %s\n", i, z_string_data(z_loan(kvp->data[i].key)), z_string_data(z_loan(kvp->data[i].value))); @@ -105,8 +106,11 @@ void reply_handler(const z_loaned_reply_t *reply, void *ctx) { z_keyexpr_to_string(z_sample_keyexpr(sample), &keystr); z_owned_string_t replystr; z_bytes_deserialize_into_string(z_sample_payload(sample), &replystr); + z_owned_string_t encoding; + z_encoding_to_string(z_sample_encoding(sample), &encoding); printf(">> Received ('%s': '%s')\n", z_string_data(z_loan(keystr)), z_string_data(z_loan(replystr))); + printf(" with encoding: %s\n", z_string_data(z_loan(encoding))); // Check attachment kv_pairs_rx_t kvp = { @@ -119,6 +123,7 @@ void reply_handler(const z_loaned_reply_t *reply, void *ctx) { z_drop(z_move(keystr)); z_drop(z_move(replystr)); + z_drop(z_move(encoding)); } else { printf(">> Received an error\n"); } @@ -213,6 +218,11 @@ int main(int argc, char **argv) { z_bytes_serialize_from_iter(&attachment, create_attachment_iter, (void *)&ctx); opts.attachment = z_move(attachment); + // Add encoding value + z_owned_encoding_t encoding; + z_encoding_from_str(&encoding, "zenoh/string;utf8"); + opts.encoding = z_move(encoding); + z_owned_closure_reply_t callback; z_closure(&callback, reply_handler, reply_dropper); if (z_get(z_loan(s), z_loan(ke), "", z_move(callback), &opts) < 0) { diff --git a/examples/unix/c11/z_pub_attachment.c b/examples/unix/c11/z_pub_attachment.c index e14f0caa7..a55e1cf98 100644 --- a/examples/unix/c11/z_pub_attachment.c +++ b/examples/unix/c11/z_pub_attachment.c @@ -164,7 +164,7 @@ int main(int argc, char **argv) { options.attachment = z_move(attachment); // Add encoding value - z_encoding_from_str(&encoding, "text/plain;utf8"); + z_encoding_from_str(&encoding, "zenoh/string;utf8"); options.encoding = z_move(encoding); z_publisher_put(z_loan(pub), z_move(payload), &options); diff --git a/examples/unix/c11/z_queryable_attachment.c b/examples/unix/c11/z_queryable_attachment.c index 7d2fd9778..bbdbe8b63 100644 --- a/examples/unix/c11/z_queryable_attachment.c +++ b/examples/unix/c11/z_queryable_attachment.c @@ -82,12 +82,13 @@ void parse_attachment(kv_pairs_rx_t *kvp, const z_loaned_bytes_t *attachment) { z_bytes_deserialize_into_string(z_loan(second), &kvp->data[kvp->current_idx].value); z_bytes_drop(&first); z_bytes_drop(&second); + z_bytes_drop(&kv); kvp->current_idx++; } } void print_attachment(kv_pairs_rx_t *kvp) { - printf(" with attachment:\n"); + printf(" with attachment:\n"); for (uint32_t i = 0; i < kvp->current_idx; i++) { printf(" %d: %s, %s\n", i, z_string_data(z_loan(kvp->data[i].key)), z_string_data(z_loan(kvp->data[i].value))); @@ -110,11 +111,16 @@ void query_handler(const z_loaned_query_t *query, void *ctx) { z_query_parameters(query, ¶ms); printf(" >> [Queryable handler] Received Query '%s%.*s'\n", z_string_data(z_loan(keystr)), (int)z_loan(params)->len, z_loan(params)->val); + // Process encoding + z_owned_string_t encoding; + z_encoding_to_string(z_query_encoding(query), &encoding); + printf(" with encoding: %s\n", z_string_data(z_loan(encoding))); + // Process value z_owned_string_t payload_string; z_bytes_deserialize_into_string(z_query_payload(query), &payload_string); if (z_string_len(z_loan(payload_string)) > 1) { - printf(" with value '%s'\n", z_string_data(z_loan(payload_string))); + printf(" with value '%s'\n", z_string_data(z_loan(payload_string))); } // Check attachment kv_pairs_rx_t kvp = { @@ -125,22 +131,28 @@ void query_handler(const z_loaned_query_t *query, void *ctx) { } drop_attachment(&kvp); z_drop(z_move(payload_string)); + z_drop(z_move(encoding)); - // Reply value encoding + // Reply payload z_owned_bytes_t reply_payload; z_bytes_serialize_from_str(&reply_payload, value); + z_query_reply_options_t options; + z_query_reply_options_default(&options); + // Reply attachment kv_pair_t kvs[1]; kvs[0] = (kv_pair_t){.key = "reply_key", .value = "reply_value"}; kv_pairs_tx_t kv_ctx = (kv_pairs_tx_t){.data = kvs, .current_idx = 0, .len = 1}; z_owned_bytes_t attachment; z_bytes_serialize_from_iter(&attachment, create_attachment_iter, (void *)&kv_ctx); - - z_query_reply_options_t options; - z_query_reply_options_default(&options); options.attachment = z_move(attachment); + // Reply encoding + z_owned_encoding_t reply_encoding; + z_encoding_from_str(&reply_encoding, "zenoh/string;utf8"); + options.encoding = z_move(reply_encoding); + z_query_reply(query, z_query_keyexpr(query), z_move(reply_payload), &options); z_drop(z_move(keystr)); msg_nb++; diff --git a/examples/unix/c11/z_sub_attachment.c b/examples/unix/c11/z_sub_attachment.c index b84014d28..db3198b6d 100644 --- a/examples/unix/c11/z_sub_attachment.c +++ b/examples/unix/c11/z_sub_attachment.c @@ -53,7 +53,7 @@ void parse_attachment(kv_pairs_t *kvp, const z_loaned_bytes_t *attachment) { } void print_attachment(kv_pairs_t *kvp) { - printf(" with attachment:\n"); + printf(" with attachment:\n"); for (uint32_t i = 0; i < kvp->current_idx; i++) { printf(" %d: %s, %s\n", i, z_string_data(z_loan(kvp->data[i].key)), z_string_data(z_loan(kvp->data[i].value))); diff --git a/include/zenoh-pico/api/handlers.h b/include/zenoh-pico/api/handlers.h index 473fa2477..b12512c20 100644 --- a/include/zenoh-pico/api/handlers.h +++ b/include/zenoh-pico/api/handlers.h @@ -27,7 +27,7 @@ #define _Z_CHANNEL_DEFINE_IMPL(handler_type, handler_name, handler_new_f_name, callback_type, callback_new_f, \ collection_type, collection_new_f, collection_free_f, collection_push_f, \ collection_pull_f, collection_try_pull_f, elem_owned_type, elem_loaned_type, \ - elem_copy_f, elem_drop_f) \ + elem_clone_f, elem_drop_f) \ typedef struct { \ collection_type *collection; \ } handler_type; \ @@ -50,11 +50,8 @@ _Z_ERROR("Out of memory"); \ return; \ } \ - if (elem == NULL) { \ - internal_elem->_rc.in = NULL; \ - } else { \ - elem_copy_f(&internal_elem->_rc, elem); \ - } \ + elem_clone_f(internal_elem, elem); \ + \ int8_t ret = collection_push_f(internal_elem, context, _z_##handler_name##_elem_free); \ if (ret != _Z_RES_OK) { \ _Z_ERROR("%s failed: %i", #collection_push_f, ret); \ @@ -112,7 +109,7 @@ /* collection_try_pull_f */ _z_##kind_name##_mt_try_pull, \ /* elem_owned_type */ z_owned_##item_name##_t, \ /* elem_loaned_type */ z_loaned_##item_name##_t, \ - /* elem_copy_f */ _z_##item_name##_rc_copy, \ + /* elem_clone_f */ z_##item_name##_clone, \ /* elem_drop_f */ z_##item_name##_drop) #define _Z_CHANNEL_DEFINE_DUMMY(item_name, kind_name) \ diff --git a/include/zenoh-pico/api/primitives.h b/include/zenoh-pico/api/primitives.h index a94b5a1a4..c63732b2d 100644 --- a/include/zenoh-pico/api/primitives.h +++ b/include/zenoh-pico/api/primitives.h @@ -1239,7 +1239,7 @@ z_sample_kind_t z_sample_kind(const z_loaned_sample_t *sample); z_qos_t z_sample_qos(const z_loaned_sample_t *sample); /** - * Gets the attachment of a value by aliasing it. + * Gets the attachment of a sample by aliasing it. * * Parameters: * sample: Pointer to a :c:type:`z_loaned_sample_t` to get the attachment from. diff --git a/include/zenoh-pico/api/types.h b/include/zenoh-pico/api/types.h index 2357f09f4..a2ed9349f 100644 --- a/include/zenoh-pico/api/types.h +++ b/include/zenoh-pico/api/types.h @@ -403,8 +403,8 @@ static inline z_qos_t z_qos_default(void) { return _Z_N_QOS_DEFAULT; } * z_timestamp_t timestamp: The timestamp of this data sample. * z_qos_t qos: Quality of service settings used to deliver this sample. */ -_Z_OWNED_TYPE_RC(_z_sample_rc_t, sample) -_Z_LOANED_TYPE(_z_sample_rc_t, sample) +_Z_OWNED_TYPE_PTR(_z_sample_t, sample) +_Z_LOANED_TYPE(_z_sample_t, sample) /** * Represents the content of a `hello` message returned by a zenoh entity as a reply to a `scout` message. @@ -420,8 +420,8 @@ _Z_LOANED_TYPE(_z_hello_t, hello) /** * Represents the reply to a query. */ -_Z_OWNED_TYPE_RC(_z_reply_rc_t, reply) -_Z_LOANED_TYPE(_z_reply_rc_t, reply) +_Z_OWNED_TYPE_PTR(_z_reply_t, reply) +_Z_LOANED_TYPE(_z_reply_t, reply) /** * Represents an array of non null-terminated string. diff --git a/include/zenoh-pico/collections/string.h b/include/zenoh-pico/collections/string.h index 55be113bc..e2e423a59 100644 --- a/include/zenoh-pico/collections/string.h +++ b/include/zenoh-pico/collections/string.h @@ -78,6 +78,7 @@ _z_string_t *_z_string_make_as_ptr(const char *value); size_t _z_string_size(const _z_string_t *s); int8_t _z_string_copy(_z_string_t *dst, const _z_string_t *src); void _z_string_move(_z_string_t *dst, _z_string_t *src); +_z_string_t _z_string_steal(_z_string_t *str); void _z_string_move_str(_z_string_t *dst, char *src); void _z_string_clear(_z_string_t *s); void _z_string_free(_z_string_t **s); diff --git a/include/zenoh-pico/net/encoding.h b/include/zenoh-pico/net/encoding.h index 9235d45b3..9c2899d6d 100644 --- a/include/zenoh-pico/net/encoding.h +++ b/include/zenoh-pico/net/encoding.h @@ -34,5 +34,6 @@ void _z_encoding_clear(_z_encoding_t *encoding); _Bool _z_encoding_check(const _z_encoding_t *encoding); int8_t _z_encoding_copy(_z_encoding_t *dst, const _z_encoding_t *src); void _z_encoding_move(_z_encoding_t *dst, _z_encoding_t *src); +_z_encoding_t _z_encoding_steal(_z_encoding_t *val); #endif /* ZENOH_PICO_ENCODING_NETAPI_H */ diff --git a/include/zenoh-pico/net/query.h b/include/zenoh-pico/net/query.h index e34d587a2..670dfed74 100644 --- a/include/zenoh-pico/net/query.h +++ b/include/zenoh-pico/net/query.h @@ -28,7 +28,7 @@ typedef struct _z_query_t { _z_value_t _value; _z_keyexpr_t _key; uint32_t _request_id; - _z_session_t *_zn; + _z_session_t *_zn; // FIXME: Potential UB source, Issue #476 _z_bytes_t attachment; char *_parameters; _Bool _anyke; @@ -36,6 +36,9 @@ typedef struct _z_query_t { _z_query_t _z_query_null(void); void _z_query_clear(_z_query_t *q); +void _z_query_copy(_z_query_t *dst, const _z_query_t *src); +void _z_query_free(_z_query_t **query); + _Z_REFCOUNT_DEFINE(_z_query, _z_query) /** @@ -47,8 +50,8 @@ typedef struct { } _z_queryable_t; #if Z_FEATURE_QUERYABLE == 1 -_z_query_t _z_query_create(const _z_value_t *value, const _z_keyexpr_t *key, const _z_slice_t *parameters, - _z_session_t *zn, uint32_t request_id, const _z_bytes_t attachment); +_z_query_t _z_query_create(const _z_value_t *value, _z_keyexpr_t *key, const _z_slice_t *parameters, _z_session_t *zn, + uint32_t request_id, const _z_bytes_t attachment); void _z_queryable_clear(_z_queryable_t *qbl); void _z_queryable_free(_z_queryable_t **qbl); #endif diff --git a/include/zenoh-pico/net/reply.h b/include/zenoh-pico/net/reply.h index 3a14a58a3..dca38a666 100644 --- a/include/zenoh-pico/net/reply.h +++ b/include/zenoh-pico/net/reply.h @@ -32,12 +32,12 @@ * */ typedef struct _z_reply_data_t { - _z_sample_rc_t sample; + _z_sample_t sample; _z_id_t replier_id; } _z_reply_data_t; void _z_reply_data_clear(_z_reply_data_t *rd); -void _z_reply_data_copy(_z_reply_data_t *dst, _z_reply_data_t *src); +void _z_reply_data_copy(_z_reply_data_t *dst, const _z_reply_data_t *src); _z_reply_t _z_reply_move(_z_reply_t *src_reply); _Z_ELEM_DEFINE(_z_reply_data, _z_reply_data_t, _z_noop_size, _z_reply_data_clear, _z_noop_copy) @@ -60,13 +60,11 @@ typedef struct _z_reply_t { _z_reply_t _z_reply_null(void); void _z_reply_clear(_z_reply_t *src); void _z_reply_free(_z_reply_t **hello); -void _z_reply_copy(_z_reply_t *dst, _z_reply_t *src); +void _z_reply_copy(_z_reply_t *dst, const _z_reply_t *src); _z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload, const _z_timestamp_t *timestamp, _z_encoding_t encoding, z_sample_kind_t kind, const _z_bytes_t attachment); -_Z_REFCOUNT_DEFINE(_z_reply, _z_reply) - typedef struct _z_pending_reply_t { _z_reply_t _reply; _z_timestamp_t _tstamp; diff --git a/include/zenoh-pico/net/sample.h b/include/zenoh-pico/net/sample.h index 0817d9d1e..5dee423d7 100644 --- a/include/zenoh-pico/net/sample.h +++ b/include/zenoh-pico/net/sample.h @@ -39,8 +39,6 @@ typedef struct _z_sample_t { } _z_sample_t; void _z_sample_clear(_z_sample_t *sample); -_Z_REFCOUNT_DEFINE(_z_sample, _z_sample) - _z_sample_t _z_sample_null(void); _Bool _z_sample_check(const _z_sample_t *sample); void _z_sample_move(_z_sample_t *dst, _z_sample_t *src); @@ -56,8 +54,8 @@ void _z_sample_free(_z_sample_t **sample); void _z_sample_copy(_z_sample_t *dst, const _z_sample_t *src); _z_sample_t _z_sample_duplicate(const _z_sample_t *src); -_z_sample_t _z_sample_create(const _z_keyexpr_t *key, const _z_bytes_t payload, _z_timestamp_t timestamp, - const _z_encoding_t encoding, const z_sample_kind_t kind, const _z_qos_t qos, +_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, _z_timestamp_t timestamp, + _z_encoding_t encoding, const z_sample_kind_t kind, const _z_qos_t qos, const _z_bytes_t attachment); #endif /* ZENOH_PICO_SAMPLE_NETAPI_H */ diff --git a/include/zenoh-pico/session/session.h b/include/zenoh-pico/session/session.h index a4cfe3c44..a7dcdcde6 100644 --- a/include/zenoh-pico/session/session.h +++ b/include/zenoh-pico/session/session.h @@ -49,12 +49,12 @@ _Z_ELEM_DEFINE(_z_resource, _z_resource_t, _z_noop_size, _z_resource_clear, _z_r _Z_LIST_DEFINE(_z_resource, _z_resource_t) // Forward declaration to avoid cyclical include -typedef struct _z_sample_rc_t z_loaned_sample_t; +typedef struct _z_sample_t _z_sample_t; /** * The callback signature of the functions handling data messages. */ -typedef void (*_z_data_handler_t)(const z_loaned_sample_t *sample, void *arg); +typedef void (*_z_data_handler_t)(const _z_sample_t *sample, void *arg); typedef struct { _z_keyexpr_t _key; @@ -81,12 +81,12 @@ typedef struct { } _z_publication_t; // Forward type declaration to avoid cyclical include -typedef struct _z_query_rc_t z_loaned_query_t; +typedef struct _z_query_rc_t _z_query_rc_t; /** * The callback signature of the functions handling query messages. */ -typedef void (*_z_queryable_handler_t)(const z_loaned_query_t *query, void *arg); +typedef void (*_z_queryable_handler_t)(const _z_query_rc_t *query, void *arg); typedef struct { _z_keyexpr_t _key; @@ -110,12 +110,12 @@ _Z_LIST_DEFINE(_z_session_queryable_rc, _z_session_queryable_rc_t) typedef struct _z_reply_t _z_reply_t; typedef _z_list_t _z_reply_data_list_t; typedef _z_list_t _z_pending_reply_list_t; -typedef struct _z_reply_rc_t _z_reply_rc_t; -typedef _z_reply_rc_t z_loaned_reply_t; +typedef struct _z_reply_t _z_reply_t; + /** * The callback signature of the functions handling query replies. */ -typedef void (*_z_reply_handler_t)(const z_loaned_reply_t *reply, void *arg); +typedef void (*_z_reply_handler_t)(const _z_reply_t *reply, void *arg); typedef struct { _z_keyexpr_t _key; diff --git a/src/api/api.c b/src/api/api.c index 9f914634f..aec73a739 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -291,6 +291,7 @@ static int8_t _z_encoding_convert_into_string(const z_loaned_encoding_t *encodin } // Allocate string char *value = (char *)z_malloc(sizeof(char) * total_len); + memset(value, 0, total_len); if (value == NULL) { return _Z_ERR_SYSTEM_OUT_OF_MEMORY; } @@ -666,7 +667,7 @@ static _z_encoding_t _z_encoding_from_owned(const z_owned_encoding_t *encoding) } #endif -_Z_OWNED_FUNCTIONS_RC_IMPL(sample) +_Z_OWNED_FUNCTIONS_PTR_IMPL(_z_sample_t, sample, _z_sample_copy, _z_sample_free) _Z_OWNED_FUNCTIONS_RC_IMPL(session) _Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_sample, _z_data_handler_t, z_dropper_handler_t) @@ -809,15 +810,13 @@ int8_t z_info_routers_zid(const z_loaned_session_t *zs, z_owned_closure_zid_t *c z_id_t z_info_zid(const z_loaned_session_t *zs) { return _Z_RC_IN_VAL(zs)._local_zid; } -const z_loaned_keyexpr_t *z_sample_keyexpr(const z_loaned_sample_t *sample) { return &_Z_RC_IN_VAL(sample).keyexpr; } -z_sample_kind_t z_sample_kind(const z_loaned_sample_t *sample) { return _Z_RC_IN_VAL(sample).kind; } -const z_loaned_bytes_t *z_sample_payload(const z_loaned_sample_t *sample) { return &_Z_RC_IN_VAL(sample).payload; } -z_timestamp_t z_sample_timestamp(const z_loaned_sample_t *sample) { return _Z_RC_IN_VAL(sample).timestamp; } -const z_loaned_encoding_t *z_sample_encoding(const z_loaned_sample_t *sample) { return &_Z_RC_IN_VAL(sample).encoding; } -z_qos_t z_sample_qos(const z_loaned_sample_t *sample) { return _Z_RC_IN_VAL(sample).qos; } -const z_loaned_bytes_t *z_sample_attachment(const z_loaned_sample_t *sample) { - return &_Z_RC_IN_VAL(sample).attachment; -} +const z_loaned_keyexpr_t *z_sample_keyexpr(const z_loaned_sample_t *sample) { return &sample->keyexpr; } +z_sample_kind_t z_sample_kind(const z_loaned_sample_t *sample) { return sample->kind; } +const z_loaned_bytes_t *z_sample_payload(const z_loaned_sample_t *sample) { return &sample->payload; } +z_timestamp_t z_sample_timestamp(const z_loaned_sample_t *sample) { return sample->timestamp; } +const z_loaned_encoding_t *z_sample_encoding(const z_loaned_sample_t *sample) { return &sample->encoding; } +z_qos_t z_sample_qos(const z_loaned_sample_t *sample) { return sample->qos; } +const z_loaned_bytes_t *z_sample_attachment(const z_loaned_sample_t *sample) { return &sample->attachment; } const z_loaned_bytes_t *z_reply_err_payload(const z_loaned_reply_err_t *reply_err) { return &reply_err->payload; } const z_loaned_encoding_t *z_reply_err_encoding(const z_loaned_reply_err_t *reply_err) { return &reply_err->encoding; } @@ -989,7 +988,7 @@ z_owned_keyexpr_t z_publisher_keyexpr(z_loaned_publisher_t *publisher) { #endif #if Z_FEATURE_QUERY == 1 -_Z_OWNED_FUNCTIONS_RC_IMPL(reply) +_Z_OWNED_FUNCTIONS_PTR_IMPL(_z_reply_t, reply, _z_reply_copy, _z_reply_free) void z_get_options_default(z_get_options_t *options) { options->target = z_query_target_default(); @@ -1045,11 +1044,11 @@ int8_t z_get(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, co _Bool z_reply_is_ok(const z_loaned_reply_t *reply) { _ZP_UNUSED(reply); // For the moment always return TRUE. - // The support for reply errors will come in the next release. + // FIXME: The support for reply errors will come in the next release. return true; } -const z_loaned_sample_t *z_reply_ok(const z_loaned_reply_t *reply) { return &reply->in->val.data.sample; } +const z_loaned_sample_t *z_reply_ok(const z_loaned_reply_t *reply) { return &reply->data.sample; } const z_loaned_reply_err_t *z_reply_err(const z_loaned_reply_t *reply) { _ZP_UNUSED(reply); diff --git a/src/collections/bytes.c b/src/collections/bytes.c index f82f00ef2..f6c2c4679 100644 --- a/src/collections/bytes.c +++ b/src/collections/bytes.c @@ -316,13 +316,13 @@ int8_t _z_bytes_reader_seek(_z_bytes_reader_t *reader, int64_t offset, int origi reader->in_slice_idx = 0; reader->slice_idx = 0; if (offset < 0) return _Z_ERR_DID_NOT_READ; - return _z_bytes_reader_seek_forward(reader, offset); + return _z_bytes_reader_seek_forward(reader, (size_t)offset); } case SEEK_CUR: { if (offset >= 0) - return _z_bytes_reader_seek_forward(reader, offset); + return _z_bytes_reader_seek_forward(reader, (size_t)offset); else - return _z_bytes_reader_seek_backward(reader, -offset); + return _z_bytes_reader_seek_backward(reader, (size_t)(-offset)); } case SEEK_END: { reader->byte_idx = _z_bytes_len(reader->bytes); @@ -331,14 +331,14 @@ int8_t _z_bytes_reader_seek(_z_bytes_reader_t *reader, int64_t offset, int origi if (offset > 0) return _Z_ERR_DID_NOT_READ; else - return _z_bytes_reader_seek_backward(reader, -offset); + return _z_bytes_reader_seek_backward(reader, (size_t)(-offset)); } default: return _Z_ERR_GENERIC; } } -int64_t _z_bytes_reader_tell(const _z_bytes_reader_t *reader) { return reader->byte_idx; } +int64_t _z_bytes_reader_tell(const _z_bytes_reader_t *reader) { return (int64_t)reader->byte_idx; } size_t _z_bytes_reader_read(_z_bytes_reader_t *reader, uint8_t *buf, size_t len) { uint8_t *buf_start = buf; diff --git a/src/collections/string.c b/src/collections/string.c index e2ac6cdd9..a805ed0e1 100644 --- a/src/collections/string.c +++ b/src/collections/string.c @@ -74,6 +74,16 @@ void _z_string_move(_z_string_t *dst, _z_string_t *src) { src->len = 0; } +_z_string_t _z_string_steal(_z_string_t *str) { + _z_string_t ret = { + .val = str->val, + .len = str->len, + }; + str->val = NULL; + str->len = 0; + return ret; +} + void _z_string_move_str(_z_string_t *dst, char *src) { dst->val = src; dst->len = strlen(src); diff --git a/src/net/encoding.c b/src/net/encoding.c index c4fd5e50b..e09243a01 100644 --- a/src/net/encoding.c +++ b/src/net/encoding.c @@ -57,3 +57,12 @@ void _z_encoding_move(_z_encoding_t *dst, _z_encoding_t *src) { src->id = _Z_ENCODING_ID_DEFAULT; _z_string_move(&dst->schema, &src->schema); } + +_z_encoding_t _z_encoding_steal(_z_encoding_t *val) { + _z_encoding_t ret = { + .id = val->id, + .schema = _z_string_steal(&val->schema), + }; + val->id = _Z_ENCODING_ID_DEFAULT; + return ret; +} diff --git a/src/net/query.c b/src/net/query.c index fb79133a7..893f7dd40 100644 --- a/src/net/query.c +++ b/src/net/query.c @@ -41,20 +41,37 @@ void _z_query_clear(_z_query_t *q) { _z_bytes_drop(&q->attachment); } +void _z_query_copy(_z_query_t *dst, const _z_query_t *src) { + dst->_anyke = src->_anyke; + dst->_key = _z_keyexpr_duplicate(src->_key); + dst->_parameters = src->_parameters; + dst->_request_id = src->_request_id; + dst->_zn = src->_zn; + _z_value_copy(&dst->_value, &src->_value); +} + +void _z_query_free(_z_query_t **query) { + _z_query_t *ptr = *query; + if (ptr != NULL) { + _z_query_clear(ptr); + z_free(ptr); + *query = NULL; + } +} + #if Z_FEATURE_QUERYABLE == 1 -_z_query_t _z_query_create(const _z_value_t *value, const _z_keyexpr_t *key, const _z_slice_t *parameters, - _z_session_t *zn, uint32_t request_id, const _z_bytes_t attachment) { +_z_query_t _z_query_create(const _z_value_t *value, _z_keyexpr_t *key, const _z_slice_t *parameters, _z_session_t *zn, + uint32_t request_id, const _z_bytes_t attachment) { _z_query_t q = _z_query_null(); q._request_id = request_id; - q._zn = zn; // Ideally would have been an rc + q._zn = zn; q._parameters = (char *)z_malloc(parameters->len + 1); - memcpy(q._parameters, parameters->start, parameters->len); + memcpy(q._parameters, parameters->start, parameters->len); // TODO: Might be movable, Issue #482 q._parameters[parameters->len] = 0; q._anyke = (strstr(q._parameters, Z_SELECTOR_QUERY_MATCH) == NULL) ? false : true; + q._key = _z_keyexpr_steal(key); _z_bytes_copy(&q.attachment, &attachment); - - _z_keyexpr_copy(&q._key, key); - _z_value_copy(&q._value, value); + _z_value_copy(&q._value, value); // FIXME: Move encoding, Issue #482 return q; } diff --git a/src/net/reply.c b/src/net/reply.c index 43de427ba..06d63128e 100644 --- a/src/net/reply.c +++ b/src/net/reply.c @@ -21,14 +21,14 @@ _z_reply_t _z_reply_null(void) { _z_reply_t r = {._tag = Z_REPLY_TAG_DATA, .data = { .replier_id = {.id = {0}}, - .sample = {.in = NULL}, + .sample = _z_sample_null(), }}; return r; } #if Z_FEATURE_QUERY == 1 void _z_reply_data_clear(_z_reply_data_t *reply_data) { - _z_sample_rc_drop(&reply_data->sample); + _z_sample_clear(&reply_data->sample); reply_data->replier_id = _z_id_empty(); } @@ -37,15 +37,14 @@ void _z_reply_data_free(_z_reply_data_t **reply_data) { if (ptr != NULL) { _z_reply_data_clear(ptr); - z_free(ptr); *reply_data = NULL; } } -void _z_reply_data_copy(_z_reply_data_t *dst, _z_reply_data_t *src) { - _z_sample_rc_copy(&dst->sample, &src->sample); +void _z_reply_data_copy(_z_reply_data_t *dst, const _z_reply_data_t *src) { dst->replier_id = src->replier_id; + _z_sample_copy(&dst->sample, &src->sample); } _z_reply_t _z_reply_move(_z_reply_t *src_reply) { @@ -67,7 +66,7 @@ void _z_reply_free(_z_reply_t **reply) { } } -void _z_reply_copy(_z_reply_t *dst, _z_reply_t *src) { +void _z_reply_copy(_z_reply_t *dst, const _z_reply_t *src) { _z_reply_data_copy(&dst->data, &src->data); dst->_tag = src->_tag; } @@ -91,17 +90,13 @@ _z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, z_reply_tag_t tag, _z_id_t id, reply._tag = tag; if (tag == Z_REPLY_TAG_DATA) { reply.data.replier_id = id; - // Create sample - _z_sample_t sample = _z_sample_null(); - sample.keyexpr = keyexpr; // FIXME: call z_keyexpr_move or copy - sample.encoding = encoding; // FIXME: call z_encoding_move or copy - _z_bytes_copy(&sample.payload, &payload); - sample.kind = kind; - sample.timestamp = _z_timestamp_duplicate(timestamp); - _z_bytes_copy(&sample.attachment, &attachment); - - // Create sample rc from value - reply.data.sample = _z_sample_rc_new_from_val(sample); + // Create reply sample + reply.data.sample.keyexpr = _z_keyexpr_steal(&keyexpr); + reply.data.sample.kind = kind; + reply.data.sample.timestamp = _z_timestamp_duplicate(timestamp); + _z_bytes_copy(&reply.data.sample.payload, &payload); + _z_bytes_copy(&reply.data.sample.attachment, &attachment); + _z_encoding_copy(&reply.data.sample.encoding, &encoding); // FIXME: Move encoding, Issue #482 } return reply; } diff --git a/src/net/sample.c b/src/net/sample.c index 373771876..6c27e64f8 100644 --- a/src/net/sample.c +++ b/src/net/sample.c @@ -78,22 +78,22 @@ _z_sample_t _z_sample_duplicate(const _z_sample_t *src) { } #if Z_FEATURE_SUBSCRIPTION == 1 -_z_sample_t _z_sample_create(const _z_keyexpr_t *key, const _z_bytes_t payload, const _z_timestamp_t timestamp, - const _z_encoding_t encoding, const z_sample_kind_t kind, const _z_qos_t qos, +_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const _z_timestamp_t timestamp, + _z_encoding_t encoding, const z_sample_kind_t kind, const _z_qos_t qos, const _z_bytes_t attachment) { _z_sample_t s = _z_sample_null(); - _z_keyexpr_copy(&s.keyexpr, key); - _z_bytes_copy(&s.payload, &payload); - _z_encoding_copy(&s.encoding, &encoding); + s.keyexpr = _z_keyexpr_steal(key); s.kind = kind; s.timestamp = timestamp; s.qos = qos; + _z_bytes_copy(&s.payload, &payload); _z_bytes_copy(&s.attachment, &attachment); + _z_encoding_copy(&s.encoding, &encoding); // FIXME: Move encoding, Issue #482 return s; } #else -_z_sample_t _z_sample_create(const _z_keyexpr_t *key, const _z_bytes_t payload, const _z_timestamp_t timestamp, - const _z_encoding_t encoding, const z_sample_kind_t kind, const _z_qos_t qos, +_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const _z_timestamp_t timestamp, + _z_encoding_t encoding, const z_sample_kind_t kind, const _z_qos_t qos, const _z_bytes_t attachment) { _ZP_UNUSED(key); _ZP_UNUSED(payload); diff --git a/src/protocol/definitions/message.c b/src/protocol/definitions/message.c index e9e5fe0af..d83f6c4c0 100644 --- a/src/protocol/definitions/message.c +++ b/src/protocol/definitions/message.c @@ -23,8 +23,9 @@ void _z_msg_reply_clear(_z_msg_reply_t *msg) { _z_push_body_clear(&msg->_body); } void _z_msg_put_clear(_z_msg_put_t *msg) { - _z_encoding_clear(&msg->_encoding); _z_bytes_drop(&msg->_payload); + _z_bytes_drop(&msg->_attachment); + _z_encoding_clear(&msg->_encoding); // FIXME: Remove when possible, Issue #482 _z_timestamp_clear(&msg->_commons._timestamp); } @@ -38,6 +39,7 @@ _z_msg_query_reqexts_t _z_msg_query_required_extensions(const _z_msg_query_t *ms void _z_msg_query_clear(_z_msg_query_t *msg) { _z_slice_clear(&msg->_parameters); + _z_bytes_drop(&msg->_ext_attachment); _z_value_clear(&msg->_ext_value); } void _z_msg_err_clear(_z_msg_err_t *err) { diff --git a/src/protocol/keyexpr.c b/src/protocol/keyexpr.c index ff2c38722..1d6fd6634 100644 --- a/src/protocol/keyexpr.c +++ b/src/protocol/keyexpr.c @@ -39,7 +39,6 @@ int8_t _z_keyexpr_copy(_z_keyexpr_t *dst, const _z_keyexpr_t *src) { return _Z_ERR_SYSTEM_OUT_OF_MEMORY; } dst->_id = src->_id; - dst->_suffix = src->_suffix ? _z_str_clone(src->_suffix) : NULL; dst->_mapping = src->_mapping; _z_keyexpr_set_owns_suffix(dst, true); return _Z_RES_OK; diff --git a/src/session/query.c b/src/session/query.c index c31f10f2f..71ceb5b64 100644 --- a/src/session/query.c +++ b/src/session/query.c @@ -127,8 +127,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons pen_rep = _z_pending_reply_list_head(pen_rps); // Check if this is the same resource key - if (_z_str_eq(pen_rep->_reply.data.sample.in->val.keyexpr._suffix, - reply.data.sample.in->val.keyexpr._suffix) == true) { + if (_z_str_eq(pen_rep->_reply.data.sample.keyexpr._suffix, reply.data.sample.keyexpr._suffix) == true) { if (msg->_commons._timestamp.time <= pen_rep->_tstamp.time) { drop = true; } else { @@ -149,7 +148,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons _z_reply_t partial_reply; (void)memset(&partial_reply, 0, sizeof(_z_reply_t)); // Avoid warnings on uninitialized values on the reply - partial_reply.data.sample.in->val.keyexpr = _z_keyexpr_duplicate(reply.data.sample.in->val.keyexpr); + partial_reply.data.sample.keyexpr = _z_keyexpr_duplicate(reply.data.sample.keyexpr); pen_rep->_reply = partial_reply; } else { pen_rep->_reply = reply; // Store the whole reply in the latest mode @@ -166,10 +165,10 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons // Trigger the user callback if ((ret == _Z_RES_OK) && (pen_qry->_consolidation != Z_CONSOLIDATION_MODE_LATEST)) { - _z_reply_rc_t cb_reply = _z_reply_rc_new(); - cb_reply.in->val = _z_reply_move(&reply); + _z_reply_t cb_reply = _z_reply_null(); + cb_reply = _z_reply_move(&reply); pen_qry->_callback(&cb_reply, pen_qry->_arg); - _z_reply_rc_drop(&cb_reply); + _z_reply_clear(&cb_reply); } if (ret != _Z_RES_OK) { @@ -189,18 +188,17 @@ int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id) { if ((ret == _Z_RES_OK) && (pen_qry == NULL)) { ret = _Z_ERR_ENTITY_UNKNOWN; } - // The reply is the final one, apply consolidation if needed if ((ret == _Z_RES_OK) && (pen_qry->_consolidation == Z_CONSOLIDATION_MODE_LATEST)) { while (pen_qry->_pending_replies != NULL) { _z_pending_reply_t *pen_rep = _z_pending_reply_list_head(pen_qry->_pending_replies); // Trigger the query handler - _z_reply_rc_t cb_reply = _z_reply_rc_new(); - cb_reply.in->val = _z_reply_move(&pen_rep->_reply); + _z_reply_t cb_reply = _z_reply_null(); + cb_reply = _z_reply_move(&pen_rep->_reply); pen_qry->_callback(&cb_reply, pen_qry->_arg); pen_qry->_pending_replies = _z_pending_reply_list_pop(pen_qry->_pending_replies, NULL); - _z_reply_rc_drop(&cb_reply); + _z_reply_clear(&cb_reply); } } diff --git a/src/session/queryable.c b/src/session/queryable.c index 53d9747ca..9ca478151 100644 --- a/src/session/queryable.c +++ b/src/session/queryable.c @@ -156,7 +156,6 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const } // Clean up _z_query_rc_drop(&query); - _z_keyexpr_clear(&key); _z_session_queryable_rc_list_free(&qles); } else { _zp_session_unlock_mutex(zn); diff --git a/src/session/subscription.c b/src/session/subscription.c index b146cdaa4..7a55a166c 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -162,8 +162,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co _zp_session_unlock_mutex(zn); // Build the sample - _z_sample_rc_t sample = _z_sample_rc_new(); - sample.in->val = _z_sample_create(&key, payload, timestamp, encoding, kind, qos, attachment); + _z_sample_t sample = _z_sample_create(&key, payload, timestamp, encoding, kind, qos, attachment); // Parse subscription list _z_subscription_rc_list_t *xs = subs; _Z_DEBUG("Triggering %ju subs", (uintmax_t)_z_subscription_rc_list_len(xs)); @@ -173,8 +172,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co xs = _z_subscription_rc_list_tail(xs); } // Clean up - _z_sample_rc_drop(&sample); - _z_keyexpr_clear(&key); + _z_sample_clear(&sample); _z_subscription_rc_list_free(&subs); } else { _zp_session_unlock_mutex(zn); diff --git a/tests/attachment.py b/tests/attachment.py index 8624939ce..3b1888c29 100644 --- a/tests/attachment.py +++ b/tests/attachment.py @@ -30,28 +30,28 @@ def pub_and_sub(): Declaring Subscriber on 'demo/example/**'... Press CTRL-C to quit... >> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 0] Pub from Pico!') - with encoding: text/plain;utf8 - with attachment: + with encoding: zenoh/string;utf8 + with attachment: 0: source, C 1: index, 0 >> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 1] Pub from Pico!') - with encoding: text/plain;utf8 - with attachment: + with encoding: zenoh/string;utf8 + with attachment: 0: source, C 1: index, 1 >> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 2] Pub from Pico!') - with encoding: text/plain;utf8 - with attachment: + with encoding: zenoh/string;utf8 + with attachment: 0: source, C 1: index, 2 >> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 3] Pub from Pico!') - with encoding: text/plain;utf8 - with attachment: + with encoding: zenoh/string;utf8 + with attachment: 0: source, C 1: index, 3 >> [Subscriber] Received ('demo/example/zenoh-pico-pub': '[ 4] Pub from Pico!') - with encoding: text/plain;utf8 - with attachment: + with encoding: zenoh/string;utf8 + with attachment: 0: source, C 1: index, 4''' @@ -137,7 +137,8 @@ def query_and_queryable(): z_query_expected_output = """Opening session... Sending Query 'demo/example/**'... >> Received ('demo/example/**': 'Queryable from Pico!') - with attachment: + with encoding: zenoh/string;utf8 + with attachment: 0: reply_key, reply_value >> Received query final notification""" @@ -147,7 +148,8 @@ def query_and_queryable(): Creating Queryable on 'demo/example/zenoh-pico-queryable'... Press CTRL-C to quit... >> [Queryable handler] Received Query 'demo/example/**' - with attachment: + with encoding: zenoh/string;utf8 + with attachment: 0: test_key, test_value""" print("Start queryable") diff --git a/tests/z_channels_test.c b/tests/z_channels_test.c index 84beac90d..173db7e1d 100644 --- a/tests/z_channels_test.c +++ b/tests/z_channels_test.c @@ -27,13 +27,15 @@ do { \ _z_bytes_t payload; \ _z_bytes_from_slice(&payload, (_z_slice_t){.start = (const uint8_t *)v, .len = strlen(v)}); \ - _z_sample_t s = {.keyexpr = _z_rname("key"), \ - .payload = payload, \ - .timestamp = _z_timestamp_null(), \ - .encoding = _z_encoding_null(), \ - .kind = 0, \ - .qos = {0}}; \ - z_loaned_sample_t sample = _z_sample_rc_new_from_val(s); \ + z_loaned_sample_t sample = { \ + .keyexpr = _z_rname("key"), \ + .payload = payload, \ + .timestamp = _z_timestamp_null(), \ + .encoding = _z_encoding_null(), \ + .kind = 0, \ + .qos = {0}, \ + .attachment = _z_bytes_null(), \ + }; \ z_call(*z_loan(closure), &sample); \ } while (0);