Skip to content

Commit

Permalink
Merge pull request #813 from ZettaScaleLabs/feat/fragment_marker
Browse files Browse the repository at this point in the history
feat: support start/stop fragment marker
  • Loading branch information
Mallets authored Dec 5, 2024
2 parents b63ffa3 + 9320775 commit 54191c8
Show file tree
Hide file tree
Showing 17 changed files with 293 additions and 35 deletions.
25 changes: 21 additions & 4 deletions include/zenoh-pico/protocol/definitions/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ extern "C" {
// Z Extensions if Z==1 then Zenoh extensions are present
#define _Z_FLAG_T_CLOSE_S 0x20 // 1 << 5

/*=============================*/
/* Patch */
/*=============================*/
/// Used to negotiate the patch version of the protocol
/// if not present (or 0), then protocol as released with 1.0.0
/// if >= 1, then fragmentation start/stop marker
#define _Z_NO_PATCH 0x00
#define _Z_CURRENT_PATCH 0x01
#define _Z_PATCH_HAS_FRAGMENT_MARKERS(patch) (patch >= 1)

/*=============================*/
/* Transport Messages */
/*=============================*/
Expand Down Expand Up @@ -235,6 +245,9 @@ typedef struct {
uint8_t _req_id_res;
uint8_t _seq_num_res;
uint8_t _version;
#if Z_FEATURE_FRAGMENTATION == 1
uint8_t _patch;
#endif
} _z_t_msg_join_t;
void _z_t_msg_join_clear(_z_t_msg_join_t *msg);

Expand Down Expand Up @@ -315,6 +328,9 @@ typedef struct {
uint8_t _req_id_res;
uint8_t _seq_num_res;
uint8_t _version;
#if Z_FEATURE_FRAGMENTATION == 1
uint8_t _patch;
#endif
} _z_t_msg_init_t;
void _z_t_msg_init_clear(_z_t_msg_init_t *msg);

Expand Down Expand Up @@ -478,11 +494,11 @@ void _z_t_msg_frame_clear(_z_t_msg_frame_t *msg);
typedef struct {
_z_slice_t _payload;
_z_zint_t _sn;
bool first;
bool drop;
} _z_t_msg_fragment_t;
void _z_t_msg_fragment_clear(_z_t_msg_fragment_t *msg);

#define _Z_FRAGMENT_HEADER_SIZE 12

/*------------------ Transport Message ------------------*/
typedef union {
_z_t_msg_join_t _join;
Expand Down Expand Up @@ -514,9 +530,10 @@ _z_transport_message_t _z_t_msg_make_keep_alive(void);
_z_transport_message_t _z_t_msg_make_frame(_z_zint_t sn, _z_network_message_vec_t messages,
z_reliability_t reliability);
_z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t reliability);
_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last);
_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last,
bool first, bool drop);
_z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t messages, z_reliability_t reliability,
bool is_last);
bool is_last, bool first, bool drop);

/*------------------ Copy ------------------*/
void _z_t_msg_copy(_z_transport_message_t *clone, _z_transport_message_t *msg);
Expand Down
7 changes: 6 additions & 1 deletion include/zenoh-pico/protocol/ext.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ extern "C" {
/*=============================*/
/* Extension IDs */
/*=============================*/
// #define _Z_MSG_EXT_ID_FOO 0x00 // Hex(ENC|M|ID)
#define _Z_MSG_EXT_ID_JOIN_QOS (0x01 | _Z_MSG_EXT_FLAG_M | _Z_MSG_EXT_ENC_ZBUF)
#define _Z_MSG_EXT_ID_JOIN_PATCH (0x07 | _Z_MSG_EXT_ENC_ZINT)
#define _Z_MSG_EXT_ID_INIT_PATCH (0x07 | _Z_MSG_EXT_ENC_ZINT)
#define _Z_MSG_EXT_ID_FRAGMENT_FIRST (0x02 | _Z_MSG_EXT_ENC_UNIT)
#define _Z_MSG_EXT_ID_FRAGMENT_DROP (0x03 | _Z_MSG_EXT_ENC_UNIT)

/*=============================*/
/* Extension Encodings */
Expand All @@ -58,6 +62,7 @@ extern "C" {
#define _Z_MSG_EXT_FLAG_M 0x10
#define _Z_MSG_EXT_IS_MANDATORY(h) ((h & _Z_MSG_EXT_FLAG_M) != 0)
#define _Z_MSG_EXT_FLAG_Z 0x80
#define _Z_MSG_EXT_MORE(more) (more ? _Z_MSG_EXT_FLAG_Z : 0)

typedef struct {
uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior
Expand Down
3 changes: 2 additions & 1 deletion include/zenoh-pico/transport/common/tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability);
void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability);
/*This function is unsafe because it operates in potentially concurrent
data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */
z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn);
z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn,
bool first);

