From 1f4cf1d55f2aaa6e88654b7a1bf359c9e718c6ae Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 25 Nov 2024 07:42:19 +0100 Subject: [PATCH] feat: support start/stop fragment marker See https://github.com/eclipse-zenoh/zenoh/pull/1597 --- .../protocol/definitions/transport.h | 24 ++++++-- include/zenoh-pico/protocol/ext.h | 7 ++- include/zenoh-pico/transport/common/tx.h | 2 +- include/zenoh-pico/transport/transport.h | 13 ++++ include/zenoh-pico/transport/utils.h | 1 + src/protocol/codec/transport.c | 60 ++++++++++++++++--- src/protocol/definitions/transport.c | 49 +++++++++++++-- src/transport/common/tx.c | 4 +- src/transport/multicast/rx.c | 52 +++++++++++++++- src/transport/multicast/tx.c | 4 +- src/transport/peer_entry.c | 2 + src/transport/raweth/tx.c | 4 +- src/transport/unicast/rx.c | 52 ++++++++++++++-- src/transport/unicast/transport.c | 11 ++++ src/transport/unicast/tx.c | 4 +- src/transport/utils.c | 5 ++ 16 files changed, 262 insertions(+), 32 deletions(-) diff --git a/include/zenoh-pico/protocol/definitions/transport.h b/include/zenoh-pico/protocol/definitions/transport.h index 261c61fc6..d5ef7d295 100644 --- a/include/zenoh-pico/protocol/definitions/transport.h +++ b/include/zenoh-pico/protocol/definitions/transport.h @@ -91,6 +91,16 @@ extern "C" { // Z Extensions if Z==1 then Zenoh extensions are present #define _Z_FLAG_T_CLOSE_S 0x20 // 1 << 5 +/*=============================*/ +/* Patch */ +/*=============================*/ +/// Used to negotiate the patch version of the protocol +/// if not present (or 0), then protocol as released with 1.0.0 +/// if >= 1, then fragmentation start/stop marker +#define _Z_NO_PATCH 0x00 +#define _Z_CURRENT_PATCH 0x01 +#define _Z_PATCH_HAS_FRAGMENT_START_STOP(patch) (patch >= 1) + /*=============================*/ /* Transport Messages */ /*=============================*/ @@ -235,6 +245,9 @@ typedef struct { uint8_t _req_id_res; uint8_t _seq_num_res; uint8_t _version; +#if Z_FEATURE_FRAGMENTATION == 1 + uint8_t _patch; +#endif } _z_t_msg_join_t; void _z_t_msg_join_clear(_z_t_msg_join_t *msg); @@ -315,6 +328,9 @@ typedef struct { uint8_t _req_id_res; uint8_t _seq_num_res; uint8_t _version; +#if Z_FEATURE_FRAGMENTATION == 1 + uint8_t _patch; +#endif } _z_t_msg_init_t; void _z_t_msg_init_clear(_z_t_msg_init_t *msg); @@ -478,11 +494,11 @@ void _z_t_msg_frame_clear(_z_t_msg_frame_t *msg); typedef struct { _z_slice_t _payload; _z_zint_t _sn; + bool start; + bool stop; } _z_t_msg_fragment_t; void _z_t_msg_fragment_clear(_z_t_msg_fragment_t *msg); -#define _Z_FRAGMENT_HEADER_SIZE 12 - /*------------------ Transport Message ------------------*/ typedef union { _z_t_msg_join_t _join; @@ -514,9 +530,9 @@ _z_transport_message_t _z_t_msg_make_keep_alive(void); _z_transport_message_t _z_t_msg_make_frame(_z_zint_t sn, _z_network_message_vec_t messages, z_reliability_t reliability); _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t reliability); -_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last); +_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, bool start, bool stop); _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t messages, z_reliability_t reliability, - bool is_last); + bool is_last, bool start, bool stop); /*------------------ Copy ------------------*/ void _z_t_msg_copy(_z_transport_message_t *clone, _z_transport_message_t *msg); diff --git a/include/zenoh-pico/protocol/ext.h b/include/zenoh-pico/protocol/ext.h index eb70ede42..a8f961ee4 100644 --- a/include/zenoh-pico/protocol/ext.h +++ b/include/zenoh-pico/protocol/ext.h @@ -43,7 +43,11 @@ extern "C" { /*=============================*/ /* Extension IDs */ /*=============================*/ -// #define _Z_MSG_EXT_ID_FOO 0x00 // Hex(ENC|M|ID) +#define _Z_MSG_EXT_ID_JOIN_QOS (0x01 | _Z_MSG_EXT_FLAG_M | _Z_MSG_EXT_ENC_ZBUF) +#define _Z_MSG_EXT_ID_JOIN_PATCH (0x07 | _Z_MSG_EXT_ENC_ZINT) +#define _Z_MSG_EXT_ID_INIT_PATCH (0x07 | _Z_MSG_EXT_ENC_ZINT) +#define _Z_MSG_EXT_ID_FRAGMENT_START (0x02 | _Z_MSG_EXT_ENC_UNIT) +#define _Z_MSG_EXT_ID_FRAGMENT_STOP (0x03 | _Z_MSG_EXT_ENC_UNIT) /*=============================*/ /* Extension Encodings */ @@ -58,6 +62,7 @@ extern "C" { #define _Z_MSG_EXT_FLAG_M 0x10 #define _Z_MSG_EXT_IS_MANDATORY(h) ((h & _Z_MSG_EXT_FLAG_M) != 0) #define _Z_MSG_EXT_FLAG_Z 0x80 +#define _Z_MSG_EXT_MORE(more) (more ? _Z_MSG_EXT_FLAG_Z : 0) typedef struct { uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior diff --git a/include/zenoh-pico/transport/common/tx.h b/include/zenoh-pico/transport/common/tx.h index 05d22a89e..a77fd1a28 100644 --- a/include/zenoh-pico/transport/common/tx.h +++ b/include/zenoh-pico/transport/common/tx.h @@ -27,7 +27,7 @@ void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability); /*This function is unsafe because it operates in potentially concurrent data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */ -z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn); +z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, bool start); /*------------------ Transmission and Reception helpers ------------------*/ z_result_t _z_send_t_msg(_z_transport_t *zt, const _z_transport_message_t *t_msg); diff --git a/include/zenoh-pico/transport/transport.h b/include/zenoh-pico/transport/transport.h index 1671786df..02c304ea6 100644 --- a/include/zenoh-pico/transport/transport.h +++ b/include/zenoh-pico/transport/transport.h @@ -47,6 +47,11 @@ typedef struct { uint16_t _peer_id; volatile bool _received; + +#if Z_FEATURE_FRAGMENTATION == 1 + // Patch + uint8_t _patch; +#endif } _z_transport_peer_entry_t; size_t _z_transport_peer_entry_size(const _z_transport_peer_entry_t *src); @@ -108,6 +113,11 @@ typedef struct { volatile bool _received; volatile bool _transmitted; + +#if Z_FEATURE_FRAGMENTATION == 1 + // Patch + uint8_t _patch; +#endif } _z_transport_unicast_t; typedef struct _z_transport_multicast_t { @@ -175,6 +185,9 @@ typedef struct { uint8_t _req_id_res; uint8_t _seq_num_res; bool _is_qos; +#if Z_FEATURE_FRAGMENTATION == 1 + uint8_t _patch; +#endif } _z_transport_unicast_establish_param_t; typedef struct { diff --git a/include/zenoh-pico/transport/utils.h b/include/zenoh-pico/transport/utils.h index 62fa319b4..25862370b 100644 --- a/include/zenoh-pico/transport/utils.h +++ b/include/zenoh-pico/transport/utils.h @@ -29,6 +29,7 @@ _z_zint_t _z_sn_max(uint8_t bits); _z_zint_t _z_sn_half(_z_zint_t sn); _z_zint_t _z_sn_modulo_mask(uint8_t bits); bool _z_sn_precedes(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right); +bool _z_sn_consecutive(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right); _z_zint_t _z_sn_increment(const _z_zint_t sn_resolution, const _z_zint_t sn); _z_zint_t _z_sn_decrement(const _z_zint_t sn_resolution, const _z_zint_t sn); diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index 7eff8715b..7cd5e0d0a 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -64,9 +64,14 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t } _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_next_sn._val._plain._reliable)); _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_next_sn._val._plain._best_effort)); +#if Z_FEATURE_FRAGMENTATION == 1 + bool has_patch = msg->_patch != _Z_NO_PATCH; +#else + bool has_patch = false; +#endif if (msg->_next_sn._is_qos) { - if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1)); + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_QOS | _Z_MSG_EXT_MORE(has_patch))); size_t len = 0; for (uint8_t i = 0; (i < Z_PRIORITIES_NUM) && (ret == _Z_RES_OK); i++) { len += _z_zint_len(msg->_next_sn._val._qos[i]._reliable) + @@ -82,6 +87,17 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; } } +#if Z_FEATURE_FRAGMENTATION == 1 + if (has_patch) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH)); + _Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch)); + } else { + _Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset"); + ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; + } + } +#endif return ret; } @@ -89,14 +105,17 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t z_result_t _z_join_decode_ext(_z_msg_ext_t *extension, void *ctx) { z_result_t ret = _Z_RES_OK; _z_t_msg_join_t *msg = (_z_t_msg_join_t *)ctx; - if (_Z_EXT_FULL_ID(extension->_header) == - (_Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1)) { // QOS: (enc=zbuf)(mandatory=true)(id=1) + if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_JOIN_QOS) { msg->_next_sn._is_qos = true; _z_zbuf_t zbf = _z_slice_as_zbuf(extension->_body._zbuf._val); for (int i = 0; (ret == _Z_RES_OK) && (i < Z_PRIORITIES_NUM); ++i) { ret |= _z_zsize_decode(&msg->_next_sn._val._qos[i]._reliable, &zbf); ret |= _z_zsize_decode(&msg->_next_sn._val._qos[i]._best_effort, &zbf); } +#if Z_FEATURE_FRAGMENTATION == 1 + } else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_JOIN_PATCH) { + msg->_patch = (uint8_t)extension->_body._zint._val; +#endif } else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) { ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN; } @@ -147,7 +166,8 @@ z_result_t _z_join_decode(_z_t_msg_join_t *msg, _z_zbuf_t *zbf, uint8_t header) ret |= _z_zsize_decode(&msg->_next_sn._val._plain._reliable, zbf); ret |= _z_zsize_decode(&msg->_next_sn._val._plain._best_effort, zbf); } - if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { + msg->_patch = _Z_NO_PATCH; + if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { ret |= _z_msg_ext_decode_iter(zbf, _z_join_decode_ext, msg); } @@ -180,6 +200,32 @@ z_result_t _z_init_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_init_t _Z_RETURN_IF_ERR(_z_slice_encode(wbf, &msg->_cookie)) } + #if Z_FEATURE_FRAGMENTATION == 1 + if (msg->_patch != _Z_CURRENT_PATCH) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH)); + _Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch)); + } else { + _Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset"); + ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; + } + } + #endif + + return ret; +} + +z_result_t _z_init_decode_ext(_z_msg_ext_t *extension, void *ctx) { + z_result_t ret = _Z_RES_OK; + _z_t_msg_init_t *msg = (_z_t_msg_init_t *)ctx; + if (false) { +#if Z_FEATURE_FRAGMENTATION == 1 + } else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_INIT_PATCH) { + msg->_patch = (uint8_t)extension->_body._zint._val; +#endif + } else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) { + ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN; + } return ret; } @@ -222,8 +268,8 @@ z_result_t _z_init_decode(_z_t_msg_init_t *msg, _z_zbuf_t *zbf, uint8_t header) msg->_cookie = _z_slice_empty(); } - if ((ret == _Z_RES_OK) && (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true)) { - ret |= _z_msg_ext_skip_non_mandatories(zbf, 0x01); + if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) { + ret |= _z_msg_ext_decode_iter(zbf, _z_init_decode_ext, msg); } return ret; diff --git a/src/protocol/definitions/transport.c b/src/protocol/definitions/transport.c index 2af77cb97..0ef58c6bf 100644 --- a/src/protocol/definitions/transport.c +++ b/src/protocol/definitions/transport.c @@ -102,6 +102,9 @@ _z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease, msg._body._join._batch_size = Z_BATCH_MULTICAST_SIZE; msg._body._join._next_sn = next_sn; msg._body._join._zid = zid; +#if Z_FEATURE_FRAGMENTATION == 1 + msg._body._join._patch = _Z_CURRENT_PATCH; +#endif if ((lease % 1000) == 0) { _Z_SET_FLAG(msg._header, _Z_FLAG_T_JOIN_T); @@ -112,7 +115,12 @@ _z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease, _Z_SET_FLAG(msg._header, _Z_FLAG_T_JOIN_S); } - if (next_sn._is_qos) { +#if Z_FEATURE_FRAGMENTATION == 1 + bool has_patch = msg._body._join._patch != _Z_NO_PATCH; +#else + bool has_patch = false; +#endif + if (next_sn._is_qos == true || has_patch == true) { _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); } @@ -131,6 +139,9 @@ _z_transport_message_t _z_t_msg_make_init_syn(z_whatami_t whatami, _z_id_t zid) msg._body._init._req_id_res = Z_REQ_RESOLUTION; msg._body._init._batch_size = Z_BATCH_UNICAST_SIZE; _z_slice_reset(&msg._body._init._cookie); +#if Z_FEATURE_FRAGMENTATION == 1 + msg._body._init._patch = _Z_CURRENT_PATCH; +#endif if ((msg._body._init._batch_size != _Z_DEFAULT_UNICAST_BATCH_SIZE) || (msg._body._init._seq_num_res != _Z_DEFAULT_RESOLUTION_SIZE) || @@ -138,6 +149,15 @@ _z_transport_message_t _z_t_msg_make_init_syn(z_whatami_t whatami, _z_id_t zid) _Z_SET_FLAG(msg._header, _Z_FLAG_T_INIT_S); } +#if Z_FEATURE_FRAGMENTATION == 1 + bool has_patch = msg._body._join._patch != _Z_NO_PATCH; +#else + bool has_patch = false; +#endif + if (has_patch == true) { + _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); + } + return msg; } @@ -153,6 +173,9 @@ _z_transport_message_t _z_t_msg_make_init_ack(z_whatami_t whatami, _z_id_t zid, msg._body._init._req_id_res = Z_REQ_RESOLUTION; msg._body._init._batch_size = Z_BATCH_UNICAST_SIZE; msg._body._init._cookie = cookie; +#if Z_FEATURE_FRAGMENTATION == 1 + msg._body._init._patch = _Z_CURRENT_PATCH; +#endif if ((msg._body._init._batch_size != _Z_DEFAULT_UNICAST_BATCH_SIZE) || (msg._body._init._seq_num_res != _Z_DEFAULT_RESOLUTION_SIZE) || @@ -160,6 +183,15 @@ _z_transport_message_t _z_t_msg_make_init_ack(z_whatami_t whatami, _z_id_t zid, _Z_SET_FLAG(msg._header, _Z_FLAG_T_INIT_S); } +#if Z_FEATURE_FRAGMENTATION == 1 + bool has_patch = msg._body._join._patch != _Z_NO_PATCH; +#else + bool has_patch = false; +#endif + if (has_patch == true) { + _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); + } + return msg; } @@ -247,11 +279,11 @@ _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t } /*------------------ Fragment Message ------------------*/ -_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last) { - return _z_t_msg_make_fragment(sn, _z_slice_empty(), reliability, is_last); +_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last, bool start, bool stop) { + return _z_t_msg_make_fragment(sn, _z_slice_empty(), reliability, is_last, start, stop); } _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload, z_reliability_t reliability, - bool is_last) { + bool is_last, bool start, bool stop) { _z_transport_message_t msg; msg._header = _Z_MID_T_FRAGMENT; if (is_last == false) { @@ -263,12 +295,20 @@ _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload, msg._body._fragment._sn = sn; msg._body._fragment._payload = payload; + if (start == true || stop == true) { + _Z_SET_FLAG(msg._header, _Z_FLAG_T_Z); + } + msg._body._fragment.start = start; + msg._body._fragment.stop = stop; return msg; } void _z_t_msg_copy_fragment(_z_t_msg_fragment_t *clone, _z_t_msg_fragment_t *msg) { + clone->_payload = msg->_payload; _z_slice_copy(&clone->_payload, &msg->_payload); + clone->start = msg->start; + clone->stop = msg->stop; } void _z_t_msg_copy_join(_z_t_msg_join_t *clone, _z_t_msg_join_t *msg) { @@ -279,6 +319,7 @@ void _z_t_msg_copy_join(_z_t_msg_join_t *clone, _z_t_msg_join_t *msg) { clone->_req_id_res = msg->_req_id_res; clone->_batch_size = msg->_batch_size; clone->_next_sn = msg->_next_sn; + clone->_patch = msg->_patch; memcpy(clone->_zid.id, msg->_zid.id, 16); } diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index 75a837cbe..4f2d66ce0 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -135,7 +135,7 @@ z_result_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t return ret; } -z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn) { +z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn, bool start) { z_result_t ret = _Z_RES_OK; // Assume first that this is not the final fragment @@ -144,7 +144,7 @@ z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z size_t w_pos = _z_wbuf_get_wpos(dst); // Mark the buffer for the writing operation _z_transport_message_t f_hdr = - _z_t_msg_make_fragment_header(sn, reliability == Z_RELIABILITY_RELIABLE, is_final); + _z_t_msg_make_fragment_header(sn, reliability == Z_RELIABILITY_RELIABLE, is_final, start, false); ret = _z_transport_message_encode(dst, &f_hdr); // Encode the frame header if (ret == _Z_RES_OK) { size_t space_left = _z_wbuf_space_left(dst); diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index d531c6199..78ec6fe31 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -188,9 +188,54 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, } entry->_received = true; - _z_wbuf_t *dbuf = _Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R) - ? &entry->_dbuf_reliable - : &entry->_dbuf_best_effort; // Select the right defragmentation buffer + _z_wbuf_t *dbuf; + // Check if the SN is correct and select the right defragmentation buffer + if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R) == true) { + // @TODO: amend once reliability is in place. For the time being only + // monotonic SNs are ensured + if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._frame._sn) == true) { + bool consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._reliable, t_msg->_body._fragment._sn); + entry->_sn_rx_sns._val._plain._reliable = t_msg->_body._frame._sn; + dbuf = &entry->_dbuf_reliable; + if (consecutive == false) { + _Z_DEBUG("Non-consecutive fragments received"); + _z_wbuf_reset(dbuf); + break; + } + } else { + _z_wbuf_clear(&entry->_dbuf_reliable); + _Z_INFO("Reliable message dropped because it is out of order"); + break; + } + } else { + if (_z_sn_precedes(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, t_msg->_body._frame._sn) == true) { + bool consecutive = _z_sn_consecutive(entry->_sn_res, entry->_sn_rx_sns._val._plain._best_effort, t_msg->_body._fragment._sn); + entry->_sn_rx_sns._val._plain._best_effort = t_msg->_body._frame._sn; + dbuf = &entry->_dbuf_best_effort; + if (consecutive == false) { + _Z_DEBUG("Non-consecutive fragments received"); + _z_wbuf_reset(dbuf); + break; + } + } else { + _z_wbuf_clear(&entry->_dbuf_best_effort); + _Z_INFO("Best effort message dropped because it is out of order"); + break; + } + } + // Handle fragment markers + if (_Z_PATCH_HAS_FRAGMENT_START_STOP(entry->_patch)) { + if (t_msg->_body._fragment.start == true) { + _z_wbuf_clear(dbuf); + } else if (_z_wbuf_len(dbuf) == 0) { + _Z_DEBUG("First fragment received without the start marker"); + break; + } + if (t_msg->_body._fragment.stop == true) { + _z_wbuf_clear(dbuf); + break; + } + } bool drop = false; if ((_z_wbuf_len(dbuf) + t_msg->_body._fragment._payload.len) > Z_FRAG_MAX_SIZE) { @@ -280,6 +325,7 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_conduit_sn_list_decrement(entry->_sn_res, &entry->_sn_rx_sns); #if Z_FEATURE_FRAGMENTATION == 1 + entry->_patch = t_msg->_body._join._patch < _Z_CURRENT_PATCH ? t_msg->_body._join._patch : _Z_CURRENT_PATCH; #if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 1 entry->_dbuf_reliable = _z_wbuf_make(0, true); entry->_dbuf_best_effort = _z_wbuf_make(0, true); diff --git a/src/transport/multicast/tx.c b/src/transport/multicast/tx.c index df5b3bcdc..bbb555edd 100644 --- a/src/transport/multicast/tx.c +++ b/src/transport/multicast/tx.c @@ -126,13 +126,12 @@ z_result_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t if (is_first == false) { // Get the fragment sequence number sn = __unsafe_z_multicast_get_sn(ztm, reliability); } - is_first = false; // Clear the buffer for serialization __unsafe_z_prepare_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); // Serialize one fragment - ret = __unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn); + ret = __unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn, is_first); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed __unsafe_z_finalize_wbuf(&ztm->_wbuf, ztm->_link._cap._flow); @@ -144,6 +143,7 @@ z_result_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t } else { _Z_ERROR("Fragment serialization failed with err %d", ret); } + is_first = false; } } // Clear the buffer as it's no longer required diff --git a/src/transport/peer_entry.c b/src/transport/peer_entry.c index 7d72604e3..b363b2eab 100644 --- a/src/transport/peer_entry.c +++ b/src/transport/peer_entry.c @@ -30,6 +30,8 @@ void _z_transport_peer_entry_copy(_z_transport_peer_entry_t *dst, const _z_trans #if Z_FEATURE_FRAGMENTATION == 1 _z_wbuf_copy(&dst->_dbuf_reliable, &src->_dbuf_reliable); _z_wbuf_copy(&dst->_dbuf_best_effort, &src->_dbuf_best_effort); + + dst->_patch = src->_patch; #endif dst->_sn_res = src->_sn_res; diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index b4ddc5def..538c52e1a 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -292,13 +292,12 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_ // Get the fragment sequence number sn = __unsafe_z_raweth_get_sn(ztm, reliability); } - is_first = false; // Reset wbuf _z_wbuf_reset(&ztm->_wbuf); // Prepare buff __unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf); // Serialize one fragment - _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn), + _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn, is_first), _zp_raweth_unlock_tx_mutex(ztm)); // Write the eth header _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf), @@ -307,6 +306,7 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_ _Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm)); // Mark the session that we have transmitted data ztm->_transmitted = true; + is_first = false; } // Clear the expandable buffer _z_wbuf_clear(&fbf); diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index 23c6859b9..8e6540cdc 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -141,10 +141,54 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t case _Z_MID_T_FRAGMENT: { _Z_INFO("Received Z_FRAGMENT message"); #if Z_FEATURE_FRAGMENTATION == 1 - _z_wbuf_t *dbuf = _Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R) - ? &ztu->_dbuf_reliable - : &ztu->_dbuf_best_effort; // Select the right defragmentation buffer - + _z_wbuf_t *dbuf; + // Check if the SN is correct and select the right defragmentation buffer + if (_Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAME_R) == true) { + // @TODO: amend once reliability is in place. For the time being only + // monotonic SNs are ensured + if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._frame._sn) == true) { + bool consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_reliable, t_msg->_body._fragment._sn); + ztu->_sn_rx_reliable = t_msg->_body._frame._sn; + dbuf = &ztu->_dbuf_reliable; + if (consecutive == false) { + _Z_DEBUG("Non-consecutive fragments received"); + _z_wbuf_reset(dbuf); + break; + } + } else { + _z_wbuf_clear(&ztu->_dbuf_reliable); + _Z_INFO("Reliable message dropped because it is out of order"); + break; + } + } else { + if (_z_sn_precedes(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._frame._sn) == true) { + bool consecutive = _z_sn_consecutive(ztu->_sn_res, ztu->_sn_rx_best_effort, t_msg->_body._fragment._sn); + ztu->_sn_rx_best_effort = t_msg->_body._frame._sn; + dbuf = &ztu->_dbuf_best_effort; + if (consecutive == false) { + _Z_DEBUG("Non-consecutive fragments received"); + _z_wbuf_reset(dbuf); + break; + } + } else { + _z_wbuf_clear(&ztu->_dbuf_best_effort); + _Z_INFO("Best effort message dropped because it is out of order"); + break; + } + } + // Handle fragment markers + if (_Z_PATCH_HAS_FRAGMENT_START_STOP(ztu->_patch)) { + if (t_msg->_body._fragment.start == true) { + _z_wbuf_clear(dbuf); + } else if (_z_wbuf_len(dbuf) == 0) { + _Z_DEBUG("First fragment received without the start marker"); + break; + } + if (t_msg->_body._fragment.stop == true) { + _z_wbuf_clear(dbuf); + break; + } + } bool drop = false; if ((_z_wbuf_len(dbuf) + t_msg->_body._fragment._payload.len) > Z_FRAG_MAX_SIZE) { // Filling the wbuf capacity as a way to signal the last fragment to reset the dbuf diff --git a/src/transport/unicast/transport.c b/src/transport/unicast/transport.c index 01c19ce91..c7ef12aca 100644 --- a/src/transport/unicast/transport.c +++ b/src/transport/unicast/transport.c @@ -36,6 +36,10 @@ z_result_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, z_result_t ret = _Z_RES_OK; zt->_type = _Z_TRANSPORT_UNICAST_TYPE; +#if Z_FEATURE_FRAGMENTATION == 1 + // Patch + zt->_transport._unicast._patch = param->_patch; +#endif #if Z_FEATURE_MULTI_THREAD == 1 // Initialize the mutexes @@ -199,6 +203,13 @@ z_result_t _z_unicast_open_client(_z_transport_unicast_establish_param_t *param, } else { ret = _Z_ERR_TRANSPORT_OPEN_SN_RESOLUTION; } +#if Z_FEATURE_FRAGMENTATION == 1 + if (iam._body._init._patch > ism._body._init._patch) { + // TODO: Use a better error code? + ret = _Z_ERR_GENERIC; + } + param->_patch = iam._body._init._patch; +#endif if (ret == _Z_RES_OK) { param->_key_id_res = 0x08 << param->_key_id_res; diff --git a/src/transport/unicast/tx.c b/src/transport/unicast/tx.c index 5ed555794..561c232c7 100644 --- a/src/transport/unicast/tx.c +++ b/src/transport/unicast/tx.c @@ -135,13 +135,12 @@ z_result_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n if (is_first == false) { // Get the fragment sequence number sn = __unsafe_z_unicast_get_sn(ztu, reliability); } - is_first = false; // Clear the buffer for serialization __unsafe_z_prepare_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); // Serialize one fragment - ret = __unsafe_z_serialize_zenoh_fragment(&ztu->_wbuf, &fbf, reliability, sn); + ret = __unsafe_z_serialize_zenoh_fragment(&ztu->_wbuf, &fbf, reliability, sn, is_first); if (ret == _Z_RES_OK) { // Write the message length in the reserved space if needed __unsafe_z_finalize_wbuf(&ztu->_wbuf, ztu->_link._cap._flow); @@ -153,6 +152,7 @@ z_result_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n } else { _Z_ERROR("Fragment serialization failed with err %d", ret); } + is_first = false; } } diff --git a/src/transport/utils.c b/src/transport/utils.c index 510a0f19c..dc0697043 100644 --- a/src/transport/utils.c +++ b/src/transport/utils.c @@ -82,6 +82,11 @@ bool _z_sn_precedes(const _z_zint_t sn_resolution, const _z_zint_t sn_left, cons return ((distance <= _z_sn_half(sn_resolution)) && (distance != 0)); } +bool _z_sn_consecutive(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right) { + _z_zint_t distance = (sn_right - sn_left) & sn_resolution; + return distance == 1; +} + _z_zint_t _z_sn_increment(const _z_zint_t sn_resolution, const _z_zint_t sn) { _z_zint_t ret = sn + 1; return (ret &= sn_resolution);