Skip to content

Commit

Permalink
refactor advanced publisher/subscriber options (#864)
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisBiryukov91 authored Dec 17, 2024
1 parent 9175558 commit 2f3fea4
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 59 deletions.
6 changes: 2 additions & 4 deletions examples/z_advanced_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,8 @@ int main(int argc, char** argv) {

ze_advanced_publisher_options_t pub_opts;
ze_advanced_publisher_options_default(&pub_opts);
ze_advanced_publisher_cache_options_t cache_options;
ze_advanced_publisher_cache_options_default(&cache_options);
cache_options.max_samples = args.history;
pub_opts.cache = &cache_options;
ze_advanced_publisher_cache_options_default(&pub_opts.cache); // or pub_opts.cache.is_enabled = true;
pub_opts.cache.max_samples = args.history;
pub_opts.publisher_detection = true;
pub_opts.sample_miss_detection = true;

Expand Down
14 changes: 4 additions & 10 deletions examples/z_advanced_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,10 @@ int main(int argc, char** argv) {

ze_advanced_subscriber_options_t sub_opts;
ze_advanced_subscriber_options_default(&sub_opts);

ze_advanced_subscriber_history_options_t sub_history_options;
ze_advanced_subscriber_history_options_default(&sub_history_options);
sub_history_options.detect_late_publishers = true;

ze_advanced_subscriber_recovery_options_t sub_recovery_options;
ze_advanced_subscriber_recovery_options_default(&sub_recovery_options);
sub_recovery_options.periodic_queries_period_ms = 1000;
sub_opts.history = &sub_history_options;
sub_opts.recovery = &sub_recovery_options;
ze_advanced_subscriber_history_options_default(&sub_opts.history); // or sub_opts.history.is_enabled = true;
sub_opts.history.detect_late_publishers = true;
ze_advanced_subscriber_recovery_options_default(&sub_opts.recovery); // or sub_opts.recovery.is_enabled = true;
sub_opts.recovery.periodic_queries_period_ms = 1000;
sub_opts.subscriber_detection = true;

z_owned_closure_sample_t callback;
Expand Down
32 changes: 21 additions & 11 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -1009,7 +1009,11 @@ typedef struct zc_moved_shm_client_list_t {
#if defined(Z_FEATURE_UNSTABLE_API)
typedef struct ze_advanced_publisher_cache_options_t {
/**
* Number of samples to keep for each resource
* Must be set to ``true``, to enable the cache.
*/
bool is_enabled;
/**
* Number of samples to keep for each resource.
*/
size_t max_samples;
/**
Expand Down Expand Up @@ -1053,9 +1057,9 @@ typedef struct ze_advanced_publisher_options_t {
*/
struct z_publisher_options_t publisher_options;
/**
* Optional settings for publisher cache.
* Publisher cache settings.
*/
struct ze_advanced_publisher_cache_options_t *cache;
struct ze_advanced_publisher_cache_options_t cache;
/**
* Allow matching Subscribers to detect lost samples and optionally ask for retransimission.
*
Expand All @@ -1070,7 +1074,7 @@ typedef struct ze_advanced_publisher_options_t {
* An optional key expression to be added to the liveliness token key expression.
* It can be used to convey meta data.
*/
struct z_loaned_keyexpr_t *publisher_detection_metadata;
const struct z_loaned_keyexpr_t *publisher_detection_metadata;
} ze_advanced_publisher_options_t;
#endif
/**
Expand Down Expand Up @@ -1142,6 +1146,10 @@ typedef struct ze_moved_advanced_subscriber_t {
*/
#if defined(Z_FEATURE_UNSTABLE_API)
typedef struct ze_advanced_subscriber_history_options_t {
/**
* Must be set to ``true``, to enable the history data recovery.
*/
bool is_enabled;
/**
* Enable detection of late joiner publishers and query for their historical data.
* Late joiner detection can only be achieved for Publishers that enable publisher_detection.
Expand All @@ -1164,6 +1172,10 @@ typedef struct ze_advanced_subscriber_history_options_t {
*/
#if defined(Z_FEATURE_UNSTABLE_API)
typedef struct ze_advanced_subscriber_recovery_options_t {
/**
* Must be set to ``true``, to enable the lost sample recovery.
*/
bool is_enabled;
/**
* Period for queries for not yet received Samples.
*
Expand All @@ -1186,16 +1198,14 @@ typedef struct ze_advanced_subscriber_options_t {
*/
struct z_subscriber_options_t subscriber_options;
/**
* Optional settings for querying historical data. History can only be retransmitted by Publishers that enable caching.
* Querying historical data is disabled if the value is ``NULL``.
* Settings for querying historical data. History can only be retransmitted by Publishers that enable caching.
*/
struct ze_advanced_subscriber_history_options_t *history;
struct ze_advanced_subscriber_history_options_t history;
/**
* Optional settings for retransmission of detected lost Samples. Retransmission of lost samples can only be done by Publishers that enable
* Settings for retransmission of detected lost Samples. Retransmission of lost samples can only be done by Publishers that enable
* caching and sample_miss_detection.
* Retransmission is disabled if the value is ``NULL``.
*/
struct ze_advanced_subscriber_recovery_options_t *recovery;
struct ze_advanced_subscriber_recovery_options_t recovery;
/**
* Timeout to be used for history and recovery queries.
* Default value will be used if set to ``0``.
Expand All @@ -1209,7 +1219,7 @@ typedef struct ze_advanced_subscriber_options_t {
* An optional key expression to be added to the liveliness token key expression.
* It can be used to convey meta data.
*/
struct z_loaned_keyexpr_t *subscriber_detection_metadata;
const struct z_loaned_keyexpr_t *subscriber_detection_metadata;
} ze_advanced_subscriber_options_t;
#endif
/**
Expand Down
21 changes: 14 additions & 7 deletions src/advanced_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ use crate::{
/// @brief Setting for advanced publisher's cache. The cache allows advanced subscribers to recover history and/or lost samples.
#[repr(C)]
pub struct ze_advanced_publisher_cache_options_t {
/// Number of samples to keep for each resource
/// Must be set to ``true``, to enable the cache.
pub is_enabled: bool,
/// Number of samples to keep for each resource.
pub max_samples: usize,
/// The congestion control to apply to replies.
pub congestion_control: z_congestion_control_t,
Expand All @@ -50,6 +52,7 @@ pub struct ze_advanced_publisher_cache_options_t {
impl Default for ze_advanced_publisher_cache_options_t {
fn default() -> Self {
Self {
is_enabled: true,
max_samples: 1,
congestion_control: CongestionControl::default().into(),
priority: Priority::default().into(),
Expand Down Expand Up @@ -86,8 +89,8 @@ impl From<&ze_advanced_publisher_cache_options_t> for CacheConfig {
pub struct ze_advanced_publisher_options_t {
/// Base publisher options.
pub publisher_options: z_publisher_options_t,
/// Optional settings for publisher cache.
pub cache: Option<&'static mut ze_advanced_publisher_cache_options_t>,
/// Publisher cache settings.
pub cache: ze_advanced_publisher_cache_options_t,
/// Allow matching Subscribers to detect lost samples and optionally ask for retransimission.
///
/// Retransmission can only be done if history is enabled on subscriber side.
Expand All @@ -96,17 +99,21 @@ pub struct ze_advanced_publisher_options_t {
pub publisher_detection: bool,
/// An optional key expression to be added to the liveliness token key expression.
/// It can be used to convey meta data.
pub publisher_detection_metadata: Option<&'static mut z_loaned_keyexpr_t>,
pub publisher_detection_metadata: Option<&'static z_loaned_keyexpr_t>,
}

/// Constructs the default value for `z_publisher_options_t`.
#[no_mangle]
pub extern "C" fn ze_advanced_publisher_options_default(
this_: &mut MaybeUninit<ze_advanced_publisher_options_t>,
) {
let cache = ze_advanced_publisher_cache_options_t {
is_enabled: false,
..Default::default()
};
this_.write(ze_advanced_publisher_options_t {
publisher_options: z_publisher_options_t::default(),
cache: None,
cache,
sample_miss_detection: false,
publisher_detection: false,
publisher_detection_metadata: None,
Expand Down Expand Up @@ -158,8 +165,8 @@ pub extern "C" fn ze_declare_advanced_publisher(
if let Some(pub_detection_metadata) = &options.publisher_detection_metadata {
p = p.publisher_detection_metadata(pub_detection_metadata.as_rust_type_ref());
}
if let Some(cache) = &options.cache {
p = p.cache((&**cache).into());
if options.cache.is_enabled {
p = p.cache((&options.cache).into());
}
}
match p.wait() {
Expand Down
58 changes: 43 additions & 15 deletions src/advanced_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ use crate::{
/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief Settings for retrievieng historical data for Advanced Subscriber.
#[repr(C)]
#[derive(Default)]
pub struct ze_advanced_subscriber_history_options_t {
/// Must be set to ``true``, to enable the history data recovery.
pub is_enabled: bool,
/// Enable detection of late joiner publishers and query for their historical data.
/// Late joiner detection can only be achieved for Publishers that enable publisher_detection.
/// History can only be retransmitted by Publishers that enable caching.
Expand All @@ -43,6 +44,17 @@ pub struct ze_advanced_subscriber_history_options_t {
pub max_age_ms: u64,
}

impl Default for ze_advanced_subscriber_history_options_t {
fn default() -> Self {
Self {
is_enabled: true,
detect_late_publishers: false,
max_samples: 0,
max_age_ms: 0,
}
}
}

/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief Constructs the default value for `ze_advanced_subscriber_history_options_t`.
#[no_mangle]
Expand Down Expand Up @@ -71,8 +83,9 @@ impl From<&ze_advanced_subscriber_history_options_t> for HistoryConfig {
/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief Settings for recovering lost messages for Advanced Subscriber.
#[repr(C)]
#[derive(Default)]
pub struct ze_advanced_subscriber_recovery_options_t {
/// Must be set to ``true``, to enable the lost sample recovery.
pub is_enabled: bool,
/// Period for queries for not yet received Samples.
///
/// These queries allow to retrieve the last Sample(s) if the last Sample(s) is/are lost.
Expand All @@ -82,6 +95,15 @@ pub struct ze_advanced_subscriber_recovery_options_t {
pub periodic_queries_period_ms: u64,
}

impl Default for ze_advanced_subscriber_recovery_options_t {
fn default() -> Self {
Self {
is_enabled: true,
periodic_queries_period_ms: 0,
}
}
}

impl From<&ze_advanced_subscriber_recovery_options_t> for RecoveryConfig {
fn from(val: &ze_advanced_subscriber_recovery_options_t) -> RecoveryConfig {
let mut r = RecoveryConfig::default();
Expand All @@ -107,21 +129,19 @@ pub extern "C" fn ze_advanced_subscriber_recovery_options_default(
pub struct ze_advanced_subscriber_options_t {
/// Base subscriber options.
pub subscriber_options: z_subscriber_options_t,
/// Optional settings for querying historical data. History can only be retransmitted by Publishers that enable caching.
/// Querying historical data is disabled if the value is ``NULL``.
pub history: Option<&'static mut ze_advanced_subscriber_history_options_t>,
/// Optional settings for retransmission of detected lost Samples. Retransmission of lost samples can only be done by Publishers that enable
/// Settings for querying historical data. History can only be retransmitted by Publishers that enable caching.
pub history: ze_advanced_subscriber_history_options_t,
/// Settings for retransmission of detected lost Samples. Retransmission of lost samples can only be done by Publishers that enable
/// caching and sample_miss_detection.
/// Retransmission is disabled if the value is ``NULL``.
pub recovery: Option<&'static mut ze_advanced_subscriber_recovery_options_t>,
pub recovery: ze_advanced_subscriber_recovery_options_t,
/// Timeout to be used for history and recovery queries.
/// Default value will be used if set to ``0``.
pub query_timeout_ms: u64,
/// Allow this subscriber to be detected through liveliness.
pub subscriber_detection: bool,
/// An optional key expression to be added to the liveliness token key expression.
/// It can be used to convey meta data.
pub subscriber_detection_metadata: Option<&'static mut z_loaned_keyexpr_t>,
pub subscriber_detection_metadata: Option<&'static z_loaned_keyexpr_t>,
}

/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
Expand All @@ -130,10 +150,18 @@ pub struct ze_advanced_subscriber_options_t {
pub extern "C" fn ze_advanced_subscriber_options_default(
this: &mut MaybeUninit<ze_advanced_subscriber_options_t>,
) {
let history = ze_advanced_subscriber_history_options_t {
is_enabled: false,
..Default::default()
};
let recovery = ze_advanced_subscriber_recovery_options_t {
is_enabled: false,
..Default::default()
};
this.write(ze_advanced_subscriber_options_t {
subscriber_options: z_subscriber_options_t::default(),
history: None,
recovery: None,
history,
recovery,
query_timeout_ms: 0,
subscriber_detection: false,
subscriber_detection_metadata: None,
Expand Down Expand Up @@ -163,11 +191,11 @@ fn _declare_advanced_subscriber_inner(
if let Some(sub_detection_metadata) = &options.subscriber_detection_metadata {
sub = sub.subscriber_detection_metadata(sub_detection_metadata.as_rust_type_ref());
}
if let Some(history) = &options.history {
sub = sub.history((&**history).into());
if options.history.is_enabled {
sub = sub.history((&options.history).into());
}
if let Some(recovery) = &options.recovery {
sub = sub.recovery((&**recovery).into());
if options.recovery.is_enabled {
sub = sub.recovery((&options.recovery).into());
}
}
sub
Expand Down
18 changes: 6 additions & 12 deletions tests/z_int_advanced_pub_sub_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,8 @@ int run_publisher() {

ze_advanced_publisher_options_t pub_opts;
ze_advanced_publisher_options_default(&pub_opts);
ze_advanced_publisher_cache_options_t cache_options;
ze_advanced_publisher_cache_options_default(&cache_options);
cache_options.max_samples = values_count;
pub_opts.cache = &cache_options;
ze_advanced_publisher_cache_options_default(&pub_opts.cache);
pub_opts.cache.max_samples = values_count;
pub_opts.publisher_detection = true;
pub_opts.sample_miss_detection = true;

Expand Down Expand Up @@ -141,15 +139,11 @@ int run_subscriber() {
ze_advanced_subscriber_options_t sub_opts;
ze_advanced_subscriber_options_default(&sub_opts);

ze_advanced_subscriber_history_options_t sub_history_options;
ze_advanced_subscriber_history_options_default(&sub_history_options);
sub_history_options.detect_late_publishers = true;
ze_advanced_subscriber_history_options_default(&sub_opts.history);
sub_opts.history.detect_late_publishers = true;

ze_advanced_subscriber_recovery_options_t sub_recovery_options;
ze_advanced_subscriber_recovery_options_default(&sub_recovery_options);
sub_recovery_options.periodic_queries_period_ms = 1000;
sub_opts.history = &sub_history_options;
sub_opts.recovery = &sub_recovery_options;
ze_advanced_subscriber_recovery_options_default(&sub_opts.recovery);
sub_opts.recovery.periodic_queries_period_ms = 1000;
sub_opts.subscriber_detection = true;

z_owned_closure_sample_t callback;
Expand Down

0 comments on commit 2f3fea4

Please sign in to comment.