Skip to content

Commit

Permalink
Remove refcounting from reply/sample. (#440)
Browse files Browse the repository at this point in the history
* feat: create reply sample union type

* feat: remove rc from sample/query/reply

* fix: -Wconversion errors

* fix: strncat jump condition based on uninitialized data

* fix: move attachments data

* feat: add encoding to attachment examples

* feat: add _z_string_steal and _z_encoding_steal

* fix: remove const from encoding as function arg

* feat: copy payload in z_write and steal encoding

* fix: attachment double drop issue

* fix: update attachment test

* fix: decode attachment memory leak

* fix: copy attachment instead of move

* fix: revert previous z_write changes

* chore: format code

* fix: revert const attachment changes

* feat: re-add rc to queries

* fix: double suffix allocation

* feat: use elem_clone to avoid duplicating channel code

* fix: remove double sample create init
  • Loading branch information
jean-roland authored Jul 2, 2024
1 parent 0b0e0d1 commit 27fb58e
Show file tree
Hide file tree
Showing 27 changed files with 175 additions and 125 deletions.
12 changes: 11 additions & 1 deletion examples/unix/c11/z_get_attachment.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,13 @@ void parse_attachment(kv_pairs_rx_t *kvp, const z_loaned_bytes_t *attachment) {
z_bytes_deserialize_into_string(z_loan(second), &kvp->data[kvp->current_idx].value);
z_bytes_drop(&first);
z_bytes_drop(&second);
z_bytes_drop(&kv);
kvp->current_idx++;
}
}

