diff --git a/include/zenoh-pico/net/primitives.h b/include/zenoh-pico/net/primitives.h index 1ebfbdcf2..1d95bac7c 100644 --- a/include/zenoh-pico/net/primitives.h +++ b/include/zenoh-pico/net/primitives.h @@ -211,7 +211,8 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle); * key: The resource key of this reply. The caller keeps the ownership. * payload: The value of this reply, the caller keeps ownership. */ -int8_t _z_send_reply(const _z_query_t *query, const _z_keyexpr_t keyexpr, const _z_value_t payload); +int8_t _z_send_reply(const _z_query_t *query, const _z_keyexpr_t keyexpr, const _z_value_t payload, + const z_sample_kind_t kind); #endif #if Z_FEATURE_QUERY == 1 diff --git a/include/zenoh-pico/protocol/definitions/message.h b/include/zenoh-pico/protocol/definitions/message.h index 503483c89..83d8f61ba 100644 --- a/include/zenoh-pico/protocol/definitions/message.h +++ b/include/zenoh-pico/protocol/definitions/message.h @@ -48,58 +48,25 @@ #define _Z_FRAG_BUFF_BASE_SIZE 128 // Arbitrary base size of the buffer to encode a fragment message header // Flags: -// - T: Timestamp If T==1 then the timestamp if present +// - X: Reserved // - E: Encoding If E==1 then the encoding is present // - Z: Extension If Z==1 then at least one extension is present // // 7 6 5 4 3 2 1 0 // +-+-+-+-+-+-+-+-+ -// |Z|E|T| REPLY | +// |Z|E|X| ERR | // +-+-+-+---------+ -// ~ ts: ~ if T==1 -// +---------------+ -// ~ encoding ~ if E==1 -// +---------------+ -// ~ [repl_exts] ~ if Z==1 -// +---------------+ -// ~ pl: ~ -- Payload -// +---------------+ -typedef struct { - _z_timestamp_t _timestamp; - _z_value_t _value; - _z_source_info_t _ext_source_info; - z_consolidation_mode_t _ext_consolidation; -#if Z_FEATURE_ATTACHMENT == 1 - _z_owned_encoded_attachment_t _ext_attachment; -#endif -} _z_msg_reply_t; -void _z_msg_reply_clear(_z_msg_reply_t *msg); -#define _Z_FLAG_Z_R_T 0x20 -#define _Z_FLAG_Z_R_E 0x40 - -// Flags: -// - T: Timestamp If T==1 then the timestamp if present -// - I: Infrastructure If I==1 then the error is related to the infrastructure else to the user -// - Z: Extension If Z==1 then at least one extension is present -// -// 7 6 5 4 3 2 1 0 -// +-+-+-+-+-+-+-+-+ -// |Z|I|T| ERR | -// +-+-+-+---------+ -// % code:z16 % -// +---------------+ -// ~ ts: ~ if T==1 +// % encoding % // +---------------+ // ~ [err_exts] ~ if Z==1 // +---------------+ -#define _Z_FLAG_Z_E_T 0x20 -#define _Z_FLAG_Z_E_I 0x40 +/// ~ pl: ~ Payload +/// +---------------+ +#define _Z_FLAG_Z_E_E 0x40 typedef struct { - uint16_t _code; - _Bool _is_infrastructure; - _z_timestamp_t _timestamp; + _z_encoding_t encoding; _z_source_info_t _ext_source_info; - _z_value_t _ext_value; + _z_bytes_t _payload; } _z_msg_err_t; void _z_msg_err_clear(_z_msg_err_t *err); @@ -166,20 +133,21 @@ void _z_msg_put_clear(_z_msg_put_t *); /*------------------ Query Message ------------------*/ // 7 6 5 4 3 2 1 0 // +-+-+-+-+-+-+-+-+ -// |Z|C|P| QUERY | +// |Z|P|C| QUERY | // +-+-+-+---------+ -// ~ params ~ if P==1 -- -// +---------------+ // ~ consolidation ~ if C==1 -- u8 // +---------------+ +// ~ params ~ if P==1 -- +// +---------------+ // ~ [qry_exts] ~ if Z==1 // +---------------+ -#define _Z_FLAG_Z_Q_P 0x20 // 1 << 6 | Period if P==1 then a period is present +#define _Z_FLAG_Z_Q_C 0x20 // 1 << 5 | Consolidation if C==1 then consolidation is present +#define _Z_FLAG_Z_Q_P 0x40 // 1 << 6 | Params if P==1 then parameters are present typedef struct { _z_bytes_t _parameters; _z_source_info_t _ext_info; _z_value_t _ext_value; - z_consolidation_mode_t _ext_consolidation; + z_consolidation_mode_t _consolidation; #if Z_FEATURE_ATTACHMENT == 1 _z_owned_encoded_attachment_t _ext_attachment; #endif @@ -187,10 +155,38 @@ typedef struct { typedef struct { _Bool info; _Bool body; - _Bool consolidation; _Bool attachment; } _z_msg_query_reqexts_t; _z_msg_query_reqexts_t _z_msg_query_required_extensions(const _z_msg_query_t *msg); void _z_msg_query_clear(_z_msg_query_t *msg); +typedef struct { + _Bool _is_put; + union { + _z_msg_del_t _del; + _z_msg_put_t _put; + } _body; +} _z_reply_body_t; +// Flags: +// - C: Consolidation If C==1 then consolidation is present +// - X: Reserved +// - Z: Extension If Z==1 then at least one extension is present +// +// 7 6 5 4 3 2 1 0 +// +-+-+-+-+-+-+-+-+ +// |Z|X|C| REPLY | +// +-+-+-+---------+ +// ~ consolidation ~ if C==1 +// +---------------+ +// ~ [repl_exts] ~ if Z==1 +// +---------------+ +// ~ ReplyBody ~ -- Payload +// +---------------+ +typedef struct { + z_consolidation_mode_t _consolidation; + _z_reply_body_t _body; +} _z_msg_reply_t; +void _z_msg_reply_clear(_z_msg_reply_t *msg); +#define _Z_FLAG_Z_R_C 0x20 + #endif /* INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_MESSAGE_H */ diff --git a/include/zenoh-pico/protocol/definitions/network.h b/include/zenoh-pico/protocol/definitions/network.h index 5c09247d3..689102dbe 100644 --- a/include/zenoh-pico/protocol/definitions/network.h +++ b/include/zenoh-pico/protocol/definitions/network.h @@ -118,13 +118,7 @@ typedef struct { _z_n_msg_request_exts_t _z_n_msg_request_needed_exts(const _z_n_msg_request_t *msg); void _z_n_msg_request_clear(_z_n_msg_request_t *msg); -typedef struct { - _Bool _is_put; - union { - _z_msg_del_t _del; - _z_msg_put_t _put; - } _body; -} _z_push_body_t; +typedef _z_reply_body_t _z_push_body_t; _z_push_body_t _z_push_body_null(void); _z_push_body_t _z_push_body_steal(_z_push_body_t *msg); void _z_push_body_clear(_z_push_body_t *msg); @@ -236,7 +230,7 @@ _z_network_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_byt z_attachment_t attachment #endif ); -_z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_value_t) value); +_z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body); _z_network_message_t _z_n_msg_make_ack(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key); _z_network_message_t _z_n_msg_make_response_final(_z_zint_t rid); _z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration); diff --git a/include/zenoh-pico/session/query.h b/include/zenoh-pico/session/query.h index 86d03bb6f..6dbcb6fc7 100644 --- a/include/zenoh-pico/session/query.h +++ b/include/zenoh-pico/session/query.h @@ -26,8 +26,7 @@ _z_pending_query_t *_z_get_pending_query_by_id(_z_session_t *zn, const _z_zint_t int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pq); int8_t _z_trigger_query_reply_partial(_z_session_t *zn, _z_zint_t reply_context, const _z_keyexpr_t keyexpr, - const _z_bytes_t payload, const _z_encoding_t encoding, const _z_zint_t kind, - const _z_timestamp_t timestamp); + const _z_msg_put_t *msg); int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id); void _z_unregister_pending_query(_z_session_t *zn, _z_pending_query_t *pq); void _z_flush_pending_queries(_z_session_t *zn); diff --git a/src/api/api.c b/src/api/api.c index 43afb1b39..9a64be720 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -934,7 +934,7 @@ int8_t z_query_reply(const z_query_t *query, const z_keyexpr_t keyexpr, const ui .len = payload_len, }, .encoding = {.prefix = opts.encoding.prefix, .suffix = opts.encoding.suffix}}; - return _z_send_reply(&query->_val._rc.in->val, keyexpr, value); + return _z_send_reply(&query->_val._rc.in->val, keyexpr, value, Z_SAMPLE_KIND_PUT); return _Z_ERR_GENERIC; } #endif diff --git a/src/net/primitives.c b/src/net/primitives.c index 4958b04be..2aa5b7920 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -340,7 +340,8 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle) { return _Z_RES_OK; } -int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_value_t payload) { +int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_value_t payload, + const z_sample_kind_t kind) { int8_t ret = _Z_RES_OK; _z_keyexpr_t q_ke; @@ -359,23 +360,30 @@ int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_val // Build the reply context decorator. This is NOT the final reply. _z_id_t zid = ((_z_session_t *)query->_zn)->_local_zid; _z_keyexpr_t ke = _z_keyexpr_alias(keyexpr); - _z_zenoh_message_t z_msg = { - ._tag = _Z_N_RESPONSE, - ._body._response = - { - ._request_id = query->_request_id, - ._key = ke, - ._ext_responder = {._zid = zid, ._eid = 0}, - ._ext_qos = _Z_N_QOS_DEFAULT, - ._ext_timestamp = _z_timestamp_null(), - ._tag = _Z_RESPONSE_BODY_REPLY, - ._body._reply = {._value = payload, - ._timestamp = _z_timestamp_null(), - ._ext_consolidation = Z_CONSOLIDATION_MODE_AUTO, - ._ext_source_info = _z_source_info_null()}, - }, - }; - + _z_zenoh_message_t z_msg; + switch (kind) { + default: + return _Z_ERR_GENERIC; + break; + case Z_SAMPLE_KIND_PUT: + z_msg._tag = _Z_N_RESPONSE; + z_msg._body._response._request_id = query->_request_id; + z_msg._body._response._key = ke; + z_msg._body._response._ext_responder._zid = zid; + z_msg._body._response._ext_responder._eid = 0; + z_msg._body._response._ext_qos = _Z_N_QOS_DEFAULT; + z_msg._body._response._ext_timestamp = _z_timestamp_null(); + z_msg._body._response._tag = _Z_RESPONSE_BODY_REPLY; + z_msg._body._response._body._reply._consolidation = Z_CONSOLIDATION_MODE_DEFAULT; + z_msg._body._response._body._reply._body._is_put = true; + z_msg._body._response._body._reply._body._body._put._payload = payload.payload; + z_msg._body._response._body._reply._body._body._put._encoding = payload.encoding; + z_msg._body._response._body._reply._body._body._put._attachment.body.decoded = z_attachment_null(); + z_msg._body._response._body._reply._body._body._put._attachment.is_encoded = false; + z_msg._body._response._body._reply._body._body._put._commons._timestamp = _z_timestamp_null(); + z_msg._body._response._body._reply._body._body._put._commons._source_info = _z_source_info_null(); + break; + } if (_z_send_n_msg(query->_zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { ret = _Z_ERR_TRANSPORT_TX_FAILED; } diff --git a/src/protocol/codec/message.c b/src/protocol/codec/message.c index 0f8438653..806d4d2ab 100644 --- a/src/protocol/codec/message.c +++ b/src/protocol/codec/message.c @@ -414,21 +414,26 @@ int8_t _z_query_encode(_z_wbuf_t *wbf, const _z_msg_query_t *msg) { _Bool has_params = z_bytes_check(&msg->_parameters); if (has_params) { - header |= _Z_FLAG_Z_P; + _Z_SET_FLAG(header, _Z_FLAG_Z_Q_P); + } + _Bool has_consolidation = (msg->_consolidation != Z_CONSOLIDATION_MODE_DEFAULT); + if (has_consolidation) { + _Z_SET_FLAG(header, _Z_FLAG_Z_Q_C); } _z_msg_query_reqexts_t required_exts = _z_msg_query_required_extensions(msg); - if (required_exts.body || required_exts.consolidation || required_exts.info || required_exts.attachment) { + if (required_exts.body || required_exts.info || required_exts.attachment) { header |= _Z_FLAG_Z_Z; } _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); - + if (has_consolidation) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, msg->_consolidation)); + } if (has_params) { _Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &msg->_parameters)); } - if (required_exts.body) { uint8_t extheader = _Z_MSG_EXT_ENC_ZBUF | 0x03; - if (required_exts.consolidation || required_exts.info || required_exts.attachment) { + if (required_exts.info || required_exts.attachment) { extheader |= _Z_FLAG_Z_Z; } _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader)); @@ -439,14 +444,6 @@ int8_t _z_query_encode(_z_wbuf_t *wbf, const _z_msg_query_t *msg) { _Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &msg->_ext_value.encoding.suffix)); _Z_RETURN_IF_ERR(_z_bytes_val_encode(wbf, &msg->_ext_value.payload)); } - if (required_exts.consolidation) { - uint8_t extheader = _Z_MSG_EXT_ENC_ZINT | _Z_MSG_EXT_FLAG_M | 0x02; - if (required_exts.info || required_exts.attachment) { - extheader |= _Z_FLAG_Z_Z; - } - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader)); - _Z_RETURN_IF_ERR(_z_zint_encode(wbf, msg->_ext_consolidation)); - } if (required_exts.info) { uint8_t extheader = _Z_MSG_EXT_ENC_ZBUF | 0x01; if (required_exts.attachment) { @@ -475,10 +472,6 @@ int8_t _z_query_decode_extensions(_z_msg_ext_t *extension, void *ctx) { ret = _z_source_info_decode(&msg->_ext_info, &zbf); break; } - case _Z_MSG_EXT_ENC_ZINT | _Z_MSG_EXT_FLAG_M | 0x02: { // Consolidation - msg->_ext_consolidation = extension->_body._zint._val; - break; - } case _Z_MSG_EXT_ENC_ZBUF | 0x03: { // Payload _z_zbuf_t zbf = _z_zbytes_as_zbuf(extension->_body._zbuf._val); ret = _z_encoding_prefix_decode(&msg->_ext_value.encoding.prefix, &zbf); @@ -507,165 +500,91 @@ int8_t _z_query_decode_extensions(_z_msg_ext_t *extension, void *ctx) { int8_t _z_query_decode(_z_msg_query_t *msg, _z_zbuf_t *zbf, uint8_t header) { _Z_DEBUG("Decoding _Z_MID_Z_QUERY"); *msg = (_z_msg_query_t){0}; - msg->_ext_consolidation = Z_CONSOLIDATION_MODE_AUTO; int8_t ret = _Z_RES_OK; - if (_Z_HAS_FLAG(header, _Z_FLAG_Z_P)) { + + if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Q_C)) { + _Z_RETURN_IF_ERR(_z_uint8_decode((uint8_t *)&msg->_consolidation, zbf)); + } else { + msg->_consolidation = Z_CONSOLIDATION_MODE_DEFAULT; + } + if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Q_P)) { _Z_RETURN_IF_ERR(_z_bytes_decode(&msg->_parameters, zbf)); } else { _z_bytes_clear(&msg->_parameters); } - if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) { _Z_RETURN_IF_ERR(_z_msg_ext_decode_iter(zbf, _z_query_decode_extensions, msg)); } - return ret; } int8_t _z_reply_encode(_z_wbuf_t *wbf, const _z_msg_reply_t *reply) { uint8_t header = _Z_MID_Z_REPLY; - if (_z_timestamp_check(&reply->_timestamp)) { - header |= _Z_FLAG_Z_R_T; - } - if (reply->_value.encoding.prefix != Z_ENCODING_PREFIX_EMPTY || - !_z_bytes_is_empty(&reply->_value.encoding.suffix)) { - header |= _Z_FLAG_Z_R_E; - } -#if Z_FEATURE_ATTACHMENT == 1 - z_attachment_t att = _z_encoded_as_attachment(&reply->_ext_attachment); - _Bool has_attachment = z_attachment_check(&att); -#else - _Bool has_attachment = false; -#endif - _Bool has_sourceinfo = _z_id_check(reply->_ext_source_info._id) || reply->_ext_source_info._source_sn != 0 || - reply->_ext_source_info._entity_id != 0; - _Bool has_consolidation_ext = reply->_ext_consolidation != Z_CONSOLIDATION_MODE_AUTO; - if (has_consolidation_ext || has_sourceinfo || has_attachment) { - header |= _Z_FLAG_Z_Z; + _Bool has_consolidation = reply->_consolidation != Z_CONSOLIDATION_MODE_DEFAULT; + if (has_consolidation) { + _Z_SET_FLAG(header, _Z_FLAG_Z_R_C); } _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); - int8_t ret = _Z_RES_OK; - if (_z_timestamp_check(&reply->_timestamp)) { - assert(_Z_HAS_FLAG(header, _Z_FLAG_Z_R_T)); - _Z_RETURN_IF_ERR(_z_timestamp_encode(wbf, &reply->_timestamp)); - } - if (((reply->_value.encoding.prefix != 0) || !_z_bytes_is_empty(&reply->_value.encoding.suffix))) { - assert(_Z_HAS_FLAG(header, _Z_FLAG_Z_R_E)); - _Z_RETURN_IF_ERR(_z_encoding_prefix_encode(wbf, reply->_value.encoding.prefix)); - _Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &reply->_value.encoding.suffix)); - } - if (has_sourceinfo) { - uint8_t extheader = _Z_MSG_EXT_ENC_ZBUF | 0x01; - if (has_consolidation_ext || has_attachment) { - extheader |= _Z_MSG_EXT_FLAG_Z; - } - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader)); - _Z_RETURN_IF_ERR(_z_source_info_encode_ext(wbf, &reply->_ext_source_info)); + if (has_consolidation) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, reply->_consolidation)); } - if (has_consolidation_ext) { - uint8_t extheader = _Z_MSG_EXT_ENC_ZINT | _Z_MSG_EXT_FLAG_M | 0x02; - if (has_attachment) { - extheader |= _Z_MSG_EXT_FLAG_Z; - } - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader)); - _Z_RETURN_IF_ERR(_z_zint_encode(wbf, reply->_ext_consolidation)); - } -#if Z_FEATURE_ATTACHMENT == 1 - if (has_attachment) { - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | 0x04)); - _Z_RETURN_IF_ERR(_z_attachment_encode_ext(wbf, att)); - } -#endif - _Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &reply->_value.payload)); - return ret; + _Z_RETURN_IF_ERR(_z_push_body_encode(wbf, &reply->_body)); + return _Z_RES_OK; } int8_t _z_reply_decode_extension(_z_msg_ext_t *extension, void *ctx) { int8_t ret = _Z_RES_OK; _z_msg_reply_t *reply = (_z_msg_reply_t *)ctx; switch (_Z_EXT_FULL_ID(extension->_header)) { - case _Z_MSG_EXT_ENC_ZBUF | 0x01: { - _z_zbuf_t zbf = _z_zbytes_as_zbuf(extension->_body._zbuf._val); - ret = _z_source_info_decode(&reply->_ext_source_info, &zbf); - break; - } - case _Z_MSG_EXT_ENC_ZINT | _Z_MSG_EXT_FLAG_M | 0x02: { - reply->_ext_consolidation = extension->_body._zint._val; - break; - } -#if Z_FEATURE_ATTACHMENT == 1 - case _Z_MSG_EXT_ENC_ZBUF | 0x04: { - reply->_ext_attachment.is_encoded = true; - reply->_ext_attachment.body.encoded = extension->_body._zbuf._val._is_alloc - ? _z_bytes_steal(&extension->_body._zbuf._val) - : _z_bytes_duplicate(&extension->_body._zbuf._val); - break; - } -#endif default: - if (_Z_HAS_FLAG(extension->_header, _Z_MSG_EXT_FLAG_M)) { - ret = _z_msg_ext_unknown_error(extension, 0x0a); - } + ret = _z_msg_ext_unknown_error(extension, 0x0a); + break; } return ret; } int8_t _z_reply_decode(_z_msg_reply_t *reply, _z_zbuf_t *zbf, uint8_t header) { - int8_t ret = _Z_RES_OK; *reply = (_z_msg_reply_t){0}; - reply->_ext_consolidation = Z_CONSOLIDATION_MODE_AUTO; - if (_Z_HAS_FLAG(header, _Z_FLAG_Z_R_T)) { - _Z_RETURN_IF_ERR(_z_timestamp_decode(&reply->_timestamp, zbf)); - } - if (_Z_HAS_FLAG(header, _Z_FLAG_Z_R_E)) { - _Z_RETURN_IF_ERR(_z_encoding_prefix_decode(&reply->_value.encoding.prefix, zbf)); - _Z_RETURN_IF_ERR(_z_bytes_decode(&reply->_value.encoding.suffix, zbf)); + if (_Z_HAS_FLAG(header, _Z_FLAG_Z_R_C)) { + _Z_RETURN_IF_ERR(_z_uint8_decode((uint8_t *)&reply->_consolidation, zbf)); + } else { + reply->_consolidation = Z_CONSOLIDATION_MODE_DEFAULT; } if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) { _Z_RETURN_IF_ERR(_z_msg_ext_decode_iter(zbf, _z_reply_decode_extension, reply)); } - _Z_RETURN_IF_ERR(_z_bytes_decode(&reply->_value.payload, zbf)); - - return ret; + uint8_t put_header = 0; + _Z_RETURN_IF_ERR(_z_uint8_decode(&put_header, zbf)); + _Z_RETURN_IF_ERR(_z_push_body_decode(&reply->_body, zbf, put_header)); + return _Z_RES_OK; } int8_t _z_err_encode(_z_wbuf_t *wbf, const _z_msg_err_t *err) { int8_t ret = _Z_RES_OK; uint8_t header = _Z_MID_Z_ERR; - _Bool has_timestamp = _z_timestamp_check(&err->_timestamp); - if (has_timestamp) { - header |= _Z_FLAG_Z_E_T; - } - if (err->_is_infrastructure) { - header |= _Z_FLAG_Z_E_I; + + // Encode header + _Bool has_encoding = err->encoding.prefix != Z_ENCODING_PREFIX_EMPTY; + if (has_encoding) { + _Z_SET_FLAG(header, _Z_FLAG_Z_E_E); } - _Bool has_payload_ext = err->_ext_value.payload.start != NULL || err->_ext_value.encoding.prefix != 0 || - !_z_bytes_is_empty(&err->_ext_value.encoding.suffix); _Bool has_sinfo_ext = _z_id_check(err->_ext_source_info._id) || err->_ext_source_info._entity_id != 0 || err->_ext_source_info._source_sn != 0; - if (has_sinfo_ext || has_payload_ext) { - header |= _Z_FLAG_Z_Z; + if (has_sinfo_ext) { + _Z_SET_FLAG(header, _Z_FLAG_Z_Z); } _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); - _Z_RETURN_IF_ERR(_z_zint_encode(wbf, err->_code)); - if (has_timestamp) { - _Z_RETURN_IF_ERR(_z_timestamp_encode(wbf, &err->_timestamp)); + // Encode encoding + if (has_encoding) { + _Z_RETURN_IF_ERR(_z_encoding_prefix_encode(wbf, err->encoding.prefix)); + _Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &err->encoding.suffix)); } + // Encode extensions if (has_sinfo_ext) { uint8_t extheader = _Z_MSG_EXT_ENC_ZBUF | 0x01; - if (has_payload_ext) { - extheader |= _Z_MSG_EXT_FLAG_Z; - } _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader)); _Z_RETURN_IF_ERR(_z_source_info_encode_ext(wbf, &err->_ext_source_info)); } - if (has_payload_ext) { - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | 0x02)); - _Z_RETURN_IF_ERR(_z_zint_encode(wbf, _z_zint_len(err->_ext_value.encoding.prefix) + - _z_bytes_encode_len(&err->_ext_value.encoding.suffix) + - _z_bytes_encode_len(&err->_ext_value.payload))); - _Z_RETURN_IF_ERR(_z_encoding_prefix_encode(wbf, err->_ext_value.encoding.prefix)); - _Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &err->_ext_value.encoding.suffix)); - _Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &err->_ext_value.payload)); - } + // Encode payload + _Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &err->_payload)); return ret; } int8_t _z_err_decode_extension(_z_msg_ext_t *extension, void *ctx) { @@ -677,13 +596,6 @@ int8_t _z_err_decode_extension(_z_msg_ext_t *extension, void *ctx) { ret = _z_source_info_decode(&reply->_ext_source_info, &zbf); break; } - case _Z_MSG_EXT_ENC_ZBUF | 0x02: { - _z_zbuf_t zbf = _z_zbytes_as_zbuf(extension->_body._zbuf._val); - ret = _z_encoding_prefix_decode(&reply->_ext_value.encoding.prefix, &zbf); - ret |= _z_bytes_decode(&reply->_ext_value.encoding.suffix, &zbf); - ret |= _z_bytes_decode(&reply->_ext_value.payload, &zbf); - break; - } default: if (_Z_HAS_FLAG(extension->_header, _Z_MSG_EXT_FLAG_M)) { ret = _z_msg_ext_unknown_error(extension, 0x0a); @@ -693,23 +605,16 @@ int8_t _z_err_decode_extension(_z_msg_ext_t *extension, void *ctx) { } int8_t _z_err_decode(_z_msg_err_t *err, _z_zbuf_t *zbf, uint8_t header) { *err = (_z_msg_err_t){0}; - int8_t ret = _Z_RES_OK; - _z_zint_t code; - ret = _z_zint_decode(&code, zbf); - if (code <= UINT16_MAX) { - err->_code = (uint16_t)code; - } else { - ret = _Z_ERR_MESSAGE_DESERIALIZATION_FAILED; - } - err->_is_infrastructure = _Z_HAS_FLAG(header, _Z_FLAG_Z_E_I); - if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_Z_E_T)) { - ret = _z_timestamp_decode(&err->_timestamp, zbf); + + if (_Z_HAS_FLAG(header, _Z_FLAG_Z_E_E)) { + _Z_RETURN_IF_ERR(_z_encoding_prefix_decode(&err->encoding.prefix, zbf)); + _Z_RETURN_IF_ERR(_z_bytes_decode(&err->encoding.suffix, zbf)); } - if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) { - ret = _z_msg_ext_decode_iter(zbf, _z_err_decode_extension, err); + if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) { + _Z_RETURN_IF_ERR(_z_msg_ext_decode_iter(zbf, _z_err_decode_extension, err)); } - - return ret; + _Z_RETURN_IF_ERR(_z_bytes_decode(&err->_payload, zbf)); + return _Z_RES_OK; } int8_t _z_ack_encode(_z_wbuf_t *wbf, const _z_msg_ack_t *ack) { diff --git a/src/protocol/definitions/message.c b/src/protocol/definitions/message.c index 0fc391f62..0218f3420 100644 --- a/src/protocol/definitions/message.c +++ b/src/protocol/definitions/message.c @@ -18,8 +18,9 @@ #include "zenoh-pico/collections/bytes.h" #include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/protocol/definitions/network.h" -void _z_msg_reply_clear(_z_msg_reply_t *msg) { _z_value_clear(&msg->_value); } +void _z_msg_reply_clear(_z_msg_reply_t *msg) { _z_push_body_clear(&msg->_body); } void _z_msg_put_clear(_z_msg_put_t *msg) { _z_bytes_clear(&msg->_encoding.suffix); @@ -35,7 +36,6 @@ _z_msg_query_reqexts_t _z_msg_query_required_extensions(const _z_msg_query_t *ms .body = msg->_ext_value.payload.start != NULL || msg->_ext_value.encoding.prefix != 0 || !_z_bytes_is_empty(&msg->_ext_value.encoding.suffix), .info = _z_id_check(msg->_ext_info._id) || msg->_ext_info._entity_id != 0 || msg->_ext_info._source_sn != 0, - .consolidation = msg->_ext_consolidation != Z_CONSOLIDATION_MODE_AUTO, #if Z_FEATURE_ATTACHMENT == 1 .attachment = z_attachment_check(&att) #else @@ -49,6 +49,6 @@ void _z_msg_query_clear(_z_msg_query_t *msg) { _z_value_clear(&msg->_ext_value); } void _z_msg_err_clear(_z_msg_err_t *err) { - _z_timestamp_clear(&err->_timestamp); - _z_value_clear(&err->_ext_value); + _z_bytes_clear(&err->encoding.suffix); + _z_bytes_clear(&err->_payload); } diff --git a/src/protocol/definitions/network.c b/src/protocol/definitions/network.c index af5192b14..5f26ecf58 100644 --- a/src/protocol/definitions/network.c +++ b/src/protocol/definitions/network.c @@ -182,7 +182,7 @@ _z_zenoh_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_bytes ._key = _z_keyexpr_steal(key), ._tag = _Z_REQUEST_QUERY, ._body._query = {._parameters = _z_bytes_steal(parameters), - ._ext_consolidation = consolidation, + ._consolidation = consolidation, ._ext_value = _z_value_steal(value), ._ext_info = _z_source_info_null(), #if Z_FEATURE_ATTACHMENT == 1 @@ -220,7 +220,7 @@ _z_network_message_t _z_n_msg_make_push(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_pu ._body._push = {._key = _z_keyexpr_steal(key), ._body = _z_push_body_steal(body)}, }; } -_z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_value_t) value) { +_z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body) { return (_z_network_message_t){ ._tag = _Z_N_RESPONSE, ._body._response = @@ -230,10 +230,8 @@ _z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) ke ._request_id = rid, ._body._reply = { - ._timestamp = _z_timestamp_null(), - ._value = _z_value_steal(value), - ._ext_source_info = _z_source_info_null(), - ._ext_consolidation = Z_CONSOLIDATION_MODE_AUTO, + ._consolidation = Z_CONSOLIDATION_MODE_AUTO, + ._body = _z_push_body_steal(body), }, ._ext_qos = _Z_N_QOS_DEFAULT, ._ext_timestamp = _z_timestamp_null(), diff --git a/src/session/query.c b/src/session/query.c index 90d300621..a3080761d 100644 --- a/src/session/query.c +++ b/src/session/query.c @@ -133,8 +133,7 @@ int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pen_qry) } int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, const _z_keyexpr_t keyexpr, - const _z_bytes_t payload, const _z_encoding_t encoding, const _z_zint_t kind, - const _z_timestamp_t timestamp) { + const _z_msg_put_t *msg) { int8_t ret = _Z_RES_OK; _zp_session_lock_mutex(zn); @@ -156,11 +155,11 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons reply._tag = Z_REPLY_TAG_DATA; reply.data.replier_id = zn->_local_zid; reply.data.sample.keyexpr = expanded_ke; - _z_bytes_copy(&reply.data.sample.payload, &payload); - reply.data.sample.encoding.prefix = encoding.prefix; - _z_bytes_copy(&reply.data.sample.encoding.suffix, &encoding.suffix); - reply.data.sample.kind = kind; - reply.data.sample.timestamp = _z_timestamp_duplicate(×tamp); + _z_bytes_copy(&reply.data.sample.payload, &msg->_payload); + reply.data.sample.encoding.prefix = msg->_encoding.prefix; + _z_bytes_copy(&reply.data.sample.encoding.suffix, &msg->_encoding.suffix); + reply.data.sample.kind = Z_SAMPLE_KIND_PUT; + reply.data.sample.timestamp = _z_timestamp_duplicate(&msg->_commons._timestamp); // Verify if this is a newer reply, free the old one in case it is if ((ret == _Z_RES_OK) && ((pen_qry->_consolidation == Z_CONSOLIDATION_MODE_LATEST) || @@ -173,7 +172,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons // Check if this is the same resource key if (_z_str_eq(pen_rep->_reply.data.sample.keyexpr._suffix, reply.data.sample.keyexpr._suffix) == true) { - if (timestamp.time <= pen_rep->_tstamp.time) { + if (msg->_commons._timestamp.time <= pen_rep->_tstamp.time) { drop = true; } else { pen_qry->_pending_replies = @@ -199,7 +198,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons } else { pen_rep->_reply = reply; // Store the whole reply in the latest mode } - pen_rep->_tstamp = _z_timestamp_duplicate(×tamp); + pen_rep->_tstamp = _z_timestamp_duplicate(&msg->_commons._timestamp); pen_qry->_pending_replies = _z_pending_reply_list_push(pen_qry->_pending_replies, pen_rep); } else { ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY; diff --git a/src/session/reply.c b/src/session/reply.c index 7fcffc0e3..7441b5e7f 100644 --- a/src/session/reply.c +++ b/src/session/reply.c @@ -25,8 +25,11 @@ int8_t _z_trigger_reply_partial(_z_session_t *zn, _z_zint_t id, _z_keyexpr_t key // TODO check id to know where to dispatch #if Z_FEATURE_QUERY == 1 - ret = _z_trigger_query_reply_partial(zn, id, key, reply->_value.payload, reply->_value.encoding, Z_SAMPLE_KIND_PUT, - reply->_timestamp); + if (reply->_body._is_put) { + ret = _z_trigger_query_reply_partial(zn, id, key, &reply->_body._body._put); + } else { + ret = _Z_ERR_GENERIC; + } #endif return ret; } diff --git a/src/session/rx.c b/src/session/rx.c index 087f21628..fae8d2a36 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -153,11 +153,11 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint ret = _z_trigger_reply_partial(zn, response._request_id, response._key, reply); } break; case _Z_RESPONSE_BODY_ERR: { - // @TODO: expose errors to the user + // @TODO: expose zenoh errors to the user _z_msg_err_t error = response._body._err; - _z_bytes_t payload = error._ext_value.payload; - _Z_ERROR("Received Err for query %zu: code=%d, message=%.*s", response._request_id, error._code, - (int)payload.len, payload.start); + _z_bytes_t payload = error._payload; + _Z_ERROR("Received Err for query %zu: message=%.*s", response._request_id, (int)payload.len, + payload.start); } break; case _Z_RESPONSE_BODY_ACK: { // @TODO: implement ACKs for puts/dels diff --git a/tests/z_msgcodec_test.c b/tests/z_msgcodec_test.c index 4e4511baa..c88d0ceef 100644 --- a/tests/z_msgcodec_test.c +++ b/tests/z_msgcodec_test.c @@ -518,8 +518,9 @@ void assert_eq_value(const _z_value_t *left, const _z_value_t *right) { _z_timestamp_t gen_timestamp(void) { _z_timestamp_t ts; ts.time = gen_uint64(); - _z_bytes_t id = gen_bytes(16); - memcpy(ts.id.id, id.start, id.len); + for (size_t i = 0; i < 16; i++) { + ts.id.id[i] = gen_uint8() & 0x7f; // 0b01111111 + } return ts; } @@ -1132,7 +1133,7 @@ void pull_message(void) { _z_msg_query_t gen_query(void) { return (_z_msg_query_t){ - ._ext_consolidation = (gen_uint8() % 4) - 1, + ._consolidation = (gen_uint8() % 4) - 1, ._ext_info = gen_source_info(), ._parameters = gen_bytes(16), ._ext_value = gen_bool() ? gen_value() : _z_value_null(), @@ -1140,7 +1141,7 @@ _z_msg_query_t gen_query(void) { } void assert_eq_query(const _z_msg_query_t *left, const _z_msg_query_t *right) { - assert(left->_ext_consolidation == right->_ext_consolidation); + assert(left->_consolidation == right->_consolidation); assert_eq_source_info(&left->_ext_info, &right->_ext_info); assert_eq_bytes(&left->_parameters, &right->_parameters); assert_eq_value(&left->_ext_value, &right->_ext_value); @@ -1164,21 +1165,18 @@ void query_message(void) { } _z_msg_err_t gen_err(void) { + size_t len = 1 + gen_uint8(); return (_z_msg_err_t){ - ._code = gen_uint16(), - ._is_infrastructure = gen_bool(), - ._timestamp = gen_timestamp(), + .encoding = gen_encoding(), ._ext_source_info = gen_bool() ? gen_source_info() : _z_source_info_null(), - ._ext_value = gen_bool() ? gen_value() : _z_value_null(), + ._payload = gen_payload(len), // Hangs if 0 }; } void assert_eq_err(const _z_msg_err_t *left, const _z_msg_err_t *right) { - assert(left->_code == right->_code); - assert(left->_is_infrastructure == right->_is_infrastructure); - assert_eq_timestamp(&left->_timestamp, &right->_timestamp); + assert_eq_encoding(&left->encoding, &right->encoding); assert_eq_source_info(&left->_ext_source_info, &right->_ext_source_info); - assert_eq_value(&left->_ext_value, &right->_ext_value); + assert_eq_bytes(&left->_payload, &right->_payload); } void err_message(void) { @@ -1225,18 +1223,14 @@ void ack_message(void) { _z_msg_reply_t gen_reply(void) { return (_z_msg_reply_t){ - ._ext_source_info = gen_bool() ? gen_source_info() : _z_source_info_null(), - ._timestamp = gen_timestamp(), - ._ext_consolidation = (gen_uint8() % 4) - 1, - ._value = gen_value(), + ._consolidation = (gen_uint8() % 4) - 1, + ._body = gen_push_body(), }; } void assert_eq_reply(const _z_msg_reply_t *left, const _z_msg_reply_t *right) { - assert_eq_timestamp(&left->_timestamp, &right->_timestamp); - assert_eq_source_info(&left->_ext_source_info, &right->_ext_source_info); - assert(left->_ext_consolidation == right->_ext_consolidation); - assert_eq_value(&left->_value, &right->_value); + assert(left->_consolidation == right->_consolidation); + assert_eq_push_body(&left->_body, &right->_body); } void reply_message(void) { @@ -1383,7 +1377,7 @@ _z_n_msg_response_t gen_response(void) { ret._body._err = gen_err(); } break; case 2: { - ret._tag = _Z_RESPONSE_BODY_ACK; + ret._tag = _Z_RESPONSE_BODY_REPLY; ret._body._reply = gen_reply(); } break; default: { @@ -1661,6 +1655,7 @@ _z_network_message_vec_t gen_net_msgs(size_t n) { _z_network_message_vec_t ret = _z_network_message_vec_make(n); for (size_t i = 0; i < n; i++) { _z_network_message_t *msg = (_z_network_message_t *)zp_malloc(sizeof(_z_network_message_t)); + memset(msg, 0, sizeof(_z_network_message_t)); *msg = gen_net_msg(); _z_network_message_vec_append(&ret, msg); }