Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for qos settings in sample #348

Merged
merged 10 commits into from
Feb 23, 2024
26 changes: 26 additions & 0 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,32 @@
uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior
} zp_send_join_options_t;

/**
* QoS settings of zenoh message.
*/
typedef _z_qos_t z_qos_t;
/**
* Returns message priority.
*/
static inline z_priority_t z_qos_get_priority(z_qos_t qos) {
z_priority_t ret = _z_n_qos_get_priority(qos);
return ret == _Z_PRIORITY_CONTROL ? Z_PRIORITY_DEFAULT : ret;

Check notice

Code scanning / Cppcheck (reported by Codacy)

MISRA 12.1 rule Note

MISRA 12.1 rule
}
/**
* Returns message congestion control.
*/
static inline z_congestion_control_t z_qos_get_congestion_control(z_qos_t qos) {
return _z_n_qos_get_congestion_control(qos);
}
/**
* Returns message express flag. If set to true, the message is not batched to reduce the latency.
*/
static inline _Bool z_qos_get_express(z_qos_t qos) { return _z_n_qos_get_express(qos); }
/**
* Returns default qos settings.
*/
static inline z_qos_t z_qos_default(void) { return _Z_N_QOS_DEFAULT; }

/**
* Represents a data sample.
*
Expand Down
8 changes: 8 additions & 0 deletions include/zenoh-pico/protocol/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,13 @@ _z_keyexpr_t _z_rname(const char *rname);
*/
_z_keyexpr_t _z_rid_with_suffix(uint16_t rid, const char *suffix);

/**
* QoS settings of zenoh message.
*/
typedef struct {
uint8_t _val;
} _z_qos_t;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment and the structure do not agree on type visibility (_z_priority_t vs z_priority_t), also we use _Bool for booleans generally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

/**
* A zenoh-net data sample.
*
Expand All @@ -227,6 +234,7 @@ typedef struct {
_z_timestamp_t timestamp;
_z_encoding_t encoding;
z_sample_kind_t kind;
_z_qos_t qos;
Copy link
Contributor

@jean-roland jean-roland Feb 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty sure by putting the private type here, the public alias z_qos_t is never used. So I'm tempted to use z_qos_t instead, if that doesn't cause include issues.

#if Z_FEATURE_ATTACHMENT == 1
z_attachment_t attachment;
#endif
Expand Down
22 changes: 17 additions & 5 deletions include/zenoh-pico/protocol/definitions/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,24 @@
#define _Z_FLAG_N_RESPONSE_N 0x20 // 1 << 5
#define _Z_FLAG_N_RESPONSE_M 0x40 // 1 << 6

typedef struct {
uint8_t _val;
} _z_n_qos_t;
typedef _z_qos_t _z_n_qos_t;

#define _z_n_qos_make(express, nodrop, priority) \
(_z_n_qos_t) { ._val = (((express) << 4) | ((nodrop) << 3) | (priority)) }
static inline _z_qos_t _z_n_qos_create(_Bool express, z_congestion_control_t congestion_control,
z_priority_t priority) {
_Bool nodrop = congestion_control == Z_CONGESTION_CONTROL_DROP ? 0 : 1;
_z_n_qos_t ret = {._val = (uint8_t)((express << 4) | (nodrop << 3) | priority)};
return ret;
}
static inline z_priority_t _z_n_qos_get_priority(_z_n_qos_t n_qos) {
return (z_priority_t)(n_qos._val & 0x07 /* 0b111 */);
}
static inline z_congestion_control_t _z_n_qos_get_congestion_control(_z_n_qos_t n_qos) {
return (n_qos._val & 0x08 /* 0b1000 */) ? Z_CONGESTION_CONTROL_BLOCK : Z_CONGESTION_CONTROL_DROP;
}
static inline _Bool _z_n_qos_get_express(_z_n_qos_t n_qos) { return (_Bool)(n_qos._val & 0x10 /* 0b10000 */); }
#define _z_n_qos_make(express, nodrop, priority) \
_z_n_qos_create((_Bool)express, nodrop ? Z_CONGESTION_CONTROL_BLOCK : Z_CONGESTION_CONTROL_DROP, \
(z_priority_t)priority)
#define _Z_N_QOS_DEFAULT _z_n_qos_make(0, 0, 5)

// RESPONSE FINAL message flags:
Expand Down
5 changes: 3 additions & 2 deletions include/zenoh-pico/session/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ _z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t

_z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub);
void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload,
_z_zint_t payload_len
_z_zint_t payload_len, _z_n_qos_t qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_t att
#endif
);
int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp,
const _z_n_qos_t qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_t att
Expand Down
7 changes: 4 additions & 3 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -648,9 +648,10 @@ int8_t z_put(z_session_t zs, z_keyexpr_t keyexpr, const uint8_t *payload, z_zint
);

// Trigger local subscriptions
_z_trigger_local_subscriptions(&zs._val.in->val, keyexpr, payload, payload_len
_z_trigger_local_subscriptions(&zs._val.in->val, keyexpr, payload, payload_len,
_z_n_qos_make(0, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority)
#if Z_FEATURE_ATTACHMENT == 1
,
,
opt.attachment
#endif
);
Expand Down Expand Up @@ -747,7 +748,7 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l
);

