Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support start/stop fragment marker #813

Merged
merged 32 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
1f4cf1d
feat: support start/stop fragment marker
wyfo Nov 25, 2024
6837602
fix: fix test
wyfo Dec 4, 2024
ad94190
fix: typo
wyfo Dec 4, 2024
c5f50ac
fix: format
wyfo Dec 4, 2024
3736054
fix: fragment ext encoding
wyfo Dec 4, 2024
218019a
fix: rename start/stop marker to first/drop
wyfo Dec 4, 2024
0c5f62b
Update src/protocol/codec/transport.c
wyfo Dec 4, 2024
0f2a139
Update src/protocol/definitions/transport.c
wyfo Dec 4, 2024
c93e694
Update src/protocol/definitions/transport.c
wyfo Dec 4, 2024
8c55511
Update src/protocol/codec/transport.c
wyfo Dec 4, 2024
e997fc4
Update src/transport/unicast/rx.c
wyfo Dec 4, 2024
b42dca2
Update src/transport/unicast/rx.c
wyfo Dec 4, 2024
223d085
Update src/transport/multicast/rx.c
wyfo Dec 4, 2024
a05524d
Update src/protocol/definitions/transport.c
wyfo Dec 4, 2024
107ac1a
Update src/protocol/definitions/transport.c
wyfo Dec 4, 2024
bd89a76
Update src/protocol/codec/transport.c
wyfo Dec 4, 2024
645acc4
Update src/protocol/codec/transport.c
wyfo Dec 4, 2024
002a0e1
Update src/protocol/codec/transport.c
wyfo Dec 4, 2024
47d15e1
Update src/transport/multicast/rx.c
wyfo Dec 4, 2024
524b3df
Update src/transport/multicast/rx.c
wyfo Dec 4, 2024
daa56a0
Update src/transport/multicast/rx.c
wyfo Dec 4, 2024
1ba505b
Update src/transport/unicast/rx.c
wyfo Dec 4, 2024
1b681bb
Update src/transport/unicast/rx.c
wyfo Dec 4, 2024
5490645
Update src/transport/unicast/rx.c
wyfo Dec 4, 2024
1b36f8e
fix: apply PR feedbacks
wyfo Dec 4, 2024
bca36bd
fix: apply PR feedbacks
wyfo Dec 4, 2024
733a23d
fix: initializing all the fields is always a good idea
wyfo Dec 4, 2024
e73ce42
fix: add _patch field in copy functions
wyfo Dec 4, 2024
e55c499
fix: typo
wyfo Dec 4, 2024
6da6c45
fix: format
wyfo Dec 4, 2024
3fa9b5d
fix: reset dbuf instead of clear
wyfo Dec 4, 2024
9320775
fix: format
wyfo Dec 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions include/zenoh-pico/protocol/definitions/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ extern "C" {
// Z Extensions if Z==1 then Zenoh extensions are present
#define _Z_FLAG_T_CLOSE_S 0x20 // 1 << 5

/*=============================*/
/* Patch */
/*=============================*/
/// Used to negotiate the patch version of the protocol
/// if not present (or 0), then protocol as released with 1.0.0
/// if >= 1, then fragmentation start/stop marker
#define _Z_NO_PATCH 0x00
#define _Z_CURRENT_PATCH 0x01
#define _Z_PATCH_HAS_FRAGMENT_MARKERS(patch) (patch >= 1)

/*=============================*/
/* Transport Messages */
/*=============================*/
Expand Down Expand Up @@ -235,6 +245,9 @@ typedef struct {
uint8_t _req_id_res;
uint8_t _seq_num_res;
uint8_t _version;
#if Z_FEATURE_FRAGMENTATION == 1
uint8_t _patch;
#endif
} _z_t_msg_join_t;
void _z_t_msg_join_clear(_z_t_msg_join_t *msg);

Expand Down Expand Up @@ -315,6 +328,9 @@ typedef struct {
uint8_t _req_id_res;
uint8_t _seq_num_res;
uint8_t _version;
#if Z_FEATURE_FRAGMENTATION == 1
uint8_t _patch;
#endif
} _z_t_msg_init_t;
void _z_t_msg_init_clear(_z_t_msg_init_t *msg);

Expand Down Expand Up @@ -478,11 +494,11 @@ void _z_t_msg_frame_clear(_z_t_msg_frame_t *msg);
typedef struct {
_z_slice_t _payload;
_z_zint_t _sn;
bool first;
bool drop;
} _z_t_msg_fragment_t;
void _z_t_msg_fragment_clear(_z_t_msg_fragment_t *msg);

#define _Z_FRAGMENT_HEADER_SIZE 12

/*------------------ Transport Message ------------------*/
typedef union {
_z_t_msg_join_t _join;
Expand Down Expand Up @@ -514,9 +530,10 @@ _z_transport_message_t _z_t_msg_make_keep_alive(void);
_z_transport_message_t _z_t_msg_make_frame(_z_zint_t sn, _z_network_message_vec_t messages,
z_reliability_t reliability);
_z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t reliability);
_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last);
_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last,
bool first, bool drop);
_z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t messages, z_reliability_t reliability,
bool is_last);
bool is_last, bool first, bool drop);