/*------------------ Transmission and Reception helpers ------------------*/
z_result_t _z_send_t_msg(_z_transport_t *zt, const _z_transport_message_t *t_msg);
Expand Down
13 changes: 13 additions & 0 deletions include/zenoh-pico/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ typedef struct {

uint16_t _peer_id;
volatile bool _received;

#if Z_FEATURE_FRAGMENTATION == 1
// Patch
uint8_t _patch;
#endif
} _z_transport_peer_entry_t;

size_t _z_transport_peer_entry_size(const _z_transport_peer_entry_t *src);
Expand Down Expand Up @@ -108,6 +113,11 @@ typedef struct {

volatile bool _received;
volatile bool _transmitted;

#if Z_FEATURE_FRAGMENTATION == 1
// Patch
uint8_t _patch;
#endif
} _z_transport_unicast_t;

typedef struct _z_transport_multicast_t {
Expand Down Expand Up @@ -175,6 +185,9 @@ typedef struct {
uint8_t _req_id_res;
uint8_t _seq_num_res;
bool _is_qos;
#if Z_FEATURE_FRAGMENTATION == 1
uint8_t _patch;
#endif
} _z_transport_unicast_establish_param_t;

typedef struct {
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/transport/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ _z_zint_t _z_sn_max(uint8_t bits);
_z_zint_t _z_sn_half(_z_zint_t sn);
_z_zint_t _z_sn_modulo_mask(uint8_t bits);
bool _z_sn_precedes(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right);
bool _z_sn_consecutive(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right);
_z_zint_t _z_sn_increment(const _z_zint_t sn_resolution, const _z_zint_t sn);
_z_zint_t _z_sn_decrement(const _z_zint_t sn_resolution, const _z_zint_t sn);

Expand Down
93 changes: 84 additions & 9 deletions src/protocol/codec/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,14 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t
}
_Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_next_sn._val._plain._reliable));
_Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_next_sn._val._plain._best_effort));
#if Z_FEATURE_FRAGMENTATION == 1
bool has_patch = msg->_patch != _Z_NO_PATCH;
#else
bool has_patch = false;
#endif
if (msg->_next_sn._is_qos) {
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1));
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_QOS | _Z_MSG_EXT_MORE(has_patch)));
size_t len = 0;
for (uint8_t i = 0; (i < Z_PRIORITIES_NUM) && (ret == _Z_RES_OK); i++) {
len += _z_zint_len(msg->_next_sn._val._qos[i]._reliable) +
Expand All @@ -82,21 +87,35 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t
ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED;
}
}
#if Z_FEATURE_FRAGMENTATION == 1
if (has_patch) {
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH));
_Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch));
} else {
_Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset");
ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED;
}
}
#endif

return ret;
}

z_result_t _z_join_decode_ext(_z_msg_ext_t *extension, void *ctx) {
z_result_t ret = _Z_RES_OK;
_z_t_msg_join_t *msg = (_z_t_msg_join_t *)ctx;
if (_Z_EXT_FULL_ID(extension->_header) ==
(_Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1)) { // QOS: (enc=zbuf)(mandatory=true)(id=1)
if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_JOIN_QOS) {
msg->_next_sn._is_qos = true;
_z_zbuf_t zbf = _z_slice_as_zbuf(extension->_body._zbuf._val);
for (int i = 0; (ret == _Z_RES_OK) && (i < Z_PRIORITIES_NUM); ++i) {
ret |= _z_zsize_decode(&msg->_next_sn._val._qos[i]._reliable, &zbf);
ret |= _z_zsize_decode(&msg->_next_sn._val._qos[i]._best_effort, &zbf);
}
#if Z_FEATURE_FRAGMENTATION == 1
} else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_JOIN_PATCH) {
msg->_patch = (uint8_t)extension->_body._zint._val;
#endif
} else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) {
ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN;
}
Expand Down Expand Up @@ -147,6 +166,7 @@ z_result_t _z_join_decode(_z_t_msg_join_t *msg, _z_zbuf_t *zbf, uint8_t header)
ret |= _z_zsize_decode(&msg->_next_sn._val._plain._reliable, zbf);
ret |= _z_zsize_decode(&msg->_next_sn._val._plain._best_effort, zbf);
}
msg->_patch = _Z_NO_PATCH;
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z)) {
ret |= _z_msg_ext_decode_iter(zbf, _z_join_decode_ext, msg);
}
Expand Down Expand Up @@ -180,6 +200,32 @@ z_result_t _z_init_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_init_t
_Z_RETURN_IF_ERR(_z_slice_encode(wbf, &msg->_cookie))
}

