Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove refcounting from reply/sample. #440

Merged
merged 20 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion examples/unix/c11/z_get_attachment.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,13 @@ void parse_attachment(kv_pairs_rx_t *kvp, const z_loaned_bytes_t *attachment) {
z_bytes_deserialize_into_string(z_loan(second), &kvp->data[kvp->current_idx].value);
z_bytes_drop(&first);
z_bytes_drop(&second);
z_bytes_drop(&kv);
kvp->current_idx++;
}
}

void print_attachment(kv_pairs_rx_t *kvp) {
printf(" with attachment:\n");
printf(" with attachment:\n");
for (uint32_t i = 0; i < kvp->current_idx; i++) {
printf(" %d: %s, %s\n", i, z_string_data(z_loan(kvp->data[i].key)),
z_string_data(z_loan(kvp->data[i].value)));
Expand Down Expand Up @@ -105,8 +106,11 @@ void reply_handler(const z_loaned_reply_t *reply, void *ctx) {
z_keyexpr_to_string(z_sample_keyexpr(sample), &keystr);
z_owned_string_t replystr;
z_bytes_deserialize_into_string(z_sample_payload(sample), &replystr);
z_owned_string_t encoding;
z_encoding_to_string(z_sample_encoding(sample), &encoding);

printf(">> Received ('%s': '%s')\n", z_string_data(z_loan(keystr)), z_string_data(z_loan(replystr)));
printf(" with encoding: %s\n", z_string_data(z_loan(encoding)));

// Check attachment
kv_pairs_rx_t kvp = {
Expand All @@ -119,6 +123,7 @@ void reply_handler(const z_loaned_reply_t *reply, void *ctx) {

z_drop(z_move(keystr));
z_drop(z_move(replystr));
z_drop(z_move(encoding));
} else {
printf(">> Received an error\n");
}
Expand Down Expand Up @@ -213,6 +218,11 @@ int main(int argc, char **argv) {
z_bytes_serialize_from_iter(&attachment, create_attachment_iter, (void *)&ctx);
opts.attachment = z_move(attachment);

// Add encoding value
z_owned_encoding_t encoding;
z_encoding_from_str(&encoding, "zenoh/string;utf8");
opts.encoding = z_move(encoding);

z_owned_closure_reply_t callback;
z_closure(&callback, reply_handler, reply_dropper);
if (z_get(z_loan(s), z_loan(ke), "", z_move(callback), &opts) < 0) {
Expand Down
2 changes: 1 addition & 1 deletion examples/unix/c11/z_pub_attachment.c
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ int main(int argc, char **argv) {
options.attachment = z_move(attachment);

// Add encoding value
z_encoding_from_str(&encoding, "text/plain;utf8");
z_encoding_from_str(&encoding, "zenoh/string;utf8");
options.encoding = z_move(encoding);

z_publisher_put(z_loan(pub), z_move(payload), &options);
Expand Down
24 changes: 18 additions & 6 deletions examples/unix/c11/z_queryable_attachment.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,13 @@ void parse_attachment(kv_pairs_rx_t *kvp, const z_loaned_bytes_t *attachment) {
z_bytes_deserialize_into_string(z_loan(second), &kvp->data[kvp->current_idx].value);
z_bytes_drop(&first);
z_bytes_drop(&second);
z_bytes_drop(&kv);
kvp->current_idx++;
}
}

void print_attachment(kv_pairs_rx_t *kvp) {
printf(" with attachment:\n");
printf(" with attachment:\n");
for (uint32_t i = 0; i < kvp->current_idx; i++) {
printf(" %d: %s, %s\n", i, z_string_data(z_loan(kvp->data[i].key)),
z_string_data(z_loan(kvp->data[i].value)));
Expand All @@ -110,11 +111,16 @@ void query_handler(const z_loaned_query_t *query, void *ctx) {
z_query_parameters(query, &params);
printf(" >> [Queryable handler] Received Query '%s%.*s'\n", z_string_data(z_loan(keystr)), (int)z_loan(params)->len,
z_loan(params)->val);
// Process encoding
z_owned_string_t encoding;
z_encoding_to_string(z_query_encoding(query), &encoding);
printf(" with encoding: %s\n", z_string_data(z_loan(encoding)));

// Process value
z_owned_string_t payload_string;
z_bytes_deserialize_into_string(z_query_payload(query), &payload_string);
if (z_string_len(z_loan(payload_string)) > 1) {
printf(" with value '%s'\n", z_string_data(z_loan(payload_string)));
printf(" with value '%s'\n", z_string_data(z_loan(payload_string)));
}
// Check attachment
kv_pairs_rx_t kvp = {
Expand All @@ -125,22 +131,28 @@ void query_handler(const z_loaned_query_t *query, void *ctx) {
}
drop_attachment(&kvp);
z_drop(z_move(payload_string));
z_drop(z_move(encoding));

// Reply value encoding
// Reply payload
z_owned_bytes_t reply_payload;
z_bytes_serialize_from_str(&reply_payload, value);

z_query_reply_options_t options;
z_query_reply_options_default(&options);

// Reply attachment
kv_pair_t kvs[1];
kvs[0] = (kv_pair_t){.key = "reply_key", .value = "reply_value"};
kv_pairs_tx_t kv_ctx = (kv_pairs_tx_t){.data = kvs, .current_idx = 0, .len = 1};
z_owned_bytes_t attachment;
z_bytes_serialize_from_iter(&attachment, create_attachment_iter, (void *)&kv_ctx);

z_query_reply_options_t options;
z_query_reply_options_default(&options);
options.attachment = z_move(attachment);

// Reply encoding
z_owned_encoding_t reply_encoding;
z_encoding_from_str(&reply_encoding, "zenoh/string;utf8");
options.encoding = z_move(reply_encoding);

z_query_reply(query, z_query_keyexpr(query), z_move(reply_payload), &options);
z_drop(z_move(keystr));
msg_nb++;
Expand Down
2 changes: 1 addition & 1 deletion examples/unix/c11/z_sub_attachment.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void parse_attachment(kv_pairs_t *kvp, const z_loaned_bytes_t *attachment) {
}

void print_attachment(kv_pairs_t *kvp) {
printf(" with attachment:\n");
printf(" with attachment:\n");
for (uint32_t i = 0; i < kvp->current_idx; i++) {
printf(" %d: %s, %s\n", i, z_string_data(z_loan(kvp->data[i].key)),
z_string_data(z_loan(kvp->data[i].value)));
Expand Down
11 changes: 4 additions & 7 deletions include/zenoh-pico/api/handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#define _Z_CHANNEL_DEFINE_IMPL(handler_type, handler_name, handler_new_f_name, callback_type, callback_new_f, \
collection_type, collection_new_f, collection_free_f, collection_push_f, \
collection_pull_f, collection_try_pull_f, elem_owned_type, elem_loaned_type, \
elem_copy_f, elem_drop_f) \
elem_clone_f, elem_drop_f) \
typedef struct { \
collection_type *collection; \
} handler_type; \
Expand All @@ -50,11 +50,8 @@
_Z_ERROR("Out of memory"); \
return; \
} \
if (elem == NULL) { \
internal_elem->_rc.in = NULL; \
jean-roland marked this conversation as resolved.
Show resolved Hide resolved
} else { \
elem_copy_f(&internal_elem->_rc, elem); \
} \
elem_clone_f(internal_elem, elem); \
\
int8_t ret = collection_push_f(internal_elem, context, _z_##handler_name##_elem_free); \
if (ret != _Z_RES_OK) { \
_Z_ERROR("%s failed: %i", #collection_push_f, ret); \
Expand Down Expand Up @@ -112,7 +109,7 @@
/* collection_try_pull_f */ _z_##kind_name##_mt_try_pull, \
/* elem_owned_type */ z_owned_##item_name##_t, \
/* elem_loaned_type */ z_loaned_##item_name##_t, \
/* elem_copy_f */ _z_##item_name##_rc_copy, \
/* elem_clone_f */ z_##item_name##_clone, \
/* elem_drop_f */ z_##item_name##_drop)

#define _Z_CHANNEL_DEFINE_DUMMY(item_name, kind_name) \
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -1239,7 +1239,7 @@ z_sample_kind_t z_sample_kind(const z_loaned_sample_t *sample);
z_qos_t z_sample_qos(const z_loaned_sample_t *sample);

/**
* Gets the attachment of a value by aliasing it.
* Gets the attachment of a sample by aliasing it.
*
* Parameters:
* sample: Pointer to a :c:type:`z_loaned_sample_t` to get the attachment from.
Expand Down
8 changes: 4 additions & 4 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,8 @@ static inline z_qos_t z_qos_default(void) { return _Z_N_QOS_DEFAULT; }
* z_timestamp_t timestamp: The timestamp of this data sample.
* z_qos_t qos: Quality of service settings used to deliver this sample.
*/
_Z_OWNED_TYPE_RC(_z_sample_rc_t, sample)
_Z_LOANED_TYPE(_z_sample_rc_t, sample)
_Z_OWNED_TYPE_PTR(_z_sample_t, sample)
_Z_LOANED_TYPE(_z_sample_t, sample)

/**
* Represents the content of a `hello` message returned by a zenoh entity as a reply to a `scout` message.
Expand All @@ -420,8 +420,8 @@ _Z_LOANED_TYPE(_z_hello_t, hello)
/**
* Represents the reply to a query.
*/
_Z_OWNED_TYPE_RC(_z_reply_rc_t, reply)
_Z_LOANED_TYPE(_z_reply_rc_t, reply)
_Z_OWNED_TYPE_PTR(_z_reply_t, reply)
_Z_LOANED_TYPE(_z_reply_t, reply)

/**
* Represents an array of non null-terminated string.
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/collections/string.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ _z_string_t *_z_string_make_as_ptr(const char *value);
size_t _z_string_size(const _z_string_t *s);
int8_t _z_string_copy(_z_string_t *dst, const _z_string_t *src);
void _z_string_move(_z_string_t *dst, _z_string_t *src);
_z_string_t _z_string_steal(_z_string_t *str);
void _z_string_move_str(_z_string_t *dst, char *src);
void _z_string_clear(_z_string_t *s);
void _z_string_free(_z_string_t **s);
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/net/encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ void _z_encoding_clear(_z_encoding_t *encoding);
_Bool _z_encoding_check(const _z_encoding_t *encoding);
int8_t _z_encoding_copy(_z_encoding_t *dst, const _z_encoding_t *src);
void _z_encoding_move(_z_encoding_t *dst, _z_encoding_t *src);
_z_encoding_t _z_encoding_steal(_z_encoding_t *val);

#endif /* ZENOH_PICO_ENCODING_NETAPI_H */
9 changes: 6 additions & 3 deletions include/zenoh-pico/net/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@ typedef struct _z_query_t {
_z_value_t _value;
_z_keyexpr_t _key;
uint32_t _request_id;
_z_session_t *_zn;
_z_session_t *_zn; // FIXME: Potential UB source, Issue #476
_z_bytes_t attachment;
char *_parameters;
_Bool _anyke;
} _z_query_t;

_z_query_t _z_query_null(void);
void _z_query_clear(_z_query_t *q);
void _z_query_copy(_z_query_t *dst, const _z_query_t *src);
void _z_query_free(_z_query_t **query);

_Z_REFCOUNT_DEFINE(_z_query, _z_query)

/**
Expand All @@ -47,8 +50,8 @@ typedef struct {
} _z_queryable_t;

#if Z_FEATURE_QUERYABLE == 1
_z_query_t _z_query_create(const _z_value_t *value, const _z_keyexpr_t *key, const _z_slice_t *parameters,
_z_session_t *zn, uint32_t request_id, const _z_bytes_t attachment);
_z_query_t _z_query_create(const _z_value_t *value, _z_keyexpr_t *key, const _z_slice_t *parameters, _z_session_t *zn,
uint32_t request_id, const _z_bytes_t attachment);
void _z_queryable_clear(_z_queryable_t *qbl);
void _z_queryable_free(_z_queryable_t **qbl);
#endif
Expand Down
8 changes: 3 additions & 5 deletions include/zenoh-pico/net/reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
*
*/
typedef struct _z_reply_data_t {
_z_sample_rc_t sample;
_z_sample_t sample;
_z_id_t replier_id;
} _z_reply_data_t;

void _z_reply_data_clear(_z_reply_data_t *rd);
void _z_reply_data_copy(_z_reply_data_t *dst, _z_reply_data_t *src);
void _z_reply_data_copy(_z_reply_data_t *dst, const _z_reply_data_t *src);
_z_reply_t _z_reply_move(_z_reply_t *src_reply);

_Z_ELEM_DEFINE(_z_reply_data, _z_reply_data_t, _z_noop_size, _z_reply_data_clear, _z_noop_copy)
Expand All @@ -60,13 +60,11 @@ typedef struct _z_reply_t {
_z_reply_t _z_reply_null(void);
void _z_reply_clear(_z_reply_t *src);
void _z_reply_free(_z_reply_t **hello);
void _z_reply_copy(_z_reply_t *dst, _z_reply_t *src);
void _z_reply_copy(_z_reply_t *dst, const _z_reply_t *src);
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload,
const _z_timestamp_t *timestamp, _z_encoding_t encoding, z_sample_kind_t kind,
const _z_bytes_t attachment);

_Z_REFCOUNT_DEFINE(_z_reply, _z_reply)

typedef struct _z_pending_reply_t {
_z_reply_t _reply;
_z_timestamp_t _tstamp;
Expand Down
6 changes: 2 additions & 4 deletions include/zenoh-pico/net/sample.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ typedef struct _z_sample_t {
} _z_sample_t;
void _z_sample_clear(_z_sample_t *sample);

_Z_REFCOUNT_DEFINE(_z_sample, _z_sample)

_z_sample_t _z_sample_null(void);
_Bool _z_sample_check(const _z_sample_t *sample);
void _z_sample_move(_z_sample_t *dst, _z_sample_t *src);
Expand All @@ -56,8 +54,8 @@ void _z_sample_free(_z_sample_t **sample);
void _z_sample_copy(_z_sample_t *dst, const _z_sample_t *src);
_z_sample_t _z_sample_duplicate(const _z_sample_t *src);

_z_sample_t _z_sample_create(const _z_keyexpr_t *key, const _z_bytes_t payload, _z_timestamp_t timestamp,
const _z_encoding_t encoding, const z_sample_kind_t kind, const _z_qos_t qos,
_z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, _z_timestamp_t timestamp,
_z_encoding_t encoding, const z_sample_kind_t kind, const _z_qos_t qos,
const _z_bytes_t attachment);

#endif /* ZENOH_PICO_SAMPLE_NETAPI_H */
14 changes: 7 additions & 7 deletions include/zenoh-pico/session/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ _Z_ELEM_DEFINE(_z_resource, _z_resource_t, _z_noop_size, _z_resource_clear, _z_r
_Z_LIST_DEFINE(_z_resource, _z_resource_t)

// Forward declaration to avoid cyclical include
typedef struct _z_sample_rc_t z_loaned_sample_t;
typedef struct _z_sample_t _z_sample_t;

/**
* The callback signature of the functions handling data messages.
*/
typedef void (*_z_data_handler_t)(const z_loaned_sample_t *sample, void *arg);
typedef void (*_z_data_handler_t)(const _z_sample_t *sample, void *arg);

typedef struct {
_z_keyexpr_t _key;
Expand All @@ -81,12 +81,12 @@ typedef struct {
} _z_publication_t;

// Forward type declaration to avoid cyclical include
typedef struct _z_query_rc_t z_loaned_query_t;
typedef struct _z_query_rc_t _z_query_rc_t;

/**
* The callback signature of the functions handling query messages.
*/
typedef void (*_z_queryable_handler_t)(const z_loaned_query_t *query, void *arg);
typedef void (*_z_queryable_handler_t)(const _z_query_rc_t *query, void *arg);

typedef struct {
_z_keyexpr_t _key;
Expand All @@ -110,12 +110,12 @@ _Z_LIST_DEFINE(_z_session_queryable_rc, _z_session_queryable_rc_t)
typedef struct _z_reply_t _z_reply_t;
typedef _z_list_t _z_reply_data_list_t;
typedef _z_list_t _z_pending_reply_list_t;
typedef struct _z_reply_rc_t _z_reply_rc_t;
typedef _z_reply_rc_t z_loaned_reply_t;
typedef struct _z_reply_t _z_reply_t;

/**
* The callback signature of the functions handling query replies.
*/
typedef void (*_z_reply_handler_t)(const z_loaned_reply_t *reply, void *arg);
typedef void (*_z_reply_handler_t)(const _z_reply_t *reply, void *arg);

typedef struct {
_z_keyexpr_t _key;
Expand Down
25 changes: 12 additions & 13 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ static int8_t _z_encoding_convert_into_string(const z_loaned_encoding_t *encodin
}
// Allocate string
char *value = (char *)z_malloc(sizeof(char) * total_len);
memset(value, 0, total_len);
if (value == NULL) {
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
}
Expand Down Expand Up @@ -666,7 +667,7 @@ static _z_encoding_t _z_encoding_from_owned(const z_owned_encoding_t *encoding)
}
#endif

_Z_OWNED_FUNCTIONS_RC_IMPL(sample)
_Z_OWNED_FUNCTIONS_PTR_IMPL(_z_sample_t, sample, _z_sample_copy, _z_sample_free)
_Z_OWNED_FUNCTIONS_RC_IMPL(session)

_Z_OWNED_FUNCTIONS_CLOSURE_IMPL(closure_sample, _z_data_handler_t, z_dropper_handler_t)
Expand Down Expand Up @@ -809,15 +810,13 @@ int8_t z_info_routers_zid(const z_loaned_session_t *zs, z_owned_closure_zid_t *c

z_id_t z_info_zid(const z_loaned_session_t *zs) { return _Z_RC_IN_VAL(zs)._local_zid; }

const z_loaned_keyexpr_t *z_sample_keyexpr(const z_loaned_sample_t *sample) { return &_Z_RC_IN_VAL(sample).keyexpr; }
z_sample_kind_t z_sample_kind(const z_loaned_sample_t *sample) { return _Z_RC_IN_VAL(sample).kind; }
const z_loaned_bytes_t *z_sample_payload(const z_loaned_sample_t *sample) { return &_Z_RC_IN_VAL(sample).payload; }
z_timestamp_t z_sample_timestamp(const z_loaned_sample_t *sample) { return _Z_RC_IN_VAL(sample).timestamp; }
const z_loaned_encoding_t *z_sample_encoding(const z_loaned_sample_t *sample) { return &_Z_RC_IN_VAL(sample).encoding; }
z_qos_t z_sample_qos(const z_loaned_sample_t *sample) { return _Z_RC_IN_VAL(sample).qos; }
const z_loaned_bytes_t *z_sample_attachment(const z_loaned_sample_t *sample) {
return &_Z_RC_IN_VAL(sample).attachment;
}
const z_loaned_keyexpr_t *z_sample_keyexpr(const z_loaned_sample_t *sample) { return &sample->keyexpr; }
z_sample_kind_t z_sample_kind(const z_loaned_sample_t *sample) { return sample->kind; }
const z_loaned_bytes_t *z_sample_payload(const z_loaned_sample_t *sample) { return &sample->payload; }
z_timestamp_t z_sample_timestamp(const z_loaned_sample_t *sample) { return sample->timestamp; }
const z_loaned_encoding_t *z_sample_encoding(const z_loaned_sample_t *sample) { return &sample->encoding; }
z_qos_t z_sample_qos(const z_loaned_sample_t *sample) { return sample->qos; }
const z_loaned_bytes_t *z_sample_attachment(const z_loaned_sample_t *sample) { return &sample->attachment; }

const z_loaned_bytes_t *z_reply_err_payload(const z_loaned_reply_err_t *reply_err) { return &reply_err->payload; }
const z_loaned_encoding_t *z_reply_err_encoding(const z_loaned_reply_err_t *reply_err) { return &reply_err->encoding; }
Expand Down Expand Up @@ -989,7 +988,7 @@ z_owned_keyexpr_t z_publisher_keyexpr(z_loaned_publisher_t *publisher) {
#endif

#if Z_FEATURE_QUERY == 1
_Z_OWNED_FUNCTIONS_RC_IMPL(reply)
_Z_OWNED_FUNCTIONS_PTR_IMPL(_z_reply_t, reply, _z_reply_copy, _z_reply_free)

void z_get_options_default(z_get_options_t *options) {
options->target = z_query_target_default();
Expand Down Expand Up @@ -1045,11 +1044,11 @@ int8_t z_get(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, co
_Bool z_reply_is_ok(const z_loaned_reply_t *reply) {
_ZP_UNUSED(reply);
// For the moment always return TRUE.
// The support for reply errors will come in the next release.
// FIXME: The support for reply errors will come in the next release.
return true;
}

const z_loaned_sample_t *z_reply_ok(const z_loaned_reply_t *reply) { return &reply->in->val.data.sample; }
const z_loaned_sample_t *z_reply_ok(const z_loaned_reply_t *reply) { return &reply->data.sample; }

const z_loaned_reply_err_t *z_reply_err(const z_loaned_reply_t *reply) {
_ZP_UNUSED(reply);
Expand Down
Loading
Loading