/*------------------ Copy ------------------*/
void _z_t_msg_copy(_z_transport_message_t *clone, _z_transport_message_t *msg);
Expand Down
7 changes: 6 additions & 1 deletion include/zenoh-pico/protocol/ext.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ extern "C" {
/*=============================*/
/* Extension IDs */
/*=============================*/
// #define _Z_MSG_EXT_ID_FOO 0x00 // Hex(ENC|M|ID)
#define _Z_MSG_EXT_ID_JOIN_QOS (0x01 | _Z_MSG_EXT_FLAG_M | _Z_MSG_EXT_ENC_ZBUF)
#define _Z_MSG_EXT_ID_JOIN_PATCH (0x07 | _Z_MSG_EXT_ENC_ZINT)
#define _Z_MSG_EXT_ID_INIT_PATCH (0x07 | _Z_MSG_EXT_ENC_ZINT)
#define _Z_MSG_EXT_ID_FRAGMENT_FIRST (0x02 | _Z_MSG_EXT_ENC_UNIT)
#define _Z_MSG_EXT_ID_FRAGMENT_DROP (0x03 | _Z_MSG_EXT_ENC_UNIT)

/*=============================*/
/* Extension Encodings */
Expand All @@ -58,6 +62,7 @@ extern "C" {
#define _Z_MSG_EXT_FLAG_M 0x10
#define _Z_MSG_EXT_IS_MANDATORY(h) ((h & _Z_MSG_EXT_FLAG_M) != 0)
#define _Z_MSG_EXT_FLAG_Z 0x80
#define _Z_MSG_EXT_MORE(more) (more ? _Z_MSG_EXT_FLAG_Z : 0)

typedef struct {
uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior
Expand Down
3 changes: 2 additions & 1 deletion include/zenoh-pico/transport/common/tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability);
void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability);
/*This function is unsafe because it operates in potentially concurrent
data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */
z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn);
z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn,
bool first);

/*------------------ Transmission and Reception helpers ------------------*/
z_result_t _z_send_t_msg(_z_transport_t *zt, const _z_transport_message_t *t_msg);
Expand Down
13 changes: 13 additions & 0 deletions include/zenoh-pico/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ typedef struct {

uint16_t _peer_id;
volatile bool _received;

#if Z_FEATURE_FRAGMENTATION == 1
// Patch
uint8_t _patch;
#endif
} _z_transport_peer_entry_t;

size_t _z_transport_peer_entry_size(const _z_transport_peer_entry_t *src);
Expand Down Expand Up @@ -108,6 +113,11 @@ typedef struct {

volatile bool _received;
volatile bool _transmitted;

#if Z_FEATURE_FRAGMENTATION == 1
// Patch
uint8_t _patch;
#endif
} _z_transport_unicast_t;