#if Z_FEATURE_FRAGMENTATION == 1
if (msg->_patch != _Z_NO_PATCH) {
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH));
_Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch));
} else {
_Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset");
ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED;
}
}
#endif

return ret;
}

z_result_t _z_init_decode_ext(_z_msg_ext_t *extension, void *ctx) {
z_result_t ret = _Z_RES_OK;
_z_t_msg_init_t *msg = (_z_t_msg_init_t *)ctx;
if (false) {
#if Z_FEATURE_FRAGMENTATION == 1
} else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_INIT_PATCH) {
msg->_patch = (uint8_t)extension->_body._zint._val;
#endif
} else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) {
ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN;
}
return ret;
}

Expand Down Expand Up @@ -222,8 +268,9 @@ z_result_t _z_init_decode(_z_t_msg_init_t *msg, _z_zbuf_t *zbf, uint8_t header)
msg->_cookie = _z_slice_empty();
}

if ((ret == _Z_RES_OK) && (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true)) {
ret |= _z_msg_ext_skip_non_mandatories(zbf, 0x01);
msg->_patch = _Z_NO_PATCH;
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z)) {
ret |= _z_msg_ext_decode_iter(zbf, _z_init_decode_ext, msg);
}

return ret;
Expand Down Expand Up @@ -388,25 +435,53 @@ z_result_t _z_fragment_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_fra
z_result_t ret = _Z_RES_OK;
_Z_DEBUG("Encoding _Z_TRANSPORT_FRAGMENT");
_Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_sn))
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) {
ret = _Z_ERR_MESSAGE_SERIALIZATION_FAILED;
if (msg->first) {
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_FRAGMENT_FIRST | _Z_MSG_EXT_MORE(msg->drop)));
} else {
_Z_DEBUG("Attempted to serialize Start extension, but the header extension flag was unset");
ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED;
}
}
if (msg->drop) {
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_FRAGMENT_DROP));
} else {
_Z_DEBUG("Attempted to serialize Stop extension, but the header extension flag was unset");
ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED;
}
}
if (ret == _Z_RES_OK && _z_slice_check(&msg->_payload)) {
if (_z_slice_check(&msg->_payload)) {
_Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, msg->_payload.start, 0, msg->_payload.len));
}

return ret;
}

z_result_t _z_fragment_decode_ext(_z_msg_ext_t *extension, void *ctx) {
z_result_t ret = _Z_RES_OK;
_z_t_msg_fragment_t *msg = (_z_t_msg_fragment_t *)ctx;
if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_FRAGMENT_FIRST) {
msg->first = true;
} else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_FRAGMENT_DROP) {
msg->drop = true;
} else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) {
ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN;
}
return ret;
}

z_result_t _z_fragment_decode(_z_t_msg_fragment_t *msg, _z_zbuf_t *zbf, uint8_t header) {
z_result_t ret = _Z_RES_OK;
*msg = (_z_t_msg_fragment_t){0};

_Z_DEBUG("Decoding _Z_TRANSPORT_FRAGMENT");
ret |= _z_zsize_decode(&msg->_sn, zbf);

msg->first = false;
msg->drop = false;
if ((ret == _Z_RES_OK) && (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true)) {
ret |= _z_msg_ext_skip_non_mandatories(zbf, 0x05);
ret |= _z_msg_ext_decode_iter(zbf, _z_fragment_decode_ext, msg);
}

_z_slice_t slice = _z_slice_alias_buf((uint8_t *)_z_zbuf_start(zbf), _z_zbuf_len(zbf));
Expand Down
Loading

0 comments on commit 54191c8

Please sign in to comment.