Skip to content

Commit

Permalink
Implement error reply support (#523)
Browse files Browse the repository at this point in the history
* Reply error support impelmentation

* Remove keyexpr from error response

* Make tag constants private
  • Loading branch information
sashacmc authored Jul 10, 2024
1 parent 2167760 commit 510d583
Show file tree
Hide file tree
Showing 21 changed files with 237 additions and 55 deletions.
5 changes: 4 additions & 1 deletion docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ Enums
.. autocenum:: constants.h::z_sample_kind_t
.. autocenum:: constants.h::z_consolidation_mode_t
.. autocenum:: constants.h::z_reliability_t
.. autocenum:: constants.h::z_reply_tag_t
.. autocenum:: constants.h::z_congestion_control_t
.. autocenum:: constants.h::z_priority_t
.. autocenum:: constants.h::z_query_target_t
Expand Down Expand Up @@ -426,6 +425,10 @@ Primitives
.. autocfunction:: primitives.h::z_undeclare_queryable
.. autocfunction:: primitives.h::z_query_reply_options_default
.. autocfunction:: primitives.h::z_query_reply
.. autocfunction:: primitives.h::z_query_reply_del_options_default
.. autocfunction:: primitives.h::z_query_reply_del
.. autocfunction:: primitives.h::z_query_reply_err_options_default
.. autocfunction:: primitives.h::z_query_reply_err
.. autocfunction:: primitives.h::z_keyexpr_from_str
.. autocfunction:: primitives.h::z_keyexpr_from_substr
.. autocfunction:: primitives.h::z_keyexpr_from_str_autocanonize
Expand Down
6 changes: 5 additions & 1 deletion examples/unix/c11/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ void reply_handler(const z_loaned_reply_t *reply, void *ctx) {
z_string_data(z_loan(replystr)));
z_drop(z_move(replystr));
} else {
printf(">> Received an error\n");
const z_loaned_reply_err_t *err = z_reply_err(reply);
z_owned_string_t errstr;
z_bytes_deserialize_into_string(z_reply_err_payload(err), &errstr);
printf(">> Received an error: %s\n", z_string_data(z_loan(errstr)));
z_drop(z_move(errstr));
}
}

Expand Down
34 changes: 23 additions & 11 deletions examples/unix/c11/z_queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
#if Z_FEATURE_QUERYABLE == 1
const char *keyexpr = "demo/example/zenoh-pico-queryable";
const char *value = "Queryable from Pico!";
const char *error = "Demo error";
static int msg_nb = 0;
static z_sample_kind_t reply_kind = Z_SAMPLE_KIND_PUT;
static enum { REPLY_DATA, REPLY_DELETE, REPLY_ERR } reply_kind = REPLY_DATA;
bool reply_err = false;

