Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for qos settings in sample #348

Merged
merged 10 commits into from
Feb 23, 2024
10 changes: 10 additions & 0 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,16 @@ typedef struct {
uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior
} zp_send_join_options_t;

/**
* QoS settings of zenoh message.
*
* Members:
* z_priority_t priority: Priority of the message.
* z_congestion_control_t congestion_control: Congestion control of the message.
* _Bool express: If true, the message is not batched during transmission, in order to reduce latency.
*/
typedef _z_qos_t z_qos_t;

/**
* Represents a data sample.
*
Expand Down
15 changes: 15 additions & 0 deletions include/zenoh-pico/protocol/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,20 @@ _z_keyexpr_t _z_rname(const char *rname);
*/
_z_keyexpr_t _z_rid_with_suffix(uint16_t rid, const char *suffix);

/**
* QoS settings of zenoh message.
*
* Members:
* z_priority_t priority: Priority of the message.
* z_congestion_control_t congestion_control: Congestion control of the message.
* _Bool express: If true, the message is not batched during transmission, in order to reduce latency.
*/
typedef struct {
z_priority_t priority;
z_congestion_control_t congestion_control;
_Bool express;
} _z_qos_t;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment and the structure do not agree on type visibility (_z_priority_t vs z_priority_t), also we use _Bool for booleans generally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