typedef struct _z_transport_multicast_t {
Expand Down Expand Up @@ -175,6 +185,9 @@ typedef struct {
uint8_t _req_id_res;
uint8_t _seq_num_res;
bool _is_qos;
#if Z_FEATURE_FRAGMENTATION == 1
uint8_t _patch;
#endif
} _z_transport_unicast_establish_param_t;

typedef struct {
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/transport/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ _z_zint_t _z_sn_max(uint8_t bits);
_z_zint_t _z_sn_half(_z_zint_t sn);
_z_zint_t _z_sn_modulo_mask(uint8_t bits);
bool _z_sn_precedes(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right);
bool _z_sn_consecutive(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right);
_z_zint_t _z_sn_increment(const _z_zint_t sn_resolution, const _z_zint_t sn);
_z_zint_t _z_sn_decrement(const _z_zint_t sn_resolution, const _z_zint_t sn);

Expand Down
92 changes: 83 additions & 9 deletions src/protocol/codec/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,14 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t
}
_Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_next_sn._val._plain._reliable));
_Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_next_sn._val._plain._best_effort));
#if Z_FEATURE_FRAGMENTATION == 1
bool has_patch = msg->_patch != _Z_NO_PATCH;
#else
bool has_patch = false;
#endif
if (msg->_next_sn._is_qos) {
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1));
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_QOS | _Z_MSG_EXT_MORE(has_patch)));
size_t len = 0;
for (uint8_t i = 0; (i < Z_PRIORITIES_NUM) && (ret == _Z_RES_OK); i++) {
len += _z_zint_len(msg->_next_sn._val._qos[i]._reliable) +
Expand All @@ -82,21 +87,35 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t
ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED;
}
}
#if Z_FEATURE_FRAGMENTATION == 1
if (has_patch) {
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH));
_Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch));
} else {
_Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset");
ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED;
}
}
#endif

return ret;
}

z_result_t _z_join_decode_ext(_z_msg_ext_t *extension, void *ctx) {
z_result_t ret = _Z_RES_OK;
_z_t_msg_join_t *msg = (_z_t_msg_join_t *)ctx;
if (_Z_EXT_FULL_ID(extension->_header) ==
(_Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1)) { // QOS: (enc=zbuf)(mandatory=true)(id=1)
if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_JOIN_QOS) {
msg->_next_sn._is_qos = true;
_z_zbuf_t zbf = _z_slice_as_zbuf(extension->_body._zbuf._val);
for (int i = 0; (ret == _Z_RES_OK) && (i < Z_PRIORITIES_NUM); ++i) {
ret |= _z_zsize_decode(&msg->_next_sn._val._qos[i]._reliable, &zbf);
ret |= _z_zsize_decode(&msg->_next_sn._val._qos[i]._best_effort, &zbf);
}
#if Z_FEATURE_FRAGMENTATION == 1
} else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_JOIN_PATCH) {
msg->_patch = (uint8_t)extension->_body._zint._val;
#endif
} else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) {
ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN;
}
Expand Down Expand Up @@ -147,6 +166,7 @@ z_result_t _z_join_decode(_z_t_msg_join_t *msg, _z_zbuf_t *zbf, uint8_t header)
ret |= _z_zsize_decode(&msg->_next_sn._val._plain._reliable, zbf);
ret |= _z_zsize_decode(&msg->_next_sn._val._plain._best_effort, zbf);
}
msg->_patch = _Z_NO_PATCH;
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z)) {
ret |= _z_msg_ext_decode_iter(zbf, _z_join_decode_ext, msg);
}
Expand Down Expand Up @@ -180,6 +200,32 @@ z_result_t _z_init_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_init_t
_Z_RETURN_IF_ERR(_z_slice_encode(wbf, &msg->_cookie))
}

#if Z_FEATURE_FRAGMENTATION == 1
if (msg->_patch != _Z_NO_PATCH) {
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH));
_Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch));
} else {
_Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset");
ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED;
}
}
#endif