void query_handler(const z_loaned_query_t *query, void *ctx) {
(void)(ctx);
Expand All @@ -40,20 +42,27 @@ void query_handler(const z_loaned_query_t *query, void *ctx) {
}
z_drop(z_move(payload_string));

// Reply value encoding
z_owned_bytes_t reply_payload;
z_bytes_serialize_from_str(&reply_payload, value);

switch (reply_kind) {
case Z_SAMPLE_KIND_PUT:
case REPLY_DATA: {
// Reply value encoding
z_owned_bytes_t reply_payload;
z_bytes_serialize_from_str(&reply_payload, value);

z_query_reply(query, z_query_keyexpr(query), z_move(reply_payload), NULL);
break;
case Z_SAMPLE_KIND_DELETE:
}
case REPLY_DELETE: {
z_query_reply_del(query, z_query_keyexpr(query), NULL);
break;
default:
printf("Unknown reply kind\n");
}
case REPLY_ERR: {
// Reply error encoding
z_owned_bytes_t reply_payload;
z_bytes_serialize_from_str(&reply_payload, error);

z_query_reply_err(query, z_move(reply_payload), NULL);
break;
}
}
msg_nb++;
}
Expand All @@ -65,7 +74,7 @@ int main(int argc, char **argv) {
int n = 0;

int opt;
while ((opt = getopt(argc, argv, "k:e:m:v:l:n:d")) != -1) {
while ((opt = getopt(argc, argv, "k:e:m:v:l:n:df")) != -1) {
switch (opt) {
case 'k':
keyexpr = optarg;
Expand All @@ -86,7 +95,10 @@ int main(int argc, char **argv) {
n = atoi(optarg);
break;
case 'd':
reply_kind = Z_SAMPLE_KIND_DELETE;
reply_kind = REPLY_DELETE;
break;
case 'f':
reply_kind = REPLY_ERR;
break;
case '?':
if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l' ||
Expand Down
10 changes: 0 additions & 10 deletions include/zenoh-pico/api/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,16 +227,6 @@ typedef enum {
typedef enum { Z_RELIABILITY_BEST_EFFORT = 1, Z_RELIABILITY_RELIABLE = 0 } z_reliability_t;
#define Z_RELIABILITY_DEFAULT Z_RELIABILITY_RELIABLE

/**
* Reply tag values.
*
* Enumerators:
* Z_REPLY_TAG_DATA: Tag identifying that the reply contains some data.
* Z_REPLY_TAG_FINAL: Tag identifying that the reply does not contain any data and that there will be no more
* replies for this query.
*/
typedef enum { Z_REPLY_TAG_DATA = 0, Z_REPLY_TAG_FINAL = 1 } z_reply_tag_t;

/**
* Congestion control values.
*
Expand Down
28 changes: 28 additions & 0 deletions include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -1780,6 +1780,34 @@ void z_query_reply_del_options_default(z_query_reply_del_options_t *options);
*/
int8_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyexpr_t *keyexpr,
const z_query_reply_del_options_t *options);

/**
* Builds a :c:type:`z_query_reply_err_options_t` with default values.
*
* Parameters:
* options: Pointer to an uninitialized :c:type:`z_query_reply_err_options_t`.
*/
void z_query_reply_err_options_default(z_query_reply_err_options_t *options);

/**
* Sends a reply error to a query.
*
* This function must be called inside of a :c:type:`z_owned_closure_query_t` callback associated to the
* :c:type:`z_owned_queryable_t`, passing the received query as parameters of the callback function. This function can
* be called multiple times to send multiple replies to a query. The reply will be considered complete when the callback
* returns.
*
* Parameters:
* query: Pointer to a :c:type:`z_loaned_query_t` to reply.
* payload: Pointer to the reply error data.
* options: Pointer to a :c:type:`z_query_reply_err_options_t` to configure the reply error.
*
* Return:
* ``0`` if reply operation successful, ``negative value`` otherwise.
*/
int8_t z_query_reply_err(const z_loaned_query_t *query, z_owned_bytes_t *payload,
const z_query_reply_err_options_t *options);

#endif

/**
Expand Down
10 changes: 10 additions & 0 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,16 @@ typedef struct {
z_owned_bytes_t *attachment;
} z_query_reply_del_options_t;

/**
* Represents the configuration used to configure a query reply error sent via :c:func:`z_query_reply_err.
*
* Members:
* z_owned_encoding_t *encoding: The encoding of the payload.
*/
typedef struct {
z_owned_encoding_t *encoding;
} z_query_reply_err_options_t;

/**
* Represents the configuration used to configure a put operation sent via via :c:func:`z_put`.
*
Expand Down
14 changes: 14 additions & 0 deletions include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,20 @@ int8_t _z_send_reply(const _z_query_t *query, const _z_session_rc_t *zsrc, const
const _z_value_t payload, const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl,
z_priority_t priority, _Bool is_express, const _z_timestamp_t *timestamp,
const _z_bytes_t attachment);
/**
* Send a reply error to a query.
*
* This function must be called inside of a Queryable callback passing the
* query received as parameters of the callback function. This function can
* be called multiple times to send multiple replies to a query. The reply
* will be considered complete when the Queryable callback returns.
*
* Parameters:
* query: The query to reply to. The caller keeps its ownership.
* key: The resource key of this reply. The caller keeps the ownership.
* payload: The value of this reply, the caller keeps ownership.
*/
int8_t _z_send_reply_err(const _z_query_t *query, const _z_session_rc_t *zsrc, const _z_value_t payload);
#endif

#if Z_FEATURE_QUERY == 1
Expand Down
19 changes: 16 additions & 3 deletions include/zenoh-pico/net/reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
*
*/
typedef struct _z_reply_data_t {
_z_value_t error;
_z_sample_t sample;
_z_id_t replier_id;
} _z_reply_data_t;
Expand All @@ -42,18 +43,29 @@ int8_t _z_reply_data_copy(_z_reply_data_t *dst, const _z_reply_data_t *src);
_Z_ELEM_DEFINE(_z_reply_data, _z_reply_data_t, _z_noop_size, _z_reply_data_clear, _z_noop_copy)
_Z_LIST_DEFINE(_z_reply_data, _z_reply_data_t)

/**
* Reply tag values.
*
* Enumerators:
* _Z_REPLY_TAG_DATA: Tag identifying that the reply contains some data.
* _Z_REPLY_TAG_FINAL: Tag identifying that the reply does not contain any data and that there will be no more
* replies for this query.
* _Z_REPLY_TAG_ERROR: Tag identifying that the reply contains error
*/
typedef enum { _Z_REPLY_TAG_DATA = 0, _Z_REPLY_TAG_FINAL = 1, _Z_REPLY_TAG_ERROR = 2 } _z_reply_tag_t;

/**
* An reply to a :c:func:`z_query`.
*
* Members:
* _z_reply_t_Tag tag: Indicates if the reply contains data or if it's a FINAL reply.
* _z_reply_data_t data: The reply data if :c:member:`_z_reply_t.tag` equals
* :c:member:`_z_reply_t_Tag.Z_REPLY_TAG_DATA`.
* :c:member:`_z_reply_t_Tag._Z_REPLY_TAG_DATA`.
*
*/
typedef struct _z_reply_t {
_z_reply_data_t data;
z_reply_tag_t _tag;
_z_reply_tag_t _tag;
} _z_reply_t;

_z_reply_t _z_reply_move(_z_reply_t *src_reply);
Expand All @@ -63,9 +75,10 @@ _Bool _z_reply_check(const _z_reply_t *reply);
void _z_reply_clear(_z_reply_t *src);
void _z_reply_free(_z_reply_t **hello);
int8_t _z_reply_copy(_z_reply_t *dst, const _z_reply_t *src);
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload,
_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_reply_tag_t tag, _z_id_t id, const _z_bytes_t payload,
const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind,
const _z_bytes_t attachment);
_z_reply_t _z_reply_err_create(const _z_bytes_t payload, _z_encoding_t *encoding);

typedef struct _z_pending_reply_t {
_z_reply_t _reply;
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/protocol/definitions/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
/// +---------------+
#define _Z_FLAG_Z_E_E 0x40
typedef struct {
_z_encoding_t encoding;
_z_encoding_t _encoding;
_z_source_info_t _ext_source_info;
_z_bytes_t _payload;
} _z_msg_err_t;
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/session/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ _z_pending_query_t *_z_get_pending_query_by_id(_z_session_t *zn, const _z_zint_t
int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pq);
int8_t _z_trigger_query_reply_partial(_z_session_t *zn, _z_zint_t reply_context, const _z_keyexpr_t keyexpr,
_z_msg_put_t *msg, z_sample_kind_t kind);
int8_t _z_trigger_query_reply_err(_z_session_t *zn, _z_zint_t id, _z_msg_err_t *msg);
int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id);
void _z_unregister_pending_query(_z_session_t *zn, _z_pending_query_t *pq);
void _z_flush_pending_queries(_z_session_t *zn);
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/session/reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

int8_t _z_trigger_reply_partial(_z_session_t *zn, _z_zint_t id, _z_keyexpr_t key, _z_msg_reply_t *reply);

int8_t _z_trigger_reply_err(_z_session_t *zn, _z_zint_t id, _z_msg_err_t *error);

int8_t _z_trigger_reply_final(_z_session_t *zn, _z_n_msg_response_final_t *final);

#endif /* ZENOH_PICO_SESSION_REPLY_H */
40 changes: 30 additions & 10 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1224,19 +1224,11 @@ int8_t z_get(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, co
return ret;
}

_Bool z_reply_is_ok(const z_loaned_reply_t *reply) {
_ZP_UNUSED(reply);
// For the moment always return TRUE.
// FIXME: The support for reply errors will come in the next release.
return true;
}
_Bool z_reply_is_ok(const z_loaned_reply_t *reply) { return reply->_tag != _Z_REPLY_TAG_ERROR; }

const z_loaned_sample_t *z_reply_ok(const z_loaned_reply_t *reply) { return &reply->data.sample; }

const z_loaned_reply_err_t *z_reply_err(const z_loaned_reply_t *reply) {
_ZP_UNUSED(reply);
return NULL;
}
const z_loaned_reply_err_t *z_reply_err(const z_loaned_reply_t *reply) { return &reply->data.error; }
#endif

#if Z_FEATURE_QUERYABLE == 1
Expand Down Expand Up @@ -1367,6 +1359,34 @@ int8_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyexpr_t
z_bytes_drop(opts.attachment);
return ret;
}

void z_query_reply_err_options_default(z_query_reply_err_options_t *options) { options->encoding = NULL; }

int8_t z_query_reply_err(const z_loaned_query_t *query, z_owned_bytes_t *payload,
const z_query_reply_err_options_t *options) {
// Try upgrading session weak to rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade(&query->in->val._zn);
if (sess_rc.in == NULL) {
return _Z_ERR_CONNECTION_CLOSED;
}
z_query_reply_err_options_t opts;
if (options == NULL) {
z_query_reply_err_options_default(&opts);
} else {
opts = *options;
}
// Set value
_z_value_t value = {.payload = _z_bytes_from_owned_bytes(payload),
.encoding = _z_encoding_from_owned(opts.encoding)};

int8_t ret = _z_send_reply_err(&query->in->val, &sess_rc, value);
if (payload != NULL) {
z_bytes_drop(payload);
}
// Clean-up
z_encoding_drop(opts.encoding);
return ret;
}
#endif

int8_t z_keyexpr_from_str_autocanonize(z_owned_keyexpr_t *key, const char *name) {
Expand Down
30 changes: 30 additions & 0 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,36 @@ int8_t _z_send_reply(const _z_query_t *query, const _z_session_rc_t *zsrc, _z_ke

return ret;
}

int8_t _z_send_reply_err(const _z_query_t *query, const _z_session_rc_t *zsrc, const _z_value_t payload) {
int8_t ret = _Z_RES_OK;
_z_session_t *zn = &zsrc->in->val;

// Build the reply context decorator. This is NOT the final reply.
_z_id_t zid = zn->_local_zid;
_z_zenoh_message_t msg = {
._tag = _Z_N_RESPONSE,
._body._response =
{
._request_id = query->_request_id,
._ext_responder = {._zid = zid, ._eid = 0},
._ext_qos = _z_n_qos_make(false, true, Z_PRIORITY_DEFAULT),
._ext_timestamp = _z_timestamp_null(),
._tag = _Z_RESPONSE_BODY_ERR,
._body._err =
{
._payload = payload.payload,
._encoding = payload.encoding,
._ext_source_info = _z_source_info_null(),
},
},
};
if (_z_send_n_msg(zn, &msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
ret = _Z_ERR_TRANSPORT_TX_FAILED;
}

return ret;
}
#endif

#if Z_FEATURE_QUERY == 1
Expand Down
Loading

0 comments on commit 510d583

Please sign in to comment.