/**
* A zenoh-net data sample.
*
Expand All @@ -227,6 +241,7 @@ typedef struct {
_z_timestamp_t timestamp;
_z_encoding_t encoding;
z_sample_kind_t kind;
_z_qos_t qos;
Copy link
Contributor

@jean-roland jean-roland Feb 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty sure by putting the private type here, the public alias z_qos_t is never used. So I'm tempted to use z_qos_t instead, if that doesn't cause include issues.

#if Z_FEATURE_ATTACHMENT == 1
z_attachment_t attachment;
#endif
Expand Down
8 changes: 8 additions & 0 deletions include/zenoh-pico/protocol/definitions/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ typedef struct {

#define _z_n_qos_make(express, nodrop, priority) \
(_z_n_qos_t) { ._val = (((express) << 4) | ((nodrop) << 3) | (priority)) }
inline _z_qos_t _z_n_qos_unmake(_z_n_qos_t n_qos) {
_z_qos_t qos;
qos.priority = (z_priority_t)(n_qos._val & 0b111u);
qos.congestion_control = (n_qos._val & 0b1000u) ? Z_CONGESTION_CONTROL_BLOCK : Z_CONGESTION_CONTROL_DROP;
qos.express = (bool)(n_qos._val & 0b10000u);
return qos;
}
_z_qos_t _z_n_qos_unmake_public(_z_n_qos_t n_qos);
#define _Z_N_QOS_DEFAULT _z_n_qos_make(0, 0, 5)

// RESPONSE FINAL message flags:
Expand Down
5 changes: 3 additions & 2 deletions include/zenoh-pico/session/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ _z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t

_z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub);
void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload,
_z_zint_t payload_len
_z_zint_t payload_len, _z_n_qos_t qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_t att
#endif
);
int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp,
const _z_n_qos_t qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_t att
Expand Down
7 changes: 4 additions & 3 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -648,9 +648,10 @@ int8_t z_put(z_session_t zs, z_keyexpr_t keyexpr, const uint8_t *payload, z_zint
);

// Trigger local subscriptions
_z_trigger_local_subscriptions(&zs._val.in->val, keyexpr, payload, payload_len
_z_trigger_local_subscriptions(&zs._val.in->val, keyexpr, payload, payload_len,
_z_n_qos_make(0, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority)
#if Z_FEATURE_ATTACHMENT == 1
,
,
opt.attachment
#endif
);
Expand Down Expand Up @@ -747,7 +748,7 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l
);

// Trigger local subscriptions
_z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len
_z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len, _Z_N_QOS_DEFAULT
#if Z_FEATURE_ATTACHMENT == 1
,
opt.attachment
Expand Down
7 changes: 7 additions & 0 deletions src/protocol/definitions/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,10 @@ void _z_msg_fix_mapping(_z_zenoh_message_t *msg, uint16_t mapping) {
break;
}
}
_z_qos_t _z_n_qos_unmake_public(_z_n_qos_t n_qos) {
_z_qos_t qos = _z_n_qos_unmake(n_qos);
if (qos.priority == Z_PRIORITY_REAL_TIME) {
qos.priority = Z_PRIORITY_DEFAULT;
}
return qos;
}
2 changes: 1 addition & 1 deletion src/session/push.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push) {
#if Z_FEATURE_ATTACHMENT == 1
z_attachment_t att = _z_encoded_as_attachment(&push->_body._body._put._attachment);
#endif
ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp
ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp, push->_qos
#if Z_FEATURE_ATTACHMENT == 1
,
att
Expand Down
8 changes: 4 additions & 4 deletions src/session/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
z_attachment_t att = _z_encoded_as_attachment(&put._attachment);
#endif
ret = _z_trigger_subscriptions(zn, req._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT,
put._commons._timestamp
put._commons._timestamp, req._ext_qos
#if Z_FEATURE_ATTACHMENT == 1
,
att
Expand All @@ -122,7 +122,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
_z_msg_del_t del = req._body._del;
#if Z_FEATURE_SUBSCRIPTION == 1
ret = _z_trigger_subscriptions(zn, req._key, _z_bytes_empty(), z_encoding_default(),
Z_SAMPLE_KIND_DELETE, del._commons._timestamp
Z_SAMPLE_KIND_DELETE, del._commons._timestamp, req._ext_qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_null()
Expand Down Expand Up @@ -166,7 +166,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
z_attachment_t att = _z_encoded_as_attachment(&put._attachment);
#endif
ret = _z_trigger_subscriptions(zn, response._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT,
put._commons._timestamp
put._commons._timestamp, response._ext_qos
#if Z_FEATURE_ATTACHMENT == 1
,
att
Expand All @@ -178,7 +178,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
_z_msg_del_t del = response._body._del;
#if Z_FEATURE_SUBSCRIPTION == 1
ret = _z_trigger_subscriptions(zn, response._key, _z_bytes_empty(), z_encoding_default(),
Z_SAMPLE_KIND_DELETE, del._commons._timestamp
Z_SAMPLE_KIND_DELETE, del._commons._timestamp, response._ext_qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_null()
Expand Down
13 changes: 8 additions & 5 deletions src/session/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,25 +153,26 @@ _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_loca
}

void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload,
_z_zint_t payload_len
_z_zint_t payload_len, _z_n_qos_t qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_t att
#endif
) {
_z_encoding_t encoding = {.prefix = Z_ENCODING_PREFIX_DEFAULT, .suffix = _z_bytes_wrap(NULL, 0)};
int8_t ret = _z_trigger_subscriptions(zn, keyexpr, _z_bytes_wrap(payload, payload_len), encoding, Z_SAMPLE_KIND_PUT,
_z_timestamp_null()
_z_timestamp_null(), qos
#if Z_FEATURE_ATTACHMENT == 1
,
,
att
#endif
);
(void)ret;
}

int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp,
const _z_n_qos_t qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_t att
Expand Down Expand Up @@ -200,6 +201,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co
s.encoding = encoding;
s.kind = kind;
s.timestamp = timestamp;
s.qos = _z_n_qos_unmake(qos);
#if Z_FEATURE_ATTACHMENT == 1
s.attachment = att;
#endif
Expand Down Expand Up @@ -256,11 +258,12 @@ void _z_flush_subscriptions(_z_session_t *zn) {
#else // Z_FEATURE_SUBSCRIPTION == 0

void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload,
_z_zint_t payload_len) {
_z_zint_t payload_len, _z_n_qos_t qos) {
_ZP_UNUSED(zn);
_ZP_UNUSED(keyexpr);
_ZP_UNUSED(payload);
_ZP_UNUSED(payload_len);
_ZP_UNUSED(qos);
}

#endif // Z_FEATURE_SUBSCRIPTION == 1
2 changes: 1 addition & 1 deletion tests/fragment.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def check_output(tx_status, tx_output, rx_status, rx_output):
# Expected rx output & status
z_rx_expected_status = 0
z_rx_expected_output = (
"[rx]: Received packet on test/zenoh-pico-fragment, len: 10000, validity: 1")
"[rx]: Received packet on test/zenoh-pico-fragment, len: 10000, validity: 1, qos {priority: 4, cong_ctrl: 0}")

# Check the exit status of tx
if tx_status == z_tx_expected_status:
Expand Down
3 changes: 2 additions & 1 deletion tests/z_test_fragment_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
break;
}
}
printf("[rx]: Received packet on %s, len: %d, validity: %d\n", z_loan(keystr), (int)sample->payload.len, is_valid);
printf("[rx]: Received packet on %s, len: %d, validity: %d, qos {priority: %d, cong_ctrl: %d}\n", z_loan(keystr),

Check notice

Code scanning / Cppcheck (reported by Codacy)

MISRA 17.7 rule Note test

MISRA 17.7 rule
(int)sample->payload.len, is_valid, sample->qos.priority, sample->qos.congestion_control);
z_drop(z_move(keystr));
}

Expand Down
2 changes: 2 additions & 0 deletions tests/z_test_fragment_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ int main(int argc, char **argv) {
// Put data
z_put_options_t options = z_put_options_default();
options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL);
options.priority = Z_PRIORITY_DATA_HIGH;
options.congestion_control = Z_CONGESTION_CONTROL_BLOCK;
for (int i = 0; i < 5; i++) {
printf("[tx]: Sending packet on %s, len: %d\n", keyexpr, (int)size);
if (z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)value, size, &options) < 0) {
Expand Down
Loading