// Trigger local subscriptions
_z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len
_z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len, _Z_N_QOS_DEFAULT
#if Z_FEATURE_ATTACHMENT == 1
,
opt.attachment
Expand Down
2 changes: 1 addition & 1 deletion src/session/push.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push) {
#if Z_FEATURE_ATTACHMENT == 1
z_attachment_t att = _z_encoded_as_attachment(&push->_body._body._put._attachment);
#endif
ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp
ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp, push->_qos
#if Z_FEATURE_ATTACHMENT == 1
,
att
Expand Down
8 changes: 4 additions & 4 deletions src/session/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
z_attachment_t att = _z_encoded_as_attachment(&put._attachment);
#endif
ret = _z_trigger_subscriptions(zn, req._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT,
put._commons._timestamp
put._commons._timestamp, req._ext_qos
#if Z_FEATURE_ATTACHMENT == 1
,
att
Expand All @@ -122,7 +122,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
_z_msg_del_t del = req._body._del;
#if Z_FEATURE_SUBSCRIPTION == 1
ret = _z_trigger_subscriptions(zn, req._key, _z_bytes_empty(), z_encoding_default(),
Z_SAMPLE_KIND_DELETE, del._commons._timestamp
Z_SAMPLE_KIND_DELETE, del._commons._timestamp, req._ext_qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_null()
Expand Down Expand Up @@ -166,7 +166,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
z_attachment_t att = _z_encoded_as_attachment(&put._attachment);
#endif
ret = _z_trigger_subscriptions(zn, response._key, put._payload, put._encoding, Z_SAMPLE_KIND_PUT,
put._commons._timestamp
put._commons._timestamp, response._ext_qos
#if Z_FEATURE_ATTACHMENT == 1
,
att
Expand All @@ -178,7 +178,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
_z_msg_del_t del = response._body._del;
#if Z_FEATURE_SUBSCRIPTION == 1
ret = _z_trigger_subscriptions(zn, response._key, _z_bytes_empty(), z_encoding_default(),
Z_SAMPLE_KIND_DELETE, del._commons._timestamp
Z_SAMPLE_KIND_DELETE, del._commons._timestamp, response._ext_qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_null()
Expand Down
14 changes: 9 additions & 5 deletions src/session/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "zenoh-pico/config.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/definitions/network.h"
#include "zenoh-pico/protocol/keyexpr.h"
#include "zenoh-pico/session/resource.h"
#include "zenoh-pico/session/session.h"
Expand Down Expand Up @@ -153,25 +154,26 @@ _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_loca
}

void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload,
_z_zint_t payload_len
_z_zint_t payload_len, _z_n_qos_t qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_t att
#endif
) {
_z_encoding_t encoding = {.prefix = Z_ENCODING_PREFIX_DEFAULT, .suffix = _z_bytes_wrap(NULL, 0)};
int8_t ret = _z_trigger_subscriptions(zn, keyexpr, _z_bytes_wrap(payload, payload_len), encoding, Z_SAMPLE_KIND_PUT,
_z_timestamp_null()
_z_timestamp_null(), qos
#if Z_FEATURE_ATTACHMENT == 1
,
,
att
#endif
);
(void)ret;
}

int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp
const _z_encoding_t encoding, const _z_zint_t kind, const _z_timestamp_t timestamp,
const _z_n_qos_t qos
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_t att
Expand Down Expand Up @@ -200,6 +202,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co
s.encoding = encoding;
s.kind = kind;
s.timestamp = timestamp;
s.qos = qos;
#if Z_FEATURE_ATTACHMENT == 1
s.attachment = att;
#endif
Expand Down Expand Up @@ -256,11 +259,12 @@ void _z_flush_subscriptions(_z_session_t *zn) {
#else // Z_FEATURE_SUBSCRIPTION == 0

void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload,
_z_zint_t payload_len) {
_z_zint_t payload_len, _z_n_qos_t qos) {
_ZP_UNUSED(zn);
_ZP_UNUSED(keyexpr);
_ZP_UNUSED(payload);
_ZP_UNUSED(payload_len);
_ZP_UNUSED(qos);
}

#endif // Z_FEATURE_SUBSCRIPTION == 1
2 changes: 1 addition & 1 deletion tests/fragment.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def check_output(tx_status, tx_output, rx_status, rx_output):
# Expected rx output & status
z_rx_expected_status = 0
z_rx_expected_output = (
"[rx]: Received packet on test/zenoh-pico-fragment, len: 10000, validity: 1")
"[rx]: Received packet on test/zenoh-pico-fragment, len: 10000, validity: 1, qos {priority: 4, cong_ctrl: 0}")

# Check the exit status of tx
if tx_status == z_tx_expected_status:
Expand Down
4 changes: 3 additions & 1 deletion tests/z_test_fragment_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
break;
}
}
printf("[rx]: Received packet on %s, len: %d, validity: %d\n", z_loan(keystr), (int)sample->payload.len, is_valid);
printf("[rx]: Received packet on %s, len: %d, validity: %d, qos {priority: %d, cong_ctrl: %d}\n", z_loan(keystr),

Check notice

Code scanning / Cppcheck (reported by Codacy)

MISRA 17.7 rule Note test

MISRA 17.7 rule
(int)sample->payload.len, is_valid, z_qos_get_priority(sample->qos),
z_qos_get_congestion_control(sample->qos));
z_drop(z_move(keystr));
}

Expand Down
2 changes: 2 additions & 0 deletions tests/z_test_fragment_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ int main(int argc, char **argv) {
// Put data
z_put_options_t options = z_put_options_default();
options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL);
options.priority = Z_PRIORITY_DATA_HIGH;
options.congestion_control = Z_CONGESTION_CONTROL_BLOCK;
for (int i = 0; i < 5; i++) {
printf("[tx]: Sending packet on %s, len: %d\n", keyexpr, (int)size);
if (z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)value, size, &options) < 0) {
Expand Down
Loading