Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

advanced publisher/subscriber options contains cache options inside #864

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions examples/z_advanced_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,8 @@ int main(int argc, char** argv) {

ze_advanced_publisher_options_t pub_opts;
ze_advanced_publisher_options_default(&pub_opts);
ze_advanced_publisher_cache_options_t cache_options;
ze_advanced_publisher_cache_options_default(&cache_options);
cache_options.max_samples = args.history;
pub_opts.cache = &cache_options;
ze_advanced_publisher_cache_options_default(&pub_opts.cache); // or pub_opts.cache.is_enabled = true;
pub_opts.cache.max_samples = args.history;
pub_opts.publisher_detection = true;
pub_opts.sample_miss_detection = true;

Expand Down
14 changes: 4 additions & 10 deletions examples/z_advanced_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,10 @@ int main(int argc, char** argv) {

ze_advanced_subscriber_options_t sub_opts;
ze_advanced_subscriber_options_default(&sub_opts);

ze_advanced_subscriber_history_options_t sub_history_options;
ze_advanced_subscriber_history_options_default(&sub_history_options);
sub_history_options.detect_late_publishers = true;

ze_advanced_subscriber_recovery_options_t sub_recovery_options;
ze_advanced_subscriber_recovery_options_default(&sub_recovery_options);
sub_recovery_options.periodic_queries_period_ms = 1000;
sub_opts.history = &sub_history_options;
sub_opts.recovery = &sub_recovery_options;
ze_advanced_subscriber_history_options_default(&sub_opts.history); // or sub_opts.history.is_enabled = true;
sub_opts.history.detect_late_publishers = true;
ze_advanced_subscriber_recovery_options_default(&sub_opts.recovery); // or sub_opts.recovery.is_enabled = true;
sub_opts.recovery.periodic_queries_period_ms = 1000;
sub_opts.subscriber_detection = true;

z_owned_closure_sample_t callback;
Expand Down
32 changes: 21 additions & 11 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -1009,7 +1009,11 @@ typedef struct zc_moved_shm_client_list_t {
#if defined(Z_FEATURE_UNSTABLE_API)
typedef struct ze_advanced_publisher_cache_options_t {
/**
* Number of samples to keep for each resource
* Must be set to ``true``, to enable the cache.
*/
bool is_enabled;
/**
* Number of samples to keep for each resource.
*/
size_t max_samples;
/**
Expand Down Expand Up @@ -1053,9 +1057,9 @@ typedef struct ze_advanced_publisher_options_t {
*/
struct z_publisher_options_t publisher_options;
/**
* Optional settings for publisher cache.
* Publisher cache settings.
*/
struct ze_advanced_publisher_cache_options_t *cache;
struct ze_advanced_publisher_cache_options_t cache;
/**
* Allow matching Subscribers to detect lost samples and optionally ask for retransimission.
*
Expand All @@ -1070,7 +1074,7 @@ typedef struct ze_advanced_publisher_options_t {
* An optional key expression to be added to the liveliness token key expression.
* It can be used to convey meta data.
*/
struct z_loaned_keyexpr_t *publisher_detection_metadata;
const struct z_loaned_keyexpr_t *publisher_detection_metadata;
} ze_advanced_publisher_options_t;
#endif
/**
Expand Down Expand Up @@ -1142,6 +1146,10 @@ typedef struct ze_moved_advanced_subscriber_t {
*/
#if defined(Z_FEATURE_UNSTABLE_API)
typedef struct ze_advanced_subscriber_history_options_t {
/**
* Must be set to ``true``, to enable the history data recovery.
*/
bool is_enabled;
/**
* Enable detection of late joiner publishers and query for their historical data.
* Late joiner detection can only be achieved for Publishers that enable publisher_detection.
Expand All @@ -1164,6 +1172,10 @@ typedef struct ze_advanced_subscriber_history_options_t {
*/
#if defined(Z_FEATURE_UNSTABLE_API)
typedef struct ze_advanced_subscriber_recovery_options_t {
/**
* Must be set to ``true``, to enable the lost sample recovery.
*/
bool is_enabled;
/**
* Period for queries for not yet received Samples.
*
Expand All @@ -1186,16 +1198,14 @@ typedef struct ze_advanced_subscriber_options_t {
*/
struct z_subscriber_options_t subscriber_options;
/**
* Optional settings for querying historical data. History can only be retransmitted by Publishers that enable caching.
* Querying historical data is disabled if the value is ``NULL``.
* Settings for querying historical data. History can only be retransmitted by Publishers that enable caching.
*/
struct ze_advanced_subscriber_history_options_t *history;
struct ze_advanced_subscriber_history_options_t history;
/**
* Optional settings for retransmission of detected lost Samples. Retransmission of lost samples can only be done by Publishers that enable
* Settings for retransmission of detected lost Samples. Retransmission of lost samples can only be done by Publishers that enable
* caching and sample_miss_detection.
* Retransmission is disabled if the value is ``NULL``.
*/
struct ze_advanced_subscriber_recovery_options_t *recovery;
struct ze_advanced_subscriber_recovery_options_t recovery;
/**
* Timeout to be used for history and recovery queries.
* Default value will be used if set to ``0``.
Expand All @@ -1209,7 +1219,7 @@ typedef struct ze_advanced_subscriber_options_t {
* An optional key expression to be added to the liveliness token key expression.
* It can be used to convey meta data.
*/
struct z_loaned_keyexpr_t *subscriber_detection_metadata;
const struct z_loaned_keyexpr_t *subscriber_detection_metadata;
} ze_advanced_subscriber_options_t;
#endif
/**
Expand Down
21 changes: 14 additions & 7 deletions src/advanced_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ use crate::{
/// @brief Setting for advanced publisher's cache. The cache allows advanced subscribers to recover history and/or lost samples.
#[repr(C)]
pub struct ze_advanced_publisher_cache_options_t {
/// Number of samples to keep for each resource
/// Must be set to ``true``, to enable the cache.
pub is_enabled: bool,
/// Number of samples to keep for each resource.
pub max_samples: usize,
/// The congestion control to apply to replies.
pub congestion_control: z_congestion_control_t,
Expand All @@ -50,6 +52,7 @@ pub struct ze_advanced_publisher_cache_options_t {
impl Default for ze_advanced_publisher_cache_options_t {
fn default() -> Self {
Self {
is_enabled: true,
max_samples: 1,
congestion_control: CongestionControl::default().into(),
priority: Priority::default().into(),
Expand Down Expand Up @@ -86,8 +89,8 @@ impl From<&ze_advanced_publisher_cache_options_t> for CacheConfig {
pub struct ze_advanced_publisher_options_t {
/// Base publisher options.
pub publisher_options: z_publisher_options_t,
/// Optional settings for publisher cache.
pub cache: Option<&'static mut ze_advanced_publisher_cache_options_t>,
/// Publisher cache settings.
pub cache: ze_advanced_publisher_cache_options_t,
/// Allow matching Subscribers to detect lost samples and optionally ask for retransimission.
///
/// Retransmission can only be done if history is enabled on subscriber side.
Expand All @@ -96,17 +99,21 @@ pub struct ze_advanced_publisher_options_t {
pub publisher_detection: bool,
/// An optional key expression to be added to the liveliness token key expression.
/// It can be used to convey meta data.
pub publisher_detection_metadata: Option<&'static mut z_loaned_keyexpr_t>,
pub publisher_detection_metadata: Option<&'static z_loaned_keyexpr_t>,
}

/// Constructs the default value for `z_publisher_options_t`.
#[no_mangle]
pub extern "C" fn ze_advanced_publisher_options_default(
this_: &mut MaybeUninit<ze_advanced_publisher_options_t>,
) {
let cache = ze_advanced_publisher_cache_options_t {
is_enabled: false,
..Default::default()
};
this_.write(ze_advanced_publisher_options_t {
publisher_options: z_publisher_options_t::default(),
cache: None,
cache,
sample_miss_detection: false,
publisher_detection: false,
publisher_detection_metadata: None,
Expand Down Expand Up @@ -158,8 +165,8 @@ pub extern "C" fn ze_declare_advanced_publisher(
if let Some(pub_detection_metadata) = &options.publisher_detection_metadata {
p = p.publisher_detection_metadata(pub_detection_metadata.as_rust_type_ref());
}
if let Some(cache) = &options.cache {
p = p.cache((&**cache).into());
if options.cache.is_enabled {
p = p.cache((&options.cache).into());
}
}
match p.wait() {
Expand Down
58 changes: 43 additions & 15 deletions src/advanced_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ use crate::{
/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief Settings for retrievieng historical data for Advanced Subscriber.
#[repr(C)]
#[derive(Default)]
pub struct ze_advanced_subscriber_history_options_t {
/// Must be set to ``true``, to enable the history data recovery.
pub is_enabled: bool,
/// Enable detection of late joiner publishers and query for their historical data.
/// Late joiner detection can only be achieved for Publishers that enable publisher_detection.
/// History can only be retransmitted by Publishers that enable caching.
Expand All @@ -43,6 +44,17 @@ pub struct ze_advanced_subscriber_history_options_t {
pub max_age_ms: u64,
}

impl Default for ze_advanced_subscriber_history_options_t {
fn default() -> Self {
Self {
is_enabled: true,
detect_late_publishers: false,
max_samples: 0,
max_age_ms: 0,
}
}
}

/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief Constructs the default value for `ze_advanced_subscriber_history_options_t`.
#[no_mangle]
Expand Down Expand Up @@ -71,8 +83,9 @@ impl From<&ze_advanced_subscriber_history_options_t> for HistoryConfig {
/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
/// @brief Settings for recovering lost messages for Advanced Subscriber.
#[repr(C)]
#[derive(Default)]
pub struct ze_advanced_subscriber_recovery_options_t {
/// Must be set to ``true``, to enable the lost sample recovery.
pub is_enabled: bool,
/// Period for queries for not yet received Samples.
///
/// These queries allow to retrieve the last Sample(s) if the last Sample(s) is/are lost.
Expand All @@ -82,6 +95,15 @@ pub struct ze_advanced_subscriber_recovery_options_t {
pub periodic_queries_period_ms: u64,
}

impl Default for ze_advanced_subscriber_recovery_options_t {
fn default() -> Self {
Self {
is_enabled: true,
periodic_queries_period_ms: 0,
}
}
}

impl From<&ze_advanced_subscriber_recovery_options_t> for RecoveryConfig {
fn from(val: &ze_advanced_subscriber_recovery_options_t) -> RecoveryConfig {
let mut r = RecoveryConfig::default();
Expand All @@ -107,21 +129,19 @@ pub extern "C" fn ze_advanced_subscriber_recovery_options_default(
pub struct ze_advanced_subscriber_options_t {
/// Base subscriber options.
pub subscriber_options: z_subscriber_options_t,
/// Optional settings for querying historical data. History can only be retransmitted by Publishers that enable caching.
/// Querying historical data is disabled if the value is ``NULL``.
pub history: Option<&'static mut ze_advanced_subscriber_history_options_t>,
/// Optional settings for retransmission of detected lost Samples. Retransmission of lost samples can only be done by Publishers that enable
/// Settings for querying historical data. History can only be retransmitted by Publishers that enable caching.
pub history: ze_advanced_subscriber_history_options_t,
/// Settings for retransmission of detected lost Samples. Retransmission of lost samples can only be done by Publishers that enable
/// caching and sample_miss_detection.
/// Retransmission is disabled if the value is ``NULL``.
pub recovery: Option<&'static mut ze_advanced_subscriber_recovery_options_t>,
pub recovery: ze_advanced_subscriber_recovery_options_t,
/// Timeout to be used for history and recovery queries.
/// Default value will be used if set to ``0``.
pub query_timeout_ms: u64,
/// Allow this subscriber to be detected through liveliness.
pub subscriber_detection: bool,
/// An optional key expression to be added to the liveliness token key expression.
/// It can be used to convey meta data.
pub subscriber_detection_metadata: Option<&'static mut z_loaned_keyexpr_t>,
pub subscriber_detection_metadata: Option<&'static z_loaned_keyexpr_t>,
}

/// @warning This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
Expand All @@ -130,10 +150,18 @@ pub struct ze_advanced_subscriber_options_t {
pub extern "C" fn ze_advanced_subscriber_options_default(
this: &mut MaybeUninit<ze_advanced_subscriber_options_t>,
) {
let history = ze_advanced_subscriber_history_options_t {
is_enabled: false,
..Default::default()
};
let recovery = ze_advanced_subscriber_recovery_options_t {
is_enabled: false,
..Default::default()
};
this.write(ze_advanced_subscriber_options_t {
subscriber_options: z_subscriber_options_t::default(),
history: None,
recovery: None,
history,
recovery,
query_timeout_ms: 0,
subscriber_detection: false,
subscriber_detection_metadata: None,
Expand Down Expand Up @@ -163,11 +191,11 @@ fn _declare_advanced_subscriber_inner(
if let Some(sub_detection_metadata) = &options.subscriber_detection_metadata {
sub = sub.subscriber_detection_metadata(sub_detection_metadata.as_rust_type_ref());
}
if let Some(history) = &options.history {
sub = sub.history((&**history).into());
if options.history.is_enabled {
sub = sub.history((&options.history).into());
}
if let Some(recovery) = &options.recovery {
sub = sub.recovery((&**recovery).into());
if options.recovery.is_enabled {
sub = sub.recovery((&options.recovery).into());
}
}
sub
Expand Down
18 changes: 6 additions & 12 deletions tests/z_int_advanced_pub_sub_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,8 @@ int run_publisher() {

ze_advanced_publisher_options_t pub_opts;
ze_advanced_publisher_options_default(&pub_opts);
ze_advanced_publisher_cache_options_t cache_options;
ze_advanced_publisher_cache_options_default(&cache_options);
cache_options.max_samples = values_count;
pub_opts.cache = &cache_options;
ze_advanced_publisher_cache_options_default(&pub_opts.cache);
pub_opts.cache.max_samples = values_count;
pub_opts.publisher_detection = true;
pub_opts.sample_miss_detection = true;

Expand Down Expand Up @@ -141,15 +139,11 @@ int run_subscriber() {
ze_advanced_subscriber_options_t sub_opts;
ze_advanced_subscriber_options_default(&sub_opts);

ze_advanced_subscriber_history_options_t sub_history_options;
ze_advanced_subscriber_history_options_default(&sub_history_options);
sub_history_options.detect_late_publishers = true;
ze_advanced_subscriber_history_options_default(&sub_opts.history);
sub_opts.history.detect_late_publishers = true;

ze_advanced_subscriber_recovery_options_t sub_recovery_options;
ze_advanced_subscriber_recovery_options_default(&sub_recovery_options);
sub_recovery_options.periodic_queries_period_ms = 1000;
sub_opts.history = &sub_history_options;
sub_opts.recovery = &sub_recovery_options;
ze_advanced_subscriber_recovery_options_default(&sub_opts.recovery);
sub_opts.recovery.periodic_queries_period_ms = 1000;
sub_opts.subscriber_detection = true;

z_owned_closure_sample_t callback;
Expand Down
Loading