return ret;
}

z_result_t _z_init_decode_ext(_z_msg_ext_t *extension, void *ctx) {
z_result_t ret = _Z_RES_OK;
_z_t_msg_init_t *msg = (_z_t_msg_init_t *)ctx;
if (false) {
#if Z_FEATURE_FRAGMENTATION == 1
} else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_INIT_PATCH) {
msg->_patch = (uint8_t)extension->_body._zint._val;
#endif
} else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) {
ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN;
}
return ret;
}

Expand Down Expand Up @@ -222,8 +268,8 @@ z_result_t _z_init_decode(_z_t_msg_init_t *msg, _z_zbuf_t *zbf, uint8_t header)
msg->_cookie = _z_slice_empty();
}

if ((ret == _Z_RES_OK) && (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true)) {
ret |= _z_msg_ext_skip_non_mandatories(zbf, 0x01);
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z)) {
ret |= _z_msg_ext_decode_iter(zbf, _z_init_decode_ext, msg);
}

return ret;
Expand Down Expand Up @@ -388,25 +434,53 @@ z_result_t _z_fragment_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_fra
z_result_t ret = _Z_RES_OK;
_Z_DEBUG("Encoding _Z_TRANSPORT_FRAGMENT");
_Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_sn))
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) {
ret = _Z_ERR_MESSAGE_SERIALIZATION_FAILED;
if (msg->first) {
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a nit and I see there are some that precedes that PR, but all conditions should be booleans so the == true / false would be redundant and can be removed.

Copy link
Contributor Author

@wyfo wyfo Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #813 (comment)
I asked before writing the code, but didn't understand Sasha's answer ^_^' (I would never have the idea to write == true if I had not seen it everywhere in the code)

Copy link
Member

@Mallets Mallets Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK, MISRA-C rules about using non-boolean values as boolean expression:
e. g.

int i = 1;
if (i) {}

but this kind of expression is absolutely legal:

bool b = true;
if (b) {}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, yes but I also believe it depends on the static analyser... I'm not saying we should use == true, I was just providing a bit more context :)

_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_FRAGMENT_FIRST | _Z_MSG_EXT_MORE(msg->drop)));
} else {
_Z_DEBUG("Attempted to serialize Start extension, but the header extension flag was unset");
ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED;
}
}
if (msg->drop) {
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_FRAGMENT_DROP));
} else {
_Z_DEBUG("Attempted to serialize Stop extension, but the header extension flag was unset");
ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED;
}
}
if (ret == _Z_RES_OK && _z_slice_check(&msg->_payload)) {
if (_z_slice_check(&msg->_payload)) {
_Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, msg->_payload.start, 0, msg->_payload.len));
}

return ret;
}

z_result_t _z_fragment_decode_ext(_z_msg_ext_t *extension, void *ctx) {
z_result_t ret = _Z_RES_OK;
_z_t_msg_fragment_t *msg = (_z_t_msg_fragment_t *)ctx;
if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_FRAGMENT_FIRST) {
msg->first = true;
} else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_FRAGMENT_DROP) {
msg->drop = true;
} else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) {
ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN;
}
return ret;
}

z_result_t _z_fragment_decode(_z_t_msg_fragment_t *msg, _z_zbuf_t *zbf, uint8_t header) {
z_result_t ret = _Z_RES_OK;
*msg = (_z_t_msg_fragment_t){0};

_Z_DEBUG("Decoding _Z_TRANSPORT_FRAGMENT");
ret |= _z_zsize_decode(&msg->_sn, zbf);

msg->first = false;
msg->drop = false;
if ((ret == _Z_RES_OK) && (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true)) {
ret |= _z_msg_ext_skip_non_mandatories(zbf, 0x05);
ret |= _z_msg_ext_decode_iter(zbf, _z_fragment_decode_ext, msg);
}

_z_slice_t slice = _z_slice_alias_buf((uint8_t *)_z_zbuf_start(zbf), _z_zbuf_len(zbf));
Expand Down
Loading
Loading