void print_attachment(kv_pairs_rx_t *kvp) {
printf(" with attachment:\n");
printf(" with attachment:\n");
for (uint32_t i = 0; i < kvp->current_idx; i++) {
printf(" %d: %s, %s\n", i, z_string_data(z_loan(kvp->data[i].key)),
z_string_data(z_loan(kvp->data[i].value)));
Expand Down Expand Up @@ -105,8 +106,11 @@ void reply_handler(const z_loaned_reply_t *reply, void *ctx) {
z_keyexpr_to_string(z_sample_keyexpr(sample), &keystr);
z_owned_string_t replystr;
z_bytes_deserialize_into_string(z_sample_payload(sample), &replystr);
z_owned_string_t encoding;
z_encoding_to_string(z_sample_encoding(sample), &encoding);

printf(">> Received ('%s': '%s')\n", z_string_data(z_loan(keystr)), z_string_data(z_loan(replystr)));
printf(" with encoding: %s\n", z_string_data(z_loan(encoding)));

// Check attachment
kv_pairs_rx_t kvp = {
Expand All @@ -119,6 +123,7 @@ void reply_handler(const z_loaned_reply_t *reply, void *ctx) {

z_drop(z_move(keystr));
z_drop(z_move(replystr));
z_drop(z_move(encoding));
} else {
printf(">> Received an error\n");
}
Expand Down Expand Up @@ -213,6 +218,11 @@ int main(int argc, char **argv) {
z_bytes_serialize_from_iter(&attachment, create_attachment_iter, (void *)&ctx);
opts.attachment = z_move(attachment);

// Add encoding value
z_owned_encoding_t encoding;
z_encoding_from_str(&encoding, "zenoh/string;utf8");
opts.encoding = z_move(encoding);

z_owned_closure_reply_t callback;
z_closure(&callback, reply_handler, reply_dropper);
if (z_get(z_loan(s), z_loan(ke), "", z_move(callback), &opts) < 0) {
Expand Down
2 changes: 1 addition & 1 deletion examples/unix/c11/z_pub_attachment.c
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ int main(int argc, char **argv) {
options.attachment = z_move(attachment);

// Add encoding value
z_encoding_from_str(&encoding, "text/plain;utf8");
z_encoding_from_str(&encoding, "zenoh/string;utf8");
options.encoding = z_move(encoding);

z_publisher_put(z_loan(pub), z_move(payload), &options);
Expand Down
24 changes: 18 additions & 6 deletions examples/unix/c11/z_queryable_attachment.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,13 @@ void parse_attachment(kv_pairs_rx_t *kvp, const z_loaned_bytes_t *attachment) {
z_bytes_deserialize_into_string(z_loan(second), &kvp->data[kvp->current_idx].value);
z_bytes_drop(&first);
z_bytes_drop(&second);
z_bytes_drop(&kv);
kvp->current_idx++;
}
}

void print_attachment(kv_pairs_rx_t *kvp) {
printf(" with attachment:\n");
printf(" with attachment:\n");
for (uint32_t i = 0; i < kvp->current_idx; i++) {
printf(" %d: %s, %s\n", i, z_string_data(z_loan(kvp->data[i].key)),
z_string_data(z_loan(kvp->data[i].value)));
Expand All @@ -110,11 +111,16 @@ void query_handler(const z_loaned_query_t *query, void *ctx) {
z_query_parameters(query, &params);
printf(" >> [Queryable handler] Received Query '%s%.*s'\n", z_string_data(z_loan(keystr)), (int)z_loan(params)->len,
z_loan(params)->val);
// Process encoding
z_owned_string_t encoding;
z_encoding_to_string(z_query_encoding(query), &encoding);
printf(" with encoding: %s\n", z_string_data(z_loan(encoding)));

// Process value
z_owned_string_t payload_string;
z_bytes_deserialize_into_string(z_query_payload(query), &payload_string);
if (z_string_len(z_loan(payload_string)) > 1) {
printf(" with value '%s'\n", z_string_data(z_loan(payload_string)));
printf(" with value '%s'\n", z_string_data(z_loan(payload_string)));
}
// Check attachment
kv_pairs_rx_t kvp = {
Expand All @@ -125,22 +131,28 @@ void query_handler(const z_loaned_query_t *query, void *ctx) {
}
drop_attachment(&kvp);
z_drop(z_move(payload_string));
z_drop(z_move(encoding));

// Reply value encoding
// Reply payload
z_owned_bytes_t reply_payload;
z_bytes_serialize_from_str(&reply_payload, value);

z_query_reply_options_t options;
z_query_reply_options_default(&options);

// Reply attachment
kv_pair_t kvs[1];
kvs[0] = (kv_pair_t){.key = "reply_key", .value = "reply_value"};
kv_pairs_tx_t kv_ctx = (kv_pairs_tx_t){.data = kvs, .current_idx = 0, .len = 1};
z_owned_bytes_t attachment;
z_bytes_serialize_from_iter(&attachment, create_attachment_iter, (void *)&kv_ctx);

z_query_reply_options_t options;
z_query_reply_options_default(&options);
options.attachment = z_move(attachment);

// Reply encoding
z_owned_encoding_t reply_encoding;
z_encoding_from_str(&reply_encoding, "zenoh/string;utf8");
options.encoding = z_move(reply_encoding);

z_query_reply(query, z_query_keyexpr(query), z_move(reply_payload), &options);
z_drop(z_move(keystr));
msg_nb++;
Expand Down
2 changes: 1 addition & 1 deletion examples/unix/c11/z_sub_attachment.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void parse_attachment(kv_pairs_t *kvp, const z_loaned_bytes_t *attachment) {
}

void print_attachment(kv_pairs_t *kvp) {
printf(" with attachment:\n");
printf(" with attachment:\n");
for (uint32_t i = 0; i < kvp->current_idx; i++) {
printf(" %d: %s, %s\n", i, z_string_data(z_loan(kvp->data[i].key)),
z_string_data(z_loan(kvp->data[i].value)));
Expand Down
11 changes: 4 additions & 7 deletions include/zenoh-pico/api/handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#define _Z_CHANNEL_DEFINE_IMPL(handler_type, handler_name, handler_new_f_name, callback_type, callback_new_f, \
collection_type, collection_new_f, collection_free_f, collection_push_f, \
collection_pull_f, collection_try_pull_f, elem_owned_type, elem_loaned_type, \
elem_copy_f, elem_drop_f) \
elem_clone_f, elem_drop_f) \
typedef struct { \
collection_type *collection; \
} handler_type; \
Expand All @@ -50,11 +50,8 @@
_Z_ERROR("Out of memory"); \
return; \
} \
if (elem == NULL) { \
internal_elem->_rc.in = NULL; \
} else { \
elem_copy_f(&internal_elem->_rc, elem); \
} \
elem_clone_f(internal_elem, elem); \
\
int8_t ret = collection_push_f(internal_elem, context, _z_##handler_name##_elem_free); \
if (ret != _Z_RES_OK) { \
_Z_ERROR("%s failed: %i", #collection_push_f, ret); \
Expand Down Expand Up @@ -112,7 +109,7 @@
/* collection_try_pull_f */ _z_##kind_name##_mt_try_pull, \
/* elem_owned_type */ z_owned_##item_name##_t, \
/* elem_loaned_type */ z_loaned_##item_name##_t, \
/* elem_copy_f */ _z_##item_name##_rc_copy, \
/* elem_clone_f */ z_##item_name##_clone, \
/* elem_drop_f */ z_##item_name##_drop)

#define _Z_CHANNEL_DEFINE_DUMMY(item_name, kind_name) \
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -1239,7 +1239,7 @@ z_sample_kind_t z_sample_kind(const z_loaned_sample_t *sample);
z_qos_t z_sample_qos(const z_loaned_sample_t *sample);

/**
* Gets the attachment of a value by aliasing it.
* Gets the attachment of a sample by aliasing it.
*
* Parameters:
* sample: Pointer to a :c:type:`z_loaned_sample_t` to get the attachment from.
Expand Down
8 changes: 4 additions & 4 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,8 @@ static inline z_qos_t z_qos_default(void) { return _Z_N_QOS_DEFAULT; }
* z_timestamp_t timestamp: The timestamp of this data sample.
* z_qos_t qos: Quality of service settings used to deliver this sample.
*/
_Z_OWNED_TYPE_RC(_z_sample_rc_t, sample)
_Z_LOANED_TYPE(_z_sample_rc_t, sample)
_Z_OWNED_TYPE_PTR(_z_sample_t, sample)
_Z_LOANED_TYPE(_z_sample_t, sample)

/**
* Represents the content of a `hello` message returned by a zenoh entity as a reply to a `scout` message.
Expand All @@ -420,8 +420,8 @@ _Z_LOANED_TYPE(_z_hello_t, hello)
/**
* Represents the reply to a query.
*/
_Z_OWNED_TYPE_RC(_z_reply_rc_t, reply)
_Z_LOANED_TYPE(_z_reply_rc_t, reply)
_Z_OWNED_TYPE_PTR(_z_reply_t, reply)
_Z_LOANED_TYPE(_z_reply_t, reply)

/**
* Represents an array of non null-terminated string.
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/collections/string.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ _z_string_t *_z_string_make_as_ptr(const char *value);
size_t _z_string_size(const _z_string_t *s);
int8_t _z_string_copy(_z_string_t *dst, const _z_string_t *src);
void _z_string_move(_z_string_t *dst, _z_string_t *src);
_z_string_t _z_string_steal(_z_string_t *str);
void _z_string_move_str(_z_string_t *dst, char *src);
void _z_string_clear(_z_string_t *s);
void _z_string_free(_z_string_t **s);
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/net/encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ void _z_encoding_clear(_z_encoding_t *encoding);
_Bool _z_encoding_check(const _z_encoding_t *encoding);
int8_t _z_encoding_copy(_z_encoding_t *dst, const _z_encoding_t *src);
void _z_encoding_move(_z_encoding_t *dst, _z_encoding_t *src);
_z_encoding_t _z_encoding_steal(_z_encoding_t *val);

#endif /* ZENOH_PICO_ENCODING_NETAPI_H */
9 changes: 6 additions & 3 deletions include/zenoh-pico/net/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@ typedef struct _z_query_t {
_z_value_t _value;
_z_keyexpr_t _key;
uint32_t _request_id;
_z_session_t *_zn;
_z_session_t *_zn; // FIXME: Potential UB source, Issue #476
_z_bytes_t attachment;
char *_parameters;
_Bool _anyke;
} _z_query_t;

_z_query_t _z_query_null(void);
void _z_query_clear(_z_query_t *q);
void _z_query_copy(_z_query_t *dst, const _z_query_t *src);
void _z_query_free(_z_query_t **query);

_Z_REFCOUNT_DEFINE(_z_query, _z_query)

/**
Expand All @@ -47,8 +50,8 @@ typedef struct {
} _z_queryable_t;

#if Z_FEATURE_QUERYABLE == 1
_z_query_t _z_query_create(const _z_value_t *value, const _z_keyexpr_t *key, const _z_slice_t *parameters,
_z_session_t *zn, uint32_t request_id, const _z_bytes_t attachment);
_z_query_t _z_query_create(const _z_value_t *value, _z_keyexpr_t *key, const _z_slice_t *parameters, _z_session_t *zn,
uint32_t request_id, const _z_bytes_t attachment);
void _z_queryable_clear(_z_queryable_t *qbl);
void _z_queryable_free(_z_queryable_t **qbl);
#endif
Expand Down
8 changes: 3 additions & 5 deletions include/zenoh-pico/net/reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
*
*/
typedef struct _z_reply_data_t {
_z_sample_rc_t sample;
_z_sample_t sample;
_z_id_t replier_id;
} _z_reply_data_t;

void _z_reply_data_clear(_z_reply_data_t *rd);
void _z_reply_data_copy(_z_reply_data_t *dst, _z_reply_data_t *src);
void _z_reply_data_copy(_z_reply_data_t *dst, const _z_reply_data_t *src);
_z_reply_t _z_reply_move(_z_reply_t *src_reply);

_Z_ELEM_DEFINE(_z_reply_data, _z_reply_data_t, _z_noop_size, _z_reply_data_clear, _z_noop_copy)
Expand All @@ -60,13 +60,11 @@ typedef struct _z_reply_t {
_z_reply_t _z_reply_null(void);
void _z_reply_clear(_z_reply_t *src);
void _z_reply_free(_z_reply_t **hello);
void _z_reply_copy(_z_reply_t *dst, _z_reply_t *src);
void _z_reply_copy(_z_reply_t *dst, const _z_reply_t *src);
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload,
const _z_timestamp_t *timestamp, _z_encoding_t encoding, z_sample_kind_t kind,
const _z_bytes_t attachment);

_Z_REFCOUNT_DEFINE(_z_reply, _z_reply)

typedef struct _z_pending_reply_t {
_z_reply_t _reply;
_z_timestamp_t _tstamp;
Expand Down
6 changes: 2 additions & 4 deletions include/zenoh-pico/net/sample.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ typedef struct _z_sample_t {
} _z_sample_t;
void _z_sample_clear(_z_sample_t *sample);

_Z_REFCOUNT_DEFINE(_z_sample, _z_sample)

_z_sample_t _z_sample_null(void);
_Bool _z_sample_check(const _z_sample_t *sample);
void _z_sample_move(_z_sample_t *dst, _z_sample_t *src);
Expand All @@ -56,8 +54,8 @@ void _z_sample_free(_z_sample_t **sample);
void _z_sample_copy(_z_sample_t *dst, const _z_sample_t *src);
_z_sample_t _z_sample_duplicate(const _z_sample_t *src);

_z_sample_t _z_sample_create(const _z_keyexpr_t *key, const _z_bytes_t payload, _z_timestamp_t timestamp,
const _z_encoding_t encoding, const z_sample_kind_t kind, const _z_qos_t qos,
_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, _z_timestamp_t timestamp,
_z_encoding_t encoding, const z_sample_kind_t kind, const _z_qos_t qos,
const _z_bytes_t attachment);

#endif /* ZENOH_PICO_SAMPLE_NETAPI_H */
14 changes: 7 additions & 7 deletions include/zenoh-pico/session/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ _Z_ELEM_DEFINE(_z_resource, _z_resource_t, _z_noop_size, _z_resource_clear, _z_r
_Z_LIST_DEFINE(_z_resource, _z_resource_t)

// Forward declaration to avoid cyclical include
typedef struct _z_sample_rc_t z_loaned_sample_t;
typedef struct _z_sample_t _z_sample_t;

/**
* The callback signature of the functions handling data messages.
*/
typedef void (*_z_data_handler_t)(const z_loaned_sample_t *sample, void *arg);
typedef void (*_z_data_handler_t)(const _z_sample_t *sample, void *arg);

typedef struct {
_z_keyexpr_t _key;
Expand All @@ -81,12 +81,12 @@ typedef struct {
} _z_publication_t;

// Forward type declaration to avoid cyclical include
typedef struct _z_query_rc_t z_loaned_query_t;
typedef struct _z_query_rc_t _z_query_rc_t;

/**
* The callback signature of the functions handling query messages.
*/
typedef void (*_z_queryable_handler_t)(const z_loaned_query_t *query, void *arg);
typedef void (*_z_queryable_handler_t)(const _z_query_rc_t *query, void *arg);

typedef struct {
_z_keyexpr_t _key;
Expand All @@ -110,12 +110,12 @@ _Z_LIST_DEFINE(_z_session_queryable_rc, _z_session_queryable_rc_t)
typedef struct _z_reply_t _z_reply_t;
typedef _z_list_t _z_reply_data_list_t;
typedef _z_list_t _z_pending_reply_list_t;
typedef struct _z_reply_rc_t _z_reply_rc_t;
typedef _z_reply_rc_t z_loaned_reply_t;
typedef struct _z_reply_t _z_reply_t;

/**
* The callback signature of the functions handling query replies.
*/
typedef void (*_z_reply_handler_t)(const z_loaned_reply_t *reply, void *arg);
typedef void (*_z_reply_handler_t)(const _z_reply_t *reply, void *arg);

typedef struct {
_z_keyexpr_t _key;
Expand Down
25 changes: 12 additions & 13 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ static int8_t _z_encoding_convert_into_string(const z_loaned_encoding_t *encodin
}
// Allocate string
char *value = (char *)z_malloc(sizeof(char) * total_len);
memset(value, 0, total_len);
if (value == NULL) {
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
}
Expand Down Expand Up @@ -666,7 +667,7 @@ static _z_encoding_t _z_encoding_from_owned(const z_owned_encoding_t *encoding)
}
#endif

_Z_OWNED_FUNCTIONS_RC_IMPL(sample)
_Z_OWNED_FUNCTIONS_PTR_IMPL(_z_sample_t, sample, _z_sample_copy, _z_sample_free)
_Z_OWNED_FUNCTIONS_RC_IMPL(session)

_Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_sample, _z_data_handler_t, z_dropper_handler_t)
Expand Down Expand Up @@ -809,15 +810,13 @@ int8_t z_info_routers_zid(const z_loaned_session_t *zs, z_owned_closure_zid_t *c

z_id_t z_info_zid(const z_loaned_session_t *zs) { return _Z_RC_IN_VAL(zs)._local_zid; }

const z_loaned_keyexpr_t *z_sample_keyexpr(const z_loaned_sample_t *sample) { return &_Z_RC_IN_VAL(sample).keyexpr; }
z_sample_kind_t z_sample_kind(const z_loaned_sample_t *sample) { return _Z_RC_IN_VAL(sample).kind; }
const z_loaned_bytes_t *z_sample_payload(const z_loaned_sample_t *sample) { return &_Z_RC_IN_VAL(sample).payload; }
z_timestamp_t z_sample_timestamp(const z_loaned_sample_t *sample) { return _Z_RC_IN_VAL(sample).timestamp; }
const z_loaned_encoding_t *z_sample_encoding(const z_loaned_sample_t *sample) { return &_Z_RC_IN_VAL(sample).encoding; }
z_qos_t z_sample_qos(const z_loaned_sample_t *sample) { return _Z_RC_IN_VAL(sample).qos; }
const z_loaned_bytes_t *z_sample_attachment(const z_loaned_sample_t *sample) {
return &_Z_RC_IN_VAL(sample).attachment;
}
const z_loaned_keyexpr_t *z_sample_keyexpr(const z_loaned_sample_t *sample) { return &sample->keyexpr; }
z_sample_kind_t z_sample_kind(const z_loaned_sample_t *sample) { return sample->kind; }
const z_loaned_bytes_t *z_sample_payload(const z_loaned_sample_t *sample) { return &sample->payload; }
z_timestamp_t z_sample_timestamp(const z_loaned_sample_t *sample) { return sample->timestamp; }
const z_loaned_encoding_t *z_sample_encoding(const z_loaned_sample_t *sample) { return &sample->encoding; }
z_qos_t z_sample_qos(const z_loaned_sample_t *sample) { return sample->qos; }
const z_loaned_bytes_t *z_sample_attachment(const z_loaned_sample_t *sample) { return &sample->attachment; }

const z_loaned_bytes_t *z_reply_err_payload(const z_loaned_reply_err_t *reply_err) { return &reply_err->payload; }
const z_loaned_encoding_t *z_reply_err_encoding(const z_loaned_reply_err_t *reply_err) { return &reply_err->encoding; }
Expand Down Expand Up @@ -989,7 +988,7 @@ z_owned_keyexpr_t z_publisher_keyexpr(z_loaned_publisher_t *publisher) {
#endif

#if Z_FEATURE_QUERY == 1
_Z_OWNED_FUNCTIONS_RC_IMPL(reply)
_Z_OWNED_FUNCTIONS_PTR_IMPL(_z_reply_t, reply, _z_reply_copy, _z_reply_free)

void z_get_options_default(z_get_options_t *options) {
options->target = z_query_target_default();
Expand Down Expand Up @@ -1045,11 +1044,11 @@ int8_t z_get(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, co
_Bool z_reply_is_ok(const z_loaned_reply_t *reply) {
_ZP_UNUSED(reply);
// For the moment always return TRUE.
// The support for reply errors will come in the next release.
// FIXME: The support for reply errors will come in the next release.
return true;
}

const z_loaned_sample_t *z_reply_ok(const z_loaned_reply_t *reply) { return &reply->in->val.data.sample; }
const z_loaned_sample_t *z_reply_ok(const z_loaned_reply_t *reply) { return &reply->data.sample; }

const z_loaned_reply_err_t *z_reply_err(const z_loaned_reply_t *reply) {
_ZP_UNUSED(reply);
Expand Down
Loading

0 comments on commit 27fb58e

Please sign in to comment.