From 1f4cf1d55f2aaa6e88654b7a1bf359c9e718c6ae Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 25 Nov 2024 07:42:19 +0100 Subject: [PATCH 01/32] feat: support start/stop fragment marker See https://github.com/eclipse-zenoh/zenoh/pull/1597 --- .../protocol/definitions/transport.h | 24 ++++++-- include/zenoh-pico/protocol/ext.h | 7 ++- include/zenoh-pico/transport/common/tx.h | 2 +- include/zenoh-pico/transport/transport.h | 13 ++++ include/zenoh-pico/transport/utils.h | 1 + src/protocol/codec/transport.c | 60 ++++++++++++++++--- src/protocol/definitions/transport.c | 49 +++++++++++++-- src/transport/common/tx.c | 4 +- src/transport/multicast/rx.c | 52 +++++++++++++++- src/transport/multicast/tx.c | 4 +- src/transport/peer_entry.c | 2 + src/transport/raweth/tx.c | 4 +- src/transport/unicast/rx.c | 52 ++++++++++++++-- src/transport/unicast/transport.c | 11 ++++ src/transport/unicast/tx.c | 4 +- src/transport/utils.c | 5 ++ 16 files changed, 262 insertions(+), 32 deletions(-) diff --git a/include/zenoh-pico/protocol/definitions/transport.h b/include/zenoh-pico/protocol/definitions/transport.h index 261c61fc6..d5ef7d295 100644 --- a/include/zenoh-pico/protocol/definitions/transport.h +++ b/include/zenoh-pico/protocol/definitions/transport.h @@ -91,6 +91,16 @@ extern "C" { // Z Extensions if Z==1 then Zenoh extensions are present #define _Z_FLAG_T_CLOSE_S 0x20 // 1 << 5 +/*=============================*/ +/* Patch */ +/*=============================*/ +/// Used to negotiate the patch version of the protocol +/// if not present (or 0), then protocol as released with 1.0.0 +/// if >= 1, then fragmentation start/stop marker +#define _Z_NO_PATCH 0x00 +#define _Z_CURRENT_PATCH 0x01 +#define _Z_PATCH_HAS_FRAGMENT_START_STOP(patch) (patch >= 1) + /*=============================*/ /* Transport Messages */ /*=============================*/ @@ -235,6 +245,9 @@ typedef struct { uint8_t _req_id_res; uint8_t _seq_num_res; uint8_t _version; +#if Z_FEATURE_FRAGMENTATION == 1 + uint8_t _patch; +#endif } _z_t_msg_join_t; void _z_t_msg_join_clear(_z_t_msg_join_t *msg); @@ -315,6 +328,9 @@ typedef struct { uint8_t _req_id_res; uint8_t _seq_num_res; uint8_t _version; +#if Z_FEATURE_FRAGMENTATION == 1 + uint8_t _patch; +#endif } _z_t_msg_init_t; void _z_t_msg_init_clear(_z_t_msg_init_t *msg); @@ -478,11 +494,11 @@ void _z_t_msg_frame_clear(_z_t_msg_frame_t *msg); typedef struct { _z_slice_t _payload; _z_zint_t _sn; + bool start; + bool stop; } _z_t_msg_fragment_t; void _z_t_msg_fragment_clear(_z_t_msg_fragment_t *msg); -#define _Z_FRAGMENT_HEADER_SIZE 12 - /*------------------ Transport Message ------------------*/ typedef union { _z_t_msg_join_t _join; @@ -514,9 +530,9 @@ _z_transport_message_t _z_t_msg_make_keep_alive(void); _z_transport_message_t _z_t_msg_make_frame(_z_zint_t sn, _z_network_message_vec_t messages, z_reliability_t reliability); _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t reliability); -_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last); +_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, bool start, bool stop); _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t messages, z_reliability_t reliability, - bool is_last); + bool is_last, bool start, bool stop); /*------------------ Copy ------------------*/ void _z_t_msg_copy(_z_transport_message_t *clone, _z_transport_message_t *msg); diff --git a/include/zenoh-pico/protocol/ext.h b/include/zenoh-pico/protocol/ext.h index eb70ede42..a8f961ee4 100644 --- a/include/zenoh-pico/protocol/ext.h +++ b/include/zenoh-pico/protocol/ext.h @@ -43,7 +43,11 @@ extern "C" { /*=============================*/ /* Extension IDs */ /*=============================*/ -// #define _Z_MSG_EXT_ID_FOO 0x00 // Hex(ENC|M|ID) +#define _Z_MSG_EXT_ID_JOIN_QOS (0x01 | _Z_MSG_EXT_FLAG_M | _Z_MSG_EXT_ENC_ZBUF) +#define _Z_MSG_EXT_ID_JOIN_PATCH (0x07 | _Z_MSG_EXT_ENC_ZINT) +#define _Z_MSG_EXT_ID_INIT_PATCH (0x07 | _Z_MSG_EXT_ENC_ZINT) +#define _Z_MSG_EXT_ID_FRAGMENT_START (0x02 | _Z_MSG_EXT_ENC_UNIT) +#define _Z_MSG_EXT_ID_FRAGMENT_STOP (0x03 | _Z_MSG_EXT_ENC_UNIT) /*=============================*/ /* Extension Encodings */ @@ -58,6 +62,7 @@ extern "C" { #define _Z_MSG_EXT_FLAG_M 0x10 #define _Z_MSG_EXT_IS_MANDATORY(h) ((h & _Z_MSG_EXT_FLAG_M) != 0) #define _Z_MSG_EXT_FLAG_Z 0x80 +#define _Z_MSG_EXT_MORE(more) (more ? _Z_MSG_EXT_FLAG_Z : 0) typedef struct { uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior diff --git a/include/zenoh-pico/transport/common/tx.h b/include/zenoh-pico/transport/common/tx.h index 05d22a89e..a77fd1a28 100644 --- a/include/zenoh-pico/transport/common/tx.h +++ b/include/zenoh-pico/transport/common/tx.h @@ -27,7 +27,7 @@ void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); /*This function is unsafe because it operates in potentially concurrent data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */ -z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn); +z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, bool start); /*------------------ Transmission and Reception helpers ------------------*/ z_result_t _z_send_t_msg(_z_transport_t *zt, const _z_transport_message_t *t_msg); diff --git a/include/zenoh-pico/transport/transport.h b/include/zenoh-pico/transport/transport.h index 1671786df..02c304ea6 100644 --- a/include/zenoh-pico/transport/transport.h +++ b/include/zenoh-pico/transport/transport.h @@ -47,6 +47,11 @@ typedef struct { uint16_t _peer_id; volatile bool _received; + +#if Z_FEATURE_FRAGMENTATION == 1 + // Patch + uint8_t _patch; +#endif } _z_transport_peer_entry_t; size_t _z_transport_peer_entry_size(const _z_transport_peer_entry_t *src); @@ -108,6 +113,11 @@ typedef struct { volatile bool _received; volatile bool _transmitted; + +#if Z_FEATURE_FRAGMENTATION == 1 + // Patch + uint8_t _patch; +#endif } _z_transport_unicast_t; typedef struct _z_transport_multicast_t { @@ -175,6 +185,9 @@ typedef struct { uint8_t _req_id_res; uint8_t _seq_num_res; bool _is_qos; +#if Z_FEATURE_FRAGMENTATION == 1 + uint8_t _patch; +#endif } _z_transport_unicast_establish_param_t; typedef struct { diff --git a/include/zenoh-pico/transport/utils.h b/include/zenoh-pico/transport/utils.h index 62fa319b4..25862370b 100644 --- a/include/zenoh-pico/transport/utils.h +++ b/include/zenoh-pico/transport/utils.h @@ -29,6 +29,7 @@ _z_zint_t _z_sn_max(uint8_t bits); _z_zint_t _z_sn_half(_z_zint_t sn); _z_zint_t _z_sn_modulo_mask(uint8_t bits); bool _z_sn_precedes(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right); +bool _z_sn_consecutive(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right); _z_zint_t _z_sn_increment(const _z_zint_t sn_resolution, const _z_zint_t sn); _z_zint_t _z_sn_decrement(const _z_zint_t sn_resolution, const _z_zint_t sn); diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index 7eff8715b..7cd5e0d0a 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -64,9 +64,14 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t } _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_next_sn._val._plain._reliable)); _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_next_sn._val._plain._best_effort)); +#if Z_FEATURE_FRAGMENTATION == 1 + bool has_patch = msg->_patch != _Z_NO_PATCH; +#else + bool has_patch = false; +#endif if (msg->_next_sn._is_qos) { - if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1)); + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_QOS | _Z_MSG_EXT_MORE(has_patch))); size_t len = 0; for (uint8_t i = 0; (i < Z_PRIORITIES_NUM) && (ret == _Z_RES_OK); i++) { len += _z_zint_len(msg->_next_sn._val._qos[i]._reliable) + @@ -82,6 +87,17 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; } } +#if Z_FEATURE_FRAGMENTATION == 1 + if (has_patch) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH)); + _Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch)); + } else { + _Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset"); + ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; + } + } +#endif return ret; } @@ -89,14 +105,17 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t z_result_t _z_join_decode_ext(_z_msg_ext_t *extension, void *ctx) { z_result_t ret = _Z_RES_OK; _z_t_msg_join_t *msg = (_z_t_msg_join_t *)ctx; - if (_Z_EXT_FULL_ID(extension->_header) == - (_Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1)) { // QOS: (enc=zbuf)(mandatory=true)(id=1) + if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_JOIN_QOS) { msg->_next_sn._is_qos = true; _z_zbuf_t zbf = _z_slice_as_zbuf(extension->_body._zbuf._val); for (int i = 0; (ret == _Z_RES_OK) && (i < Z_PRIORITIES_NUM); ++i) { ret |= _z_zsize_decode(&msg->_next_sn._val._qos[i]._reliable, &zbf); ret |= _z_zsize_decode(&msg->_next_sn._val._qos[i]._best_effort, &zbf); } +#if Z_FEATURE_FRAGMENTATION == 1 + } else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_JOIN_PATCH) { + msg->_patch = (uint8_t)extension->_body._zint._val; +#endif } else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) { ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN; } @@ -147,7 +166,8 @@ z_result_t _z_join_decode(_z_t_msg_join_t *msg, _z_zbuf_t *zbf, uint8_t header) ret |= _z_zsize_decode(&msg->_next_sn._val._plain._reliable, zbf); ret |= _z_zsize_decode(&msg->_next_sn._val._plain._best_effort, zbf); } - if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { + msg->_patch = _Z_NO_PATCH; + if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { ret |= _z_msg_ext_decode_iter(zbf, _z_join_decode_ext, msg); } @@ -180,6 +200,32 @@ z_result_t _z_init_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_init_t _Z_RETURN_IF_ERR(_z_slice_encode(wbf, &msg->_cookie)) } + #if Z_FEATURE_FRAGMENTATION == 1 + if (msg->_patch != _Z_CURRENT_PATCH) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH)); + _Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch)); + } else { + _Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset"); + ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; + } + } + #endif + + return ret; +} + +z_result_t _z_init_decode_ext(_z_msg_ext_t *extension, void *ctx) { + z_result_t ret = _Z_RES_OK; + _z_t_msg_init_t *msg = (_z_t_msg_init_t *)ctx; + if (false) { +#if Z_FEATURE_FRAGMENTATION == 1 + } else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_INIT_PATCH) { + msg->_patch = (uint8_t)extension->_body._zint._val; +#endif + } else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) { + ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN; + } return ret; } @@ -222,8 +268,8 @@ z_result_t _z_init_decode(_z_t_msg_init_t *msg, _z_zbuf_t *zbf, uint8_t header) msg->_cookie = _z_slice_empty(); } - if ((ret == _Z_RES_OK) && (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true)) { - ret |= _z_msg_ext_skip_non_mandatories(zbf, 0x01); + if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + ret |= _z_msg_ext_decode_iter(zbf, _z_init_decode_ext, msg); } return ret; diff --git a/src/protocol/definitions/transport.c b/src/protocol/definitions/transport.c index 2af77cb97..0ef58c6bf 100644 --- a/src/protocol/definitions/transport.c +++ b/src/protocol/definitions/transport.c @@ -102,6 +102,9 @@ _z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease, msg._body._join._batch_size = Z_BATCH_MULTICAST_SIZE; msg._body._join._next_sn = next_sn; msg._body._join._zid = zid; +#if Z_FEATURE_FRAGMENTATION == 1 + msg._body._join._patch = _Z_CURRENT_PATCH; +#endif if ((lease % 1000) == 0) { _Z_SET_FLAG(msg._header, _Z_FLAG_T_JOIN_T); @@ -112,7 +115,12 @@ _z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease, _Z_SET_FLAG(msg._header, _Z_FLAG_T_JOIN_S); } - if (next_sn._is_qos) { +#if Z_FEATURE_FRAGMENTATION == 1 + bool has_patch = msg._body._join._patch != _Z_NO_PATCH; +#else + bool has_patch = false; +#endif + if (next_sn._is_qos == true || has_patch == true) { _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); } @@ -131,6 +139,9 @@ _z_transport_message_t _z_t_msg_make_init_syn(z_whatami_t whatami, _z_id_t zid) msg._body._init._req_id_res = Z_REQ_RESOLUTION; msg._body._init._batch_size = Z_BATCH_UNICAST_SIZE; _z_slice_reset(&msg._body._init._cookie); +#if Z_FEATURE_FRAGMENTATION == 1 + msg._body._init._patch = _Z_CURRENT_PATCH; +#endif if ((msg._body._init._batch_size != _Z_DEFAULT_UNICAST_BATCH_SIZE) || (msg._body._init._seq_num_res != _Z_DEFAULT_RESOLUTION_SIZE) || @@ -138,6 +149,15 @@ _z_transport_message_t _z_t_msg_make_init_syn(z_whatami_t whatami, _z_id_t zid) _Z_SET_FLAG(msg._header, _Z_FLAG_T_INIT_S); } +#if Z_FEATURE_FRAGMENTATION == 1 + bool has_patch = msg._body._join._patch != _Z_NO_PATCH; +#else + bool has_patch = false; +#endif + if (has_patch == true) { + _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); + } + return msg; } @@ -153,6 +173,9 @@ _z_transport_message_t _z_t_msg_make_init_ack(z_whatami_t whatami, _z_id_t zid, msg._body._init._req_id_res = Z_REQ_RESOLUTION; msg._body._init._batch_size = Z_BATCH_UNICAST_SIZE; msg._body._init._cookie = cookie; +#if Z_FEATURE_FRAGMENTATION == 1 + msg._body._init._patch = _Z_CURRENT_PATCH; +#endif if ((msg._body._init._batch_size != _Z_DEFAULT_UNICAST_BATCH_SIZE) || (msg._body._init._seq_num_res != _Z_DEFAULT_RESOLUTION_SIZE) || @@ -160,6 +183,15 @@ _z_transport_message_t _z_t_msg_make_init_ack(z_whatami_t whatami, _z_id_t zid, _Z_SET_FLAG(msg._header, _Z_FLAG_T_INIT_S); } +#if Z_FEATURE_FRAGMENTATION == 1 + bool has_patch = msg._body._join._patch != _Z_NO_PATCH; +#else + bool has_patch = false; +#endif + if (has_patch == true) { + _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); + } + return msg; } @@ -247,11 +279,11 @@ _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t } /*------------------ Fragment Message ------------------*/ -_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last) { - return _z_t_msg_make_fragment(sn, _z_slice_empty(), reliability, is_last); +_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, bool start, bool stop) { + return _z_t_msg_make_fragment(sn, _z_slice_empty(), reliability, is_last, start, stop); } _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload, z_reliability_t reliability, - bool is_last) { + bool is_last, bool start, bool stop) { _z_transport_message_t msg; msg._header = _Z_MID_T_FRAGMENT; if (is_last == false) { @@ -263,12 +295,20 @@ _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload, msg._body._fragment._sn = sn; msg._body._fragment._payload = payload; + if (start == true || stop == true) { + _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); + } + msg._body._fragment.start = start; + msg._body._fragment.stop = stop; return msg; } void _z_t_msg_copy_fragment(_z_t_msg_fragment_t *clone, _z_t_msg_fragment_t *msg) { + clone->_payload = msg->_payload; _z_slice_copy(&clone->_payload, &msg->_payload); + clone->start = msg->start; + clone->stop = msg->stop; } void _z_t_msg_copy_join(_z_t_msg_join_t *clone, _z_t_msg_join_t *msg) { @@ -279,6 +319,7 @@ void _z_t_msg_copy_join(_z_t_msg_join_t *clone, _z_t_msg_join_t *msg) { clone->_req_id_res = msg->_req_id_res; clone->_batch_size = msg->_batch_size; clone->_next_sn = msg->_next_sn; + clone->_patch = msg->_patch; memcpy(clone->_zid.id, msg->_zid.id, 16); } diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index 75a837cbe..4f2d66ce0 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -135,7 +135,7 @@ z_result_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t return ret; } -z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn) { +z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, bool start) { z_result_t ret = _Z_RES_OK; // Assume first that this is not the final fragment @@ -144,7 +144,7 @@ z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z size_t w_pos = _z_wbuf_get_wpos(dst); // Mark the buffer for the writing operation _z_transport_message_t f_hdr = - _z_t_msg_make_fragment_header(sn, reliability == Z_RELIABILITY_RELIABLE, is_final); + _z_t_msg_make_fragment_header(sn, reliability == Z_RELIABILITY_RELIABLE, is_final, start, false); ret = _z_transport_message_encode(dst, &f_hdr); // Encode the frame header if (ret == _Z_RES_OK) { size_t space_left = _z_wbuf_space_left(dst); diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index d531c6199..78ec6fe31 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -188,9 +188,54 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, } entry->_received = true; - _z_wbuf_t *dbuf = _Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R) - ? &entry->_dbuf_reliable - : &entry->_dbuf_best_effort; // Select the right defragmentation buffer + _z_wbuf_t *dbuf; + // Check if the SN is correct and select the right defragmentation buffer + if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R) == true) { + // @TODO: amend once reliability is in place. For the time being only + // monotonic SNs are ensured + if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._frame._sn) == true) { + bool consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._fragment._sn); + entry->_sn_rx_sns._val._plain._reliable = t_msg->_body._frame._sn; + dbuf = &entry->_dbuf_reliable; + if (consecutive == false) { + _Z_DEBUG("Non-consecutive fragments received"); + _z_wbuf_reset(dbuf); + break; + } + } else { + _z_wbuf_clear(&entry->_dbuf_reliable); + _Z_INFO("Reliable message dropped because it is out of order"); + break; + } + } else { + if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, t_msg->_body._frame._sn) == true) { + bool consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, t_msg->_body._fragment._sn); + entry->_sn_rx_sns._val._plain._best_effort = t_msg->_body._frame._sn; + dbuf = &entry->_dbuf_best_effort; + if (consecutive == false) { + _Z_DEBUG("Non-consecutive fragments received"); + _z_wbuf_reset(dbuf); + break; + } + } else { + _z_wbuf_clear(&entry->_dbuf_best_effort); + _Z_INFO("Best effort message dropped because it is out of order"); + break; + } + } + // Handle fragment markers + if (_Z_PATCH_HAS_FRAGMENT_START_STOP(entry->_patch)) { + if (t_msg->_body._fragment.start == true) { + _z_wbuf_clear(dbuf); + } else if (_z_wbuf_len(dbuf) == 0) { + _Z_DEBUG("First fragment received without the start marker"); + break; + } + if (t_msg->_body._fragment.stop == true) { + _z_wbuf_clear(dbuf); + break; + } + } bool drop = false; if ((_z_wbuf_len(dbuf) + t_msg->_body._fragment._payload.len) > Z_FRAG_MAX_SIZE) { @@ -280,6 +325,7 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_conduit_sn_list_decrement(entry->_sn_res, &entry->_sn_rx_sns); #if Z_FEATURE_FRAGMENTATION == 1 + entry->_patch = t_msg->_body._join._patch < _Z_CURRENT_PATCH ? t_msg->_body._join._patch : _Z_CURRENT_PATCH; #if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 1 entry->_dbuf_reliable = _z_wbuf_make(0, true); entry->_dbuf_best_effort = _z_wbuf_make(0, true); diff --git a/src/transport/multicast/tx.c b/src/transport/multicast/tx.c index df5b3bcdc..bbb555edd 100644 --- a/src/transport/multicast/tx.c +++ b/src/transport/multicast/tx.c @@ -126,13 +126,12 @@ z_result_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t if (is_first == false) { // Get the fragment sequence number sn = __unsafe_z_multicast_get_sn(ztm, reliability); } - is_first = false; // Clear the buffer for serialization __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); // Serialize one fragment - ret = __unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn); + ret = __unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn, is_first); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); @@ -144,6 +143,7 @@ z_result_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t } else { _Z_ERROR("Fragment serialization failed with err %d", ret); } + is_first = false; } } // Clear the buffer as it's no longer required diff --git a/src/transport/peer_entry.c b/src/transport/peer_entry.c index 7d72604e3..b363b2eab 100644 --- a/src/transport/peer_entry.c +++ b/src/transport/peer_entry.c @@ -30,6 +30,8 @@ void _z_transport_peer_entry_copy(_z_transport_peer_entry_t *dst, const _z_trans #if Z_FEATURE_FRAGMENTATION == 1 _z_wbuf_copy(&dst->_dbuf_reliable, &src->_dbuf_reliable); _z_wbuf_copy(&dst->_dbuf_best_effort, &src->_dbuf_best_effort); + + dst->_patch = src->_patch; #endif dst->_sn_res = src->_sn_res; diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index b4ddc5def..538c52e1a 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -292,13 +292,12 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_ // Get the fragment sequence number sn = __unsafe_z_raweth_get_sn(ztm, reliability); } - is_first = false; // Reset wbuf _z_wbuf_reset(&ztm->_wbuf); // Prepare buff __unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf); // Serialize one fragment - _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn), + _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn, is_first), _zp_raweth_unlock_tx_mutex(ztm)); // Write the eth header _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf), @@ -307,6 +306,7 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_ _Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm)); // Mark the session that we have transmitted data ztm->_transmitted = true; + is_first = false; } // Clear the expandable buffer _z_wbuf_clear(&fbf); diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index 23c6859b9..8e6540cdc 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -141,10 +141,54 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t case _Z_MID_T_FRAGMENT: { _Z_INFO("Received Z_FRAGMENT message"); #if Z_FEATURE_FRAGMENTATION == 1 - _z_wbuf_t *dbuf = _Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R) - ? &ztu->_dbuf_reliable - : &ztu->_dbuf_best_effort; // Select the right defragmentation buffer - + _z_wbuf_t *dbuf; + // Check if the SN is correct and select the right defragmentation buffer + if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R) == true) { + // @TODO: amend once reliability is in place. For the time being only + // monotonic SNs are ensured + if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._frame._sn) == true) { + bool consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn); + ztu->_sn_rx_reliable = t_msg->_body._frame._sn; + dbuf = &ztu->_dbuf_reliable; + if (consecutive == false) { + _Z_DEBUG("Non-consecutive fragments received"); + _z_wbuf_reset(dbuf); + break; + } + } else { + _z_wbuf_clear(&ztu->_dbuf_reliable); + _Z_INFO("Reliable message dropped because it is out of order"); + break; + } + } else { + if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._frame._sn) == true) { + bool consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn); + ztu->_sn_rx_best_effort = t_msg->_body._frame._sn; + dbuf = &ztu->_dbuf_best_effort; + if (consecutive == false) { + _Z_DEBUG("Non-consecutive fragments received"); + _z_wbuf_reset(dbuf); + break; + } + } else { + _z_wbuf_clear(&ztu->_dbuf_best_effort); + _Z_INFO("Best effort message dropped because it is out of order"); + break; + } + } + // Handle fragment markers + if (_Z_PATCH_HAS_FRAGMENT_START_STOP(ztu->_patch)) { + if (t_msg->_body._fragment.start == true) { + _z_wbuf_clear(dbuf); + } else if (_z_wbuf_len(dbuf) == 0) { + _Z_DEBUG("First fragment received without the start marker"); + break; + } + if (t_msg->_body._fragment.stop == true) { + _z_wbuf_clear(dbuf); + break; + } + } bool drop = false; if ((_z_wbuf_len(dbuf) + t_msg->_body._fragment._payload.len) > Z_FRAG_MAX_SIZE) { // Filling the wbuf capacity as a way to signal the last fragment to reset the dbuf diff --git a/src/transport/unicast/transport.c b/src/transport/unicast/transport.c index 01c19ce91..c7ef12aca 100644 --- a/src/transport/unicast/transport.c +++ b/src/transport/unicast/transport.c @@ -36,6 +36,10 @@ z_result_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, z_result_t ret = _Z_RES_OK; zt->_type = _Z_TRANSPORT_UNICAST_TYPE; +#if Z_FEATURE_FRAGMENTATION == 1 + // Patch + zt->_transport._unicast._patch = param->_patch; +#endif #if Z_FEATURE_MULTI_THREAD == 1 // Initialize the mutexes @@ -199,6 +203,13 @@ z_result_t _z_unicast_open_client(_z_transport_unicast_establish_param_t *param, } else { ret = _Z_ERR_TRANSPORT_OPEN_SN_RESOLUTION; } +#if Z_FEATURE_FRAGMENTATION == 1 + if (iam._body._init._patch > ism._body._init._patch) { + // TODO: Use a better error code? + ret = _Z_ERR_GENERIC; + } + param->_patch = iam._body._init._patch; +#endif if (ret == _Z_RES_OK) { param->_key_id_res = 0x08 << param->_key_id_res; diff --git a/src/transport/unicast/tx.c b/src/transport/unicast/tx.c index 5ed555794..561c232c7 100644 --- a/src/transport/unicast/tx.c +++ b/src/transport/unicast/tx.c @@ -135,13 +135,12 @@ z_result_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n if (is_first == false) { // Get the fragment sequence number sn = __unsafe_z_unicast_get_sn(ztu, reliability); } - is_first = false; // Clear the buffer for serialization __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); // Serialize one fragment - ret = __unsafe_z_serialize_zenoh_fragment(&ztu->_wbuf, &fbf, reliability, sn); + ret = __unsafe_z_serialize_zenoh_fragment(&ztu->_wbuf, &fbf, reliability, sn, is_first); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); @@ -153,6 +152,7 @@ z_result_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n } else { _Z_ERROR("Fragment serialization failed with err %d", ret); } + is_first = false; } } diff --git a/src/transport/utils.c b/src/transport/utils.c index 510a0f19c..dc0697043 100644 --- a/src/transport/utils.c +++ b/src/transport/utils.c @@ -82,6 +82,11 @@ bool _z_sn_precedes(const _z_zint_t sn_resolution, const _z_zint_t sn_left, cons return ((distance <= _z_sn_half(sn_resolution)) && (distance != 0)); } +bool _z_sn_consecutive(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right) { + _z_zint_t distance = (sn_right - sn_left) & sn_resolution; + return distance == 1; +} + _z_zint_t _z_sn_increment(const _z_zint_t sn_resolution, const _z_zint_t sn) { _z_zint_t ret = sn + 1; return (ret &= sn_resolution); From 6837602aba15c8ed20ab3a2ba3e3d05b27876be8 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 08:21:53 +0100 Subject: [PATCH 02/32] fix: fix test --- tests/z_msgcodec_test.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/z_msgcodec_test.c b/tests/z_msgcodec_test.c index d4bbdd2c4..1c227d194 100644 --- a/tests/z_msgcodec_test.c +++ b/tests/z_msgcodec_test.c @@ -1825,7 +1825,7 @@ void frame_message(void) { } _z_transport_message_t gen_fragment(void) { - return _z_t_msg_make_fragment(gen_uint32(), gen_slice(gen_uint8()), gen_bool(), gen_bool()); + return _z_t_msg_make_fragment(gen_uint32(), gen_slice(gen_uint8()), gen_bool(), gen_bool(), gen_bool(), gen_bool()); } void assert_eq_fragment(const _z_t_msg_fragment_t *left, const _z_t_msg_fragment_t *right) { assert(left->_sn == right->_sn); From ad941900cac2c4fa63421ff472b1057760c12097 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 08:27:32 +0100 Subject: [PATCH 03/32] fix: typo --- src/protocol/definitions/transport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/definitions/transport.c b/src/protocol/definitions/transport.c index 0ef58c6bf..08bb6e9d3 100644 --- a/src/protocol/definitions/transport.c +++ b/src/protocol/definitions/transport.c @@ -184,7 +184,7 @@ _z_transport_message_t _z_t_msg_make_init_ack(z_whatami_t whatami, _z_id_t zid, } #if Z_FEATURE_FRAGMENTATION == 1 - bool has_patch = msg._body._join._patch != _Z_NO_PATCH; + bool has_patch = msg._body._init._patch != _Z_NO_PATCH; #else bool has_patch = false; #endif From c5f50aca2bda1c93c6b75c5574c1743bd3173533 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 08:45:22 +0100 Subject: [PATCH 04/32] fix: format --- .../protocol/definitions/transport.h | 3 ++- include/zenoh-pico/transport/common/tx.h | 3 ++- src/protocol/codec/transport.c | 20 +++++++++---------- src/protocol/definitions/transport.c | 3 ++- src/transport/common/tx.c | 3 ++- src/transport/multicast/rx.c | 15 +++++++++----- src/transport/unicast/rx.c | 6 ++++-- 7 files changed, 32 insertions(+), 21 deletions(-) diff --git a/include/zenoh-pico/protocol/definitions/transport.h b/include/zenoh-pico/protocol/definitions/transport.h index d5ef7d295..346c6eda9 100644 --- a/include/zenoh-pico/protocol/definitions/transport.h +++ b/include/zenoh-pico/protocol/definitions/transport.h @@ -530,7 +530,8 @@ _z_transport_message_t _z_t_msg_make_keep_alive(void); _z_transport_message_t _z_t_msg_make_frame(_z_zint_t sn, _z_network_message_vec_t messages, z_reliability_t reliability); _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t reliability); -_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, bool start, bool stop); +_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, + bool start, bool stop); _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t messages, z_reliability_t reliability, bool is_last, bool start, bool stop); diff --git a/include/zenoh-pico/transport/common/tx.h b/include/zenoh-pico/transport/common/tx.h index a77fd1a28..9f344b1e6 100644 --- a/include/zenoh-pico/transport/common/tx.h +++ b/include/zenoh-pico/transport/common/tx.h @@ -27,7 +27,8 @@ void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); /*This function is unsafe because it operates in potentially concurrent data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */ -z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, bool start); +z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, + bool start); /*------------------ Transmission and Reception helpers ------------------*/ z_result_t _z_send_t_msg(_z_transport_t *zt, const _z_transport_message_t *t_msg); diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index 7cd5e0d0a..575e2f356 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -200,17 +200,17 @@ z_result_t _z_init_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_init_t _Z_RETURN_IF_ERR(_z_slice_encode(wbf, &msg->_cookie)) } - #if Z_FEATURE_FRAGMENTATION == 1 - if (msg->_patch != _Z_CURRENT_PATCH) { - if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH)); - _Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch)); - } else { - _Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset"); - ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; - } +#if Z_FEATURE_FRAGMENTATION == 1 + if (msg->_patch != _Z_CURRENT_PATCH) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH)); + _Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch)); + } else { + _Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset"); + ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; } - #endif + } +#endif return ret; } diff --git a/src/protocol/definitions/transport.c b/src/protocol/definitions/transport.c index 08bb6e9d3..fc458a755 100644 --- a/src/protocol/definitions/transport.c +++ b/src/protocol/definitions/transport.c @@ -279,7 +279,8 @@ _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t } /*------------------ Fragment Message ------------------*/ -_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, bool start, bool stop) { +_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, + bool start, bool stop) { return _z_t_msg_make_fragment(sn, _z_slice_empty(), reliability, is_last, start, stop); } _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload, z_reliability_t reliability, diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index 4f2d66ce0..c333fdb12 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -135,7 +135,8 @@ z_result_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t return ret; } -z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, bool start) { +z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, + bool start) { z_result_t ret = _Z_RES_OK; // Assume first that this is not the final fragment diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index 78ec6fe31..7b3f0ace7 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -193,8 +193,10 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R) == true) { // @TODO: amend once reliability is in place. For the time being only // monotonic SNs are ensured - if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._frame._sn) == true) { - bool consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._fragment._sn); + if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._frame._sn) == + true) { + bool consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, + t_msg->_body._fragment._sn); entry->_sn_rx_sns._val._plain._reliable = t_msg->_body._frame._sn; dbuf = &entry->_dbuf_reliable; if (consecutive == false) { @@ -208,8 +210,10 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, break; } } else { - if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, t_msg->_body._frame._sn) == true) { - bool consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, t_msg->_body._fragment._sn); + if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, + t_msg->_body._frame._sn) == true) { + bool consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, + t_msg->_body._fragment._sn); entry->_sn_rx_sns._val._plain._best_effort = t_msg->_body._frame._sn; dbuf = &entry->_dbuf_best_effort; if (consecutive == false) { @@ -325,7 +329,8 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_conduit_sn_list_decrement(entry->_sn_res, &entry->_sn_rx_sns); #if Z_FEATURE_FRAGMENTATION == 1 - entry->_patch = t_msg->_body._join._patch < _Z_CURRENT_PATCH ? t_msg->_body._join._patch : _Z_CURRENT_PATCH; + entry->_patch = + t_msg->_body._join._patch < _Z_CURRENT_PATCH ? t_msg->_body._join._patch : _Z_CURRENT_PATCH; #if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 1 entry->_dbuf_reliable = _z_wbuf_make(0, true); entry->_dbuf_best_effort = _z_wbuf_make(0, true); diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index 8e6540cdc..b5b0e8bc3 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -147,7 +147,8 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t // @TODO: amend once reliability is in place. For the time being only // monotonic SNs are ensured if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._frame._sn) == true) { - bool consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn); + bool consecutive = + _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn); ztu->_sn_rx_reliable = t_msg->_body._frame._sn; dbuf = &ztu->_dbuf_reliable; if (consecutive == false) { @@ -162,7 +163,8 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t } } else { if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._frame._sn) == true) { - bool consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn); + bool consecutive = + _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn); ztu->_sn_rx_best_effort = t_msg->_body._frame._sn; dbuf = &ztu->_dbuf_best_effort; if (consecutive == false) { From 3736054bc349d43add513a7d8ba09744e0054645 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 09:57:01 +0100 Subject: [PATCH 05/32] fix: fragment ext encoding --- src/protocol/codec/transport.c | 38 +++++++++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index 575e2f356..758b2dc8f 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -201,7 +201,7 @@ z_result_t _z_init_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_init_t } #if Z_FEATURE_FRAGMENTATION == 1 - if (msg->_patch != _Z_CURRENT_PATCH) { + if (msg->_patch != _Z_NO_PATCH) { if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH)); _Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch)); @@ -434,16 +434,42 @@ z_result_t _z_fragment_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_fra z_result_t ret = _Z_RES_OK; _Z_DEBUG("Encoding _Z_TRANSPORT_FRAGMENT"); _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_sn)) - if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { - ret = _Z_ERR_MESSAGE_SERIALIZATION_FAILED; + if (msg->start == true) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_FRAGMENT_START | _Z_MSG_EXT_MORE(msg->stop))); + } else { + _Z_DEBUG("Attempted to serialize Start extension, but the header extension flag was unset"); + ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; + } + } + if (msg->stop == true) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_FRAGMENT_STOP)); + } else { + _Z_DEBUG("Attempted to serialize Stop extension, but the header extension flag was unset"); + ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; + } } - if (ret == _Z_RES_OK && _z_slice_check(&msg->_payload)) { + if (_z_slice_check(&msg->_payload)) { _Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, msg->_payload.start, 0, msg->_payload.len)); } return ret; } +z_result_t _z_fragment_decode_ext(_z_msg_ext_t *extension, void *ctx) { + z_result_t ret = _Z_RES_OK; + _z_t_msg_fragment_t *msg = (_z_t_msg_fragment_t *)ctx; + if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_FRAGMENT_START) { + msg->start = true; + } else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_FRAGMENT_STOP) { + msg->stop = true; + } else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) { + ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN; + } + return ret; +} + z_result_t _z_fragment_decode(_z_t_msg_fragment_t *msg, _z_zbuf_t *zbf, uint8_t header) { z_result_t ret = _Z_RES_OK; *msg = (_z_t_msg_fragment_t){0}; @@ -451,8 +477,10 @@ z_result_t _z_fragment_decode(_z_t_msg_fragment_t *msg, _z_zbuf_t *zbf, uint8_t _Z_DEBUG("Decoding _Z_TRANSPORT_FRAGMENT"); ret |= _z_zsize_decode(&msg->_sn, zbf); + msg->start = false; + msg->stop = false; if ((ret == _Z_RES_OK) && (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true)) { - ret |= _z_msg_ext_skip_non_mandatories(zbf, 0x05); + ret |= _z_msg_ext_decode_iter(zbf, _z_fragment_decode_ext, msg); } _z_slice_t slice = _z_slice_alias_buf((uint8_t *)_z_zbuf_start(zbf), _z_zbuf_len(zbf)); From 218019a4ed1acf4e4f1d45c50d5dcff3f5752e42 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 10:01:57 +0100 Subject: [PATCH 06/32] fix: rename start/stop marker to first/drop --- .../protocol/definitions/transport.h | 10 +++++----- include/zenoh-pico/protocol/ext.h | 4 ++-- include/zenoh-pico/transport/common/tx.h | 2 +- src/protocol/codec/transport.c | 20 +++++++++---------- src/protocol/definitions/transport.c | 16 +++++++-------- src/transport/common/tx.c | 4 ++-- src/transport/multicast/rx.c | 8 ++++---- src/transport/unicast/rx.c | 6 +++--- 8 files changed, 35 insertions(+), 35 deletions(-) diff --git a/include/zenoh-pico/protocol/definitions/transport.h b/include/zenoh-pico/protocol/definitions/transport.h index 346c6eda9..1320de7d3 100644 --- a/include/zenoh-pico/protocol/definitions/transport.h +++ b/include/zenoh-pico/protocol/definitions/transport.h @@ -99,7 +99,7 @@ extern "C" { /// if >= 1, then fragmentation start/stop marker #define _Z_NO_PATCH 0x00 #define _Z_CURRENT_PATCH 0x01 -#define _Z_PATCH_HAS_FRAGMENT_START_STOP(patch) (patch >= 1) +#define _Z_PATCH_HAS_FRAGMENT_MARKERS(patch) (patch >= 1) /*=============================*/ /* Transport Messages */ @@ -494,8 +494,8 @@ void _z_t_msg_frame_clear(_z_t_msg_frame_t *msg); typedef struct { _z_slice_t _payload; _z_zint_t _sn; - bool start; - bool stop; + bool first; + bool drop; } _z_t_msg_fragment_t; void _z_t_msg_fragment_clear(_z_t_msg_fragment_t *msg); @@ -531,9 +531,9 @@ _z_transport_message_t _z_t_msg_make_frame(_z_zint_t sn, _z_network_message_vec_ z_reliability_t reliability); _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t reliability); _z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, - bool start, bool stop); + bool first, bool drop); _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t messages, z_reliability_t reliability, - bool is_last, bool start, bool stop); + bool is_last, bool first, bool drop); /*------------------ Copy ------------------*/ void _z_t_msg_copy(_z_transport_message_t *clone, _z_transport_message_t *msg); diff --git a/include/zenoh-pico/protocol/ext.h b/include/zenoh-pico/protocol/ext.h index a8f961ee4..a8afd0029 100644 --- a/include/zenoh-pico/protocol/ext.h +++ b/include/zenoh-pico/protocol/ext.h @@ -46,8 +46,8 @@ extern "C" { #define _Z_MSG_EXT_ID_JOIN_QOS (0x01 | _Z_MSG_EXT_FLAG_M | _Z_MSG_EXT_ENC_ZBUF) #define _Z_MSG_EXT_ID_JOIN_PATCH (0x07 | _Z_MSG_EXT_ENC_ZINT) #define _Z_MSG_EXT_ID_INIT_PATCH (0x07 | _Z_MSG_EXT_ENC_ZINT) -#define _Z_MSG_EXT_ID_FRAGMENT_START (0x02 | _Z_MSG_EXT_ENC_UNIT) -#define _Z_MSG_EXT_ID_FRAGMENT_STOP (0x03 | _Z_MSG_EXT_ENC_UNIT) +#define _Z_MSG_EXT_ID_FRAGMENT_FIRST (0x02 | _Z_MSG_EXT_ENC_UNIT) +#define _Z_MSG_EXT_ID_FRAGMENT_DROP (0x03 | _Z_MSG_EXT_ENC_UNIT) /*=============================*/ /* Extension Encodings */ diff --git a/include/zenoh-pico/transport/common/tx.h b/include/zenoh-pico/transport/common/tx.h index 9f344b1e6..e70c8fc9e 100644 --- a/include/zenoh-pico/transport/common/tx.h +++ b/include/zenoh-pico/transport/common/tx.h @@ -28,7 +28,7 @@ void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); /*This function is unsafe because it operates in potentially concurrent data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */ z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, - bool start); + bool first); /*------------------ Transmission and Reception helpers ------------------*/ z_result_t _z_send_t_msg(_z_transport_t *zt, const _z_transport_message_t *t_msg); diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index 758b2dc8f..75f400765 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -434,17 +434,17 @@ z_result_t _z_fragment_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_fra z_result_t ret = _Z_RES_OK; _Z_DEBUG("Encoding _Z_TRANSPORT_FRAGMENT"); _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_sn)) - if (msg->start == true) { + if (msg->first == true) { if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_FRAGMENT_START | _Z_MSG_EXT_MORE(msg->stop))); + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_FRAGMENT_FIRST | _Z_MSG_EXT_MORE(msg->drop))); } else { _Z_DEBUG("Attempted to serialize Start extension, but the header extension flag was unset"); ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; } } - if (msg->stop == true) { + if (msg->drop == true) { if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_FRAGMENT_STOP)); + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_FRAGMENT_DROP)); } else { _Z_DEBUG("Attempted to serialize Stop extension, but the header extension flag was unset"); ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; @@ -460,10 +460,10 @@ z_result_t _z_fragment_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_fra z_result_t _z_fragment_decode_ext(_z_msg_ext_t *extension, void *ctx) { z_result_t ret = _Z_RES_OK; _z_t_msg_fragment_t *msg = (_z_t_msg_fragment_t *)ctx; - if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_FRAGMENT_START) { - msg->start = true; - } else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_FRAGMENT_STOP) { - msg->stop = true; + if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_FRAGMENT_FIRST) { + msg->first = true; + } else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_FRAGMENT_DROP) { + msg->drop = true; } else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) { ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN; } @@ -477,8 +477,8 @@ z_result_t _z_fragment_decode(_z_t_msg_fragment_t *msg, _z_zbuf_t *zbf, uint8_t _Z_DEBUG("Decoding _Z_TRANSPORT_FRAGMENT"); ret |= _z_zsize_decode(&msg->_sn, zbf); - msg->start = false; - msg->stop = false; + msg->first = false; + msg->drop = false; if ((ret == _Z_RES_OK) && (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true)) { ret |= _z_msg_ext_decode_iter(zbf, _z_fragment_decode_ext, msg); } diff --git a/src/protocol/definitions/transport.c b/src/protocol/definitions/transport.c index fc458a755..aae4fc6c2 100644 --- a/src/protocol/definitions/transport.c +++ b/src/protocol/definitions/transport.c @@ -280,11 +280,11 @@ _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t /*------------------ Fragment Message ------------------*/ _z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, - bool start, bool stop) { - return _z_t_msg_make_fragment(sn, _z_slice_empty(), reliability, is_last, start, stop); + bool first, bool drop) { + return _z_t_msg_make_fragment(sn, _z_slice_empty(), reliability, is_last, first, drop); } _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload, z_reliability_t reliability, - bool is_last, bool start, bool stop) { + bool is_last, bool first, bool drop) { _z_transport_message_t msg; msg._header = _Z_MID_T_FRAGMENT; if (is_last == false) { @@ -296,11 +296,11 @@ _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload, msg._body._fragment._sn = sn; msg._body._fragment._payload = payload; - if (start == true || stop == true) { + if (first == true || drop == true) { _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); } - msg._body._fragment.start = start; - msg._body._fragment.stop = stop; + msg._body._fragment.first = first; + msg._body._fragment.drop = drop; return msg; } @@ -308,8 +308,8 @@ _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload, void _z_t_msg_copy_fragment(_z_t_msg_fragment_t *clone, _z_t_msg_fragment_t *msg) { clone->_payload = msg->_payload; _z_slice_copy(&clone->_payload, &msg->_payload); - clone->start = msg->start; - clone->stop = msg->stop; + clone->first = msg->first; + clone->drop = msg->drop; } void _z_t_msg_copy_join(_z_t_msg_join_t *clone, _z_t_msg_join_t *msg) { diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index c333fdb12..6ab0af73e 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -136,7 +136,7 @@ z_result_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t } z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, - bool start) { + bool first) { z_result_t ret = _Z_RES_OK; // Assume first that this is not the final fragment @@ -145,7 +145,7 @@ z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z size_t w_pos = _z_wbuf_get_wpos(dst); // Mark the buffer for the writing operation _z_transport_message_t f_hdr = - _z_t_msg_make_fragment_header(sn, reliability == Z_RELIABILITY_RELIABLE, is_final, start, false); + _z_t_msg_make_fragment_header(sn, reliability == Z_RELIABILITY_RELIABLE, is_final, first, false); ret = _z_transport_message_encode(dst, &f_hdr); // Encode the frame header if (ret == _Z_RES_OK) { size_t space_left = _z_wbuf_space_left(dst); diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index 7b3f0ace7..65f2c67f6 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -228,14 +228,14 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, } } // Handle fragment markers - if (_Z_PATCH_HAS_FRAGMENT_START_STOP(entry->_patch)) { - if (t_msg->_body._fragment.start == true) { + if (_Z_PATCH_HAS_FRAGMENT_MARKERS(entry->_patch)) { + if (t_msg->_body._fragment.first == true) { _z_wbuf_clear(dbuf); } else if (_z_wbuf_len(dbuf) == 0) { - _Z_DEBUG("First fragment received without the start marker"); + _Z_DEBUG("First fragment received without the first marker"); break; } - if (t_msg->_body._fragment.stop == true) { + if (t_msg->_body._fragment.drop == true) { _z_wbuf_clear(dbuf); break; } diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index b5b0e8bc3..dc5203478 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -179,14 +179,14 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t } } // Handle fragment markers - if (_Z_PATCH_HAS_FRAGMENT_START_STOP(ztu->_patch)) { - if (t_msg->_body._fragment.start == true) { + if (_Z_PATCH_HAS_FRAGMENT_MARKERS(ztu->_patch)) { + if (t_msg->_body._fragment.first == true) { _z_wbuf_clear(dbuf); } else if (_z_wbuf_len(dbuf) == 0) { _Z_DEBUG("First fragment received without the start marker"); break; } - if (t_msg->_body._fragment.stop == true) { + if (t_msg->_body._fragment.drop == true) { _z_wbuf_clear(dbuf); break; } From 0c5f62b3648b04e880ff62e08d454ce3e0f58037 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 10:17:38 +0100 Subject: [PATCH 07/32] Update src/protocol/codec/transport.c Co-authored-by: Alexander Bushnev --- src/protocol/codec/transport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index 75f400765..20c99b2cd 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -70,7 +70,7 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t bool has_patch = false; #endif if (msg->_next_sn._is_qos) { - if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_QOS | _Z_MSG_EXT_MORE(has_patch))); size_t len = 0; for (uint8_t i = 0; (i < Z_PRIORITIES_NUM) && (ret == _Z_RES_OK); i++) { From 0f2a1393c69e3efa0c1ca97adc82787a0d93fab3 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 11:07:18 +0100 Subject: [PATCH 08/32] Update src/protocol/definitions/transport.c Co-authored-by: Alexander Bushnev --- src/protocol/definitions/transport.c | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/protocol/definitions/transport.c b/src/protocol/definitions/transport.c index aae4fc6c2..f98765d6a 100644 --- a/src/protocol/definitions/transport.c +++ b/src/protocol/definitions/transport.c @@ -184,14 +184,10 @@ _z_transport_message_t _z_t_msg_make_init_ack(z_whatami_t whatami, _z_id_t zid, } #if Z_FEATURE_FRAGMENTATION == 1 - bool has_patch = msg._body._init._patch != _Z_NO_PATCH; -#else - bool has_patch = false; -#endif - if (has_patch == true) { + if (msg._body._init._patch != _Z_NO_PATCH) { _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); } - +#endif return msg; } From c93e6941844b0bcbeaa41347ad0fe403f63fda14 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 11:07:24 +0100 Subject: [PATCH 09/32] Update src/protocol/definitions/transport.c Co-authored-by: Alexander Bushnev --- src/protocol/definitions/transport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/definitions/transport.c b/src/protocol/definitions/transport.c index f98765d6a..731bf7159 100644 --- a/src/protocol/definitions/transport.c +++ b/src/protocol/definitions/transport.c @@ -154,7 +154,7 @@ _z_transport_message_t _z_t_msg_make_init_syn(z_whatami_t whatami, _z_id_t zid) #else bool has_patch = false; #endif - if (has_patch == true) { + if (has_patch) { _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); } From 8c5551155f6ce65b247648ca7ad3a7b8c5776906 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 11:07:30 +0100 Subject: [PATCH 10/32] Update src/protocol/codec/transport.c Co-authored-by: Alexander Bushnev --- src/protocol/codec/transport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index 20c99b2cd..d1609bcd2 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -268,7 +268,7 @@ z_result_t _z_init_decode(_z_t_msg_init_t *msg, _z_zbuf_t *zbf, uint8_t header) msg->_cookie = _z_slice_empty(); } - if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { ret |= _z_msg_ext_decode_iter(zbf, _z_init_decode_ext, msg); } From e997fc468c9ab960d63edfeabea5e55dfd1268a8 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 11:07:37 +0100 Subject: [PATCH 11/32] Update src/transport/unicast/rx.c Co-authored-by: Alexander Bushnev --- src/transport/unicast/rx.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index dc5203478..aa26b121d 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -146,7 +146,7 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R) == true) { // @TODO: amend once reliability is in place. For the time being only // monotonic SNs are ensured - if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._frame._sn) == true) { + if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._frame._sn)) { bool consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn); ztu->_sn_rx_reliable = t_msg->_body._frame._sn; From b42dca26c2ff992055ce53bb237a0c1a7f721141 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 11:07:43 +0100 Subject: [PATCH 12/32] Update src/transport/unicast/rx.c Co-authored-by: Alexander Bushnev --- src/transport/unicast/rx.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index aa26b121d..1139ab67f 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -162,7 +162,7 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t break; } } else { - if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._frame._sn) == true) { + if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._frame._sn)) { bool consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn); ztu->_sn_rx_best_effort = t_msg->_body._frame._sn; From 223d085dc3c57592b4359add1c8752ba028be8d1 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 11:08:00 +0100 Subject: [PATCH 13/32] Update src/transport/multicast/rx.c Co-authored-by: Alexander Bushnev --- src/transport/multicast/rx.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index 65f2c67f6..48cc951bb 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -190,7 +190,7 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_wbuf_t *dbuf; // Check if the SN is correct and select the right defragmentation buffer - if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R) == true) { + if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R)) { // @TODO: amend once reliability is in place. For the time being only // monotonic SNs are ensured if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._frame._sn) == From a05524dfd3de2d243e1509dfc5c8d98dd9e56728 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 11:08:25 +0100 Subject: [PATCH 14/32] Update src/protocol/definitions/transport.c Co-authored-by: Alexander Bushnev --- src/protocol/definitions/transport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/definitions/transport.c b/src/protocol/definitions/transport.c index 731bf7159..6332cff48 100644 --- a/src/protocol/definitions/transport.c +++ b/src/protocol/definitions/transport.c @@ -120,7 +120,7 @@ _z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease, #else bool has_patch = false; #endif - if (next_sn._is_qos == true || has_patch == true) { + if (next_sn._is_qos || has_patch) { _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); } From 107ac1a9cee73d25587361aa07274c0c2268c670 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 11:10:00 +0100 Subject: [PATCH 15/32] Update src/protocol/definitions/transport.c Co-authored-by: Alexander Bushnev From bd89a7662783912c890f81a73e4aa8565adaed52 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 11:12:47 +0100 Subject: [PATCH 16/32] Update src/protocol/codec/transport.c Co-authored-by: Alexander Bushnev --- src/protocol/codec/transport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index d1609bcd2..096149cd2 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -202,7 +202,7 @@ z_result_t _z_init_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_init_t #if Z_FEATURE_FRAGMENTATION == 1 if (msg->_patch != _Z_NO_PATCH) { - if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH)); _Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch)); } else { From 645acc467a289e9491f1141d25100036e2b16b28 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 11:12:52 +0100 Subject: [PATCH 17/32] Update src/protocol/codec/transport.c Co-authored-by: Alexander Bushnev --- src/protocol/codec/transport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index 096149cd2..1c132861b 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -167,7 +167,7 @@ z_result_t _z_join_decode(_z_t_msg_join_t *msg, _z_zbuf_t *zbf, uint8_t header) ret |= _z_zsize_decode(&msg->_next_sn._val._plain._best_effort, zbf); } msg->_patch = _Z_NO_PATCH; - if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { ret |= _z_msg_ext_decode_iter(zbf, _z_join_decode_ext, msg); } From 002a0e1b23fecfa9b566afdea8d79cf19ba78535 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 11:12:58 +0100 Subject: [PATCH 18/32] Update src/protocol/codec/transport.c Co-authored-by: Alexander Bushnev --- src/protocol/codec/transport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index 1c132861b..62a15d6f0 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -89,7 +89,7 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t } #if Z_FEATURE_FRAGMENTATION == 1 if (has_patch) { - if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH)); _Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch)); } else { From 47d15e1a6ad7b1d1bad1a5ef8f0225dd3b99c02b Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 11:13:12 +0100 Subject: [PATCH 19/32] Update src/transport/multicast/rx.c Co-authored-by: Alexander Bushnev --- src/transport/multicast/rx.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index 48cc951bb..dd1f9257e 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -211,7 +211,7 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, } } else { if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, - t_msg->_body._frame._sn) == true) { + t_msg->_body._frame._sn)) { bool consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, t_msg->_body._fragment._sn); entry->_sn_rx_sns._val._plain._best_effort = t_msg->_body._frame._sn; From 524b3dfbda094256de799b2148dc7ba1fbc7998c Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 11:13:25 +0100 Subject: [PATCH 20/32] Update src/transport/multicast/rx.c Co-authored-by: Alexander Bushnev --- src/transport/multicast/rx.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index dd1f9257e..aa64087ee 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -199,7 +199,7 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, t_msg->_body._fragment._sn); entry->_sn_rx_sns._val._plain._reliable = t_msg->_body._frame._sn; dbuf = &entry->_dbuf_reliable; - if (consecutive == false) { + if (!consecutive) { _Z_DEBUG("Non-consecutive fragments received"); _z_wbuf_reset(dbuf); break; From daa56a0952ebdcee8fd64c5fbf4ebc09939fe326 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 11:13:51 +0100 Subject: [PATCH 21/32] Update src/transport/multicast/rx.c Co-authored-by: Alexander Bushnev --- src/transport/multicast/rx.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index aa64087ee..0e03a1e03 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -216,7 +216,7 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, t_msg->_body._fragment._sn); entry->_sn_rx_sns._val._plain._best_effort = t_msg->_body._frame._sn; dbuf = &entry->_dbuf_best_effort; - if (consecutive == false) { + if (!consecutive) { _Z_DEBUG("Non-consecutive fragments received"); _z_wbuf_reset(dbuf); break; From 1ba505b17e6a4f08251f70d2bd459e69378355f5 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 11:14:23 +0100 Subject: [PATCH 22/32] Update src/transport/unicast/rx.c Co-authored-by: Alexander Bushnev --- src/transport/unicast/rx.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index 1139ab67f..e3aaba59f 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -143,7 +143,7 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t #if Z_FEATURE_FRAGMENTATION == 1 _z_wbuf_t *dbuf; // Check if the SN is correct and select the right defragmentation buffer - if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R) == true) { + if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R)) { // @TODO: amend once reliability is in place. For the time being only // monotonic SNs are ensured if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._frame._sn)) { From 1b681bb03689bee095494bedfa25d4dc2000b450 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 11:14:38 +0100 Subject: [PATCH 23/32] Update src/transport/unicast/rx.c Co-authored-by: Alexander Bushnev --- src/transport/unicast/rx.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index e3aaba59f..b0d82abf9 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -151,7 +151,7 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn); ztu->_sn_rx_reliable = t_msg->_body._frame._sn; dbuf = &ztu->_dbuf_reliable; - if (consecutive == false) { + if (!consecutive) { _Z_DEBUG("Non-consecutive fragments received"); _z_wbuf_reset(dbuf); break; From 549064501cebc955cb3e48af68cd6869c873efc0 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 11:15:04 +0100 Subject: [PATCH 24/32] Update src/transport/unicast/rx.c Co-authored-by: Alexander Bushnev --- src/transport/unicast/rx.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index b0d82abf9..ad5b026e0 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -167,7 +167,7 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn); ztu->_sn_rx_best_effort = t_msg->_body._frame._sn; dbuf = &ztu->_dbuf_best_effort; - if (consecutive == false) { + if (!consecutive) { _Z_DEBUG("Non-consecutive fragments received"); _z_wbuf_reset(dbuf); break; From 1b36f8e3dba38dec413d19ad909e687ce3254ed4 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 11:19:10 +0100 Subject: [PATCH 25/32] fix: apply PR feedbacks --- src/protocol/codec/transport.c | 4 ++-- src/protocol/definitions/transport.c | 2 +- src/transport/multicast/rx.c | 4 ++-- src/transport/unicast/rx.c | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index 62a15d6f0..0cff46a78 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -434,7 +434,7 @@ z_result_t _z_fragment_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_fra z_result_t ret = _Z_RES_OK; _Z_DEBUG("Encoding _Z_TRANSPORT_FRAGMENT"); _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_sn)) - if (msg->first == true) { + if (msg->first) { if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_FRAGMENT_FIRST | _Z_MSG_EXT_MORE(msg->drop))); } else { @@ -442,7 +442,7 @@ z_result_t _z_fragment_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_fra ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; } } - if (msg->drop == true) { + if (msg->drop) { if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_FRAGMENT_DROP)); } else { diff --git a/src/protocol/definitions/transport.c b/src/protocol/definitions/transport.c index 6332cff48..a332626e6 100644 --- a/src/protocol/definitions/transport.c +++ b/src/protocol/definitions/transport.c @@ -292,7 +292,7 @@ _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload, msg._body._fragment._sn = sn; msg._body._fragment._payload = payload; - if (first == true || drop == true) { + if (first || drop) { _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); } msg._body._fragment.first = first; diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index 0e03a1e03..5a099558c 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -229,13 +229,13 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, } // Handle fragment markers if (_Z_PATCH_HAS_FRAGMENT_MARKERS(entry->_patch)) { - if (t_msg->_body._fragment.first == true) { + if (t_msg->_body._fragment.first) { _z_wbuf_clear(dbuf); } else if (_z_wbuf_len(dbuf) == 0) { _Z_DEBUG("First fragment received without the first marker"); break; } - if (t_msg->_body._fragment.drop == true) { + if (t_msg->_body._fragment.drop) { _z_wbuf_clear(dbuf); break; } diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index ad5b026e0..b737e7fa7 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -180,13 +180,13 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t } // Handle fragment markers if (_Z_PATCH_HAS_FRAGMENT_MARKERS(ztu->_patch)) { - if (t_msg->_body._fragment.first == true) { + if (t_msg->_body._fragment.first) { _z_wbuf_clear(dbuf); } else if (_z_wbuf_len(dbuf) == 0) { _Z_DEBUG("First fragment received without the start marker"); break; } - if (t_msg->_body._fragment.drop == true) { + if (t_msg->_body._fragment.drop) { _z_wbuf_clear(dbuf); break; } From bca36bd80989d40ab13dc011f5658f9b160e066a Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 13:06:30 +0100 Subject: [PATCH 26/32] fix: apply PR feedbacks --- src/protocol/definitions/transport.c | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/protocol/definitions/transport.c b/src/protocol/definitions/transport.c index a332626e6..2b463190b 100644 --- a/src/protocol/definitions/transport.c +++ b/src/protocol/definitions/transport.c @@ -150,13 +150,10 @@ _z_transport_message_t _z_t_msg_make_init_syn(z_whatami_t whatami, _z_id_t zid) } #if Z_FEATURE_FRAGMENTATION == 1 - bool has_patch = msg._body._join._patch != _Z_NO_PATCH; -#else - bool has_patch = false; -#endif - if (has_patch) { + if (msg._body._init._patch != _Z_NO_PATCH) { _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); } +#endif return msg; } From 733a23db72cf459d16349c7b907b7d0526622ec9 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 16:14:42 +0100 Subject: [PATCH 27/32] fix: initializing all the fields is always a good idea --- src/protocol/codec/transport.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index 0cff46a78..bdc3d17f6 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -268,6 +268,7 @@ z_result_t _z_init_decode(_z_t_msg_init_t *msg, _z_zbuf_t *zbf, uint8_t header) msg->_cookie = _z_slice_empty(); } + msg->_patch = _Z_NO_PATCH; if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { ret |= _z_msg_ext_decode_iter(zbf, _z_init_decode_ext, msg); } From e73ce420cd1e9029f3bfb5a3d09943b102ea45ba Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 17:45:14 +0100 Subject: [PATCH 28/32] fix: add _patch field in copy functions --- src/protocol/definitions/transport.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/protocol/definitions/transport.c b/src/protocol/definitions/transport.c index 2b463190b..486aa43fd 100644 --- a/src/protocol/definitions/transport.c +++ b/src/protocol/definitions/transport.c @@ -313,7 +313,9 @@ void _z_t_msg_copy_join(_z_t_msg_join_t *clone, _z_t_msg_join_t *msg) { clone->_req_id_res = msg->_req_id_res; clone->_batch_size = msg->_batch_size; clone->_next_sn = msg->_next_sn; +#if Z_FEATURE_FRAGMENTATION == 1 clone->_patch = msg->_patch; +#endif memcpy(clone->_zid.id, msg->_zid.id, 16); } @@ -325,6 +327,9 @@ void _z_t_msg_copy_init(_z_t_msg_init_t *clone, _z_t_msg_init_t *msg) { clone->_batch_size = msg->_batch_size; memcpy(clone->_zid.id, msg->_zid.id, 16); _z_slice_copy(&clone->_cookie, &msg->_cookie); +#if Z_FEATURE_FRAGMENTATION == 1 + clone->_patch = msg->_patch; +#endif } void _z_t_msg_copy_open(_z_t_msg_open_t *clone, _z_t_msg_open_t *msg) { From e55c499b6cf12fe9b417acc2df92f31360e11fbb Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 18:32:46 +0100 Subject: [PATCH 29/32] fix: typo --- src/transport/multicast/rx.c | 8 ++++---- src/transport/unicast/rx.c | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index 5a099558c..ce595aaf8 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -193,11 +193,11 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R)) { // @TODO: amend once reliability is in place. For the time being only // monotonic SNs are ensured - if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._frame._sn) == + if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._fragment._sn) == true) { bool consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._fragment._sn); - entry->_sn_rx_sns._val._plain._reliable = t_msg->_body._frame._sn; + entry->_sn_rx_sns._val._plain._reliable = t_msg->_body._fragment._sn; dbuf = &entry->_dbuf_reliable; if (!consecutive) { _Z_DEBUG("Non-consecutive fragments received"); @@ -211,10 +211,10 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, } } else { if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, - t_msg->_body._frame._sn)) { + t_msg->_body._fragment._sn)) { bool consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, t_msg->_body._fragment._sn); - entry->_sn_rx_sns._val._plain._best_effort = t_msg->_body._frame._sn; + entry->_sn_rx_sns._val._plain._best_effort = t_msg->_body._fragment._sn; dbuf = &entry->_dbuf_best_effort; if (!consecutive) { _Z_DEBUG("Non-consecutive fragments received"); diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index b737e7fa7..c0b1fca42 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -146,10 +146,10 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R)) { // @TODO: amend once reliability is in place. For the time being only // monotonic SNs are ensured - if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._frame._sn)) { + if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn)) { bool consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn); - ztu->_sn_rx_reliable = t_msg->_body._frame._sn; + ztu->_sn_rx_reliable = t_msg->_body._fragment._sn; dbuf = &ztu->_dbuf_reliable; if (!consecutive) { _Z_DEBUG("Non-consecutive fragments received"); @@ -162,10 +162,10 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t break; } } else { - if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._frame._sn)) { + if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn)) { bool consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn); - ztu->_sn_rx_best_effort = t_msg->_body._frame._sn; + ztu->_sn_rx_best_effort = t_msg->_body._fragment._sn; dbuf = &ztu->_dbuf_best_effort; if (!consecutive) { _Z_DEBUG("Non-consecutive fragments received"); From 6da6c4503514c6eab7316c839b9ed5d6ffb9affc Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 18:35:26 +0100 Subject: [PATCH 30/32] fix: format --- src/transport/multicast/rx.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index ce595aaf8..5c29abeaf 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -193,8 +193,8 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R)) { // @TODO: amend once reliability is in place. For the time being only // monotonic SNs are ensured - if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._fragment._sn) == - true) { + if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, + t_msg->_body._fragment._sn) == true) { bool consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._fragment._sn); entry->_sn_rx_sns._val._plain._reliable = t_msg->_body._fragment._sn; From 3fa9b5d6924be9c863736cc2aa78dde04202c2c1 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 20:49:35 +0100 Subject: [PATCH 31/32] fix: reset dbuf instead of clear --- src/transport/multicast/rx.c | 24 ++++++++++-------------- src/transport/unicast/rx.c | 24 ++++++++++-------------- 2 files changed, 20 insertions(+), 28 deletions(-) diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index 5c29abeaf..d87d346c7 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -188,6 +188,7 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, } entry->_received = true; + bool consecutive; _z_wbuf_t *dbuf; // Check if the SN is correct and select the right defragmentation buffer if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R)) { @@ -195,15 +196,10 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, // monotonic SNs are ensured if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._fragment._sn) == true) { - bool consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, + consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._fragment._sn); entry->_sn_rx_sns._val._plain._reliable = t_msg->_body._fragment._sn; dbuf = &entry->_dbuf_reliable; - if (!consecutive) { - _Z_DEBUG("Non-consecutive fragments received"); - _z_wbuf_reset(dbuf); - break; - } } else { _z_wbuf_clear(&entry->_dbuf_reliable); _Z_INFO("Reliable message dropped because it is out of order"); @@ -212,31 +208,31 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, } else { if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, t_msg->_body._fragment._sn)) { - bool consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, + consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, t_msg->_body._fragment._sn); entry->_sn_rx_sns._val._plain._best_effort = t_msg->_body._fragment._sn; dbuf = &entry->_dbuf_best_effort; - if (!consecutive) { - _Z_DEBUG("Non-consecutive fragments received"); - _z_wbuf_reset(dbuf); - break; - } } else { _z_wbuf_clear(&entry->_dbuf_best_effort); _Z_INFO("Best effort message dropped because it is out of order"); break; } } + if (!consecutive && _z_wbuf_len(dbuf) > 0) { + _Z_DEBUG("Non-consecutive fragments received"); + _z_wbuf_reset(dbuf); + break; + } // Handle fragment markers if (_Z_PATCH_HAS_FRAGMENT_MARKERS(entry->_patch)) { if (t_msg->_body._fragment.first) { - _z_wbuf_clear(dbuf); + _z_wbuf_reset(dbuf); } else if (_z_wbuf_len(dbuf) == 0) { _Z_DEBUG("First fragment received without the first marker"); break; } if (t_msg->_body._fragment.drop) { - _z_wbuf_clear(dbuf); + _z_wbuf_reset(dbuf); break; } } diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index c0b1fca42..5a9a3bd70 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -141,21 +141,17 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t case _Z_MID_T_FRAGMENT: { _Z_INFO("Received Z_FRAGMENT message"); #if Z_FEATURE_FRAGMENTATION == 1 + bool consecutive; _z_wbuf_t *dbuf; // Check if the SN is correct and select the right defragmentation buffer if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R)) { // @TODO: amend once reliability is in place. For the time being only // monotonic SNs are ensured if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn)) { - bool consecutive = + consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn); ztu->_sn_rx_reliable = t_msg->_body._fragment._sn; dbuf = &ztu->_dbuf_reliable; - if (!consecutive) { - _Z_DEBUG("Non-consecutive fragments received"); - _z_wbuf_reset(dbuf); - break; - } } else { _z_wbuf_clear(&ztu->_dbuf_reliable); _Z_INFO("Reliable message dropped because it is out of order"); @@ -163,31 +159,31 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t } } else { if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn)) { - bool consecutive = + consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn); ztu->_sn_rx_best_effort = t_msg->_body._fragment._sn; dbuf = &ztu->_dbuf_best_effort; - if (!consecutive) { - _Z_DEBUG("Non-consecutive fragments received"); - _z_wbuf_reset(dbuf); - break; - } } else { _z_wbuf_clear(&ztu->_dbuf_best_effort); _Z_INFO("Best effort message dropped because it is out of order"); break; } } + if (!consecutive && _z_wbuf_len(dbuf) > 0) { + _Z_DEBUG("Non-consecutive fragments received"); + _z_wbuf_reset(dbuf); + break; + } // Handle fragment markers if (_Z_PATCH_HAS_FRAGMENT_MARKERS(ztu->_patch)) { if (t_msg->_body._fragment.first) { - _z_wbuf_clear(dbuf); + _z_wbuf_reset(dbuf); } else if (_z_wbuf_len(dbuf) == 0) { _Z_DEBUG("First fragment received without the start marker"); break; } if (t_msg->_body._fragment.drop) { - _z_wbuf_clear(dbuf); + _z_wbuf_reset(dbuf); break; } } From 9320775bff13102e1f508cb8361e9547c1f90b92 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 4 Dec 2024 20:57:09 +0100 Subject: [PATCH 32/32] fix: format --- src/transport/multicast/rx.c | 4 ++-- src/transport/unicast/rx.c | 6 ++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index d87d346c7..40e9f0b02 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -197,7 +197,7 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._fragment._sn) == true) { consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, - t_msg->_body._fragment._sn); + t_msg->_body._fragment._sn); entry->_sn_rx_sns._val._plain._reliable = t_msg->_body._fragment._sn; dbuf = &entry->_dbuf_reliable; } else { @@ -209,7 +209,7 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, t_msg->_body._fragment._sn)) { consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, - t_msg->_body._fragment._sn); + t_msg->_body._fragment._sn); entry->_sn_rx_sns._val._plain._best_effort = t_msg->_body._fragment._sn; dbuf = &entry->_dbuf_best_effort; } else { diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index 5a9a3bd70..e0b8e0c68 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -148,8 +148,7 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t // @TODO: amend once reliability is in place. For the time being only // monotonic SNs are ensured if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn)) { - consecutive = - _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn); + consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn); ztu->_sn_rx_reliable = t_msg->_body._fragment._sn; dbuf = &ztu->_dbuf_reliable; } else { @@ -159,8 +158,7 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t } } else { if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn)) { - consecutive = - _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn); + consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn); ztu->_sn_rx_best_effort = t_msg->_body._fragment._sn; dbuf = &ztu->_dbuf_best_effort; } else {