diff --git a/docs/api.rst b/docs/api.rst index 195cc798f..197b41f0d 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -327,6 +327,7 @@ Functions .. doxygenfunction:: z_sample_priority .. doxygenfunction:: z_sample_congestion_control .. doxygenfunction:: z_sample_express +.. doxygenfunction:: z_sample_reliability Timestamp @@ -495,6 +496,7 @@ Types .. doxygenenum:: z_congestion_control_t .. doxygenenum:: z_priority_t +.. doxygenenum:: z_reliability_t .. doxygenstruct:: z_put_options_t :members: @@ -530,6 +532,8 @@ Functions .. doxygenfunction:: z_publisher_put_options_default .. doxygenfunction:: z_publisher_delete_options_default +.. doxygenfunction:: z_reliability_default + .. doxygenfunction:: zc_closure_matching_status_check .. doxygenfunction:: zc_closure_matching_status_null .. doxygenfunction:: zc_closure_matching_status_drop @@ -550,8 +554,6 @@ Types .. doxygenstruct:: z_owned_closure_sample_t :members: -.. doxygenenum:: z_reliability_t - .. doxygenstruct:: z_subscriber_options_t :members: diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index 2bf5938f0..17d780e06 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -163,7 +163,9 @@ typedef enum z_query_target_t { Z_QUERY_TARGET_ALL_COMPLETE, } z_query_target_t; /** - * The subscription reliability. + * The publisher reliability. + * NOTE: Currently `reliability` does not trigger any data retransmission on the wire. + * It is rather used as a marker on the wire and it may be used to select the best link available (e.g. TCP for reliable data and UDP for best effort data). */ #if defined(UNSTABLE) typedef enum z_reliability_t { @@ -597,6 +599,12 @@ typedef struct z_publisher_options_t { * If true, Zenoh will not wait to batch this message with others to reduce the bandwith. */ bool is_express; +#if defined(UNSTABLE) + /** + * The publisher reliability. + */ + enum z_reliability_t reliability; +#endif #if defined(UNSTABLE) /** * The allowed destination for this publisher. @@ -617,18 +625,10 @@ typedef struct z_queryable_options_t { * Options passed to the `z_declare_subscriber()` function. */ typedef struct z_subscriber_options_t { -#if defined(UNSTABLE) - /** - * The subscription reliability. - */ - enum z_reliability_t reliability; -#endif -#if !defined(UNSTABLE) /** * Dummy field to avoid having fieldless struct */ uint8_t _0; -#endif } z_subscriber_options_t; /** * Options passed to the `z_delete()` function. @@ -650,6 +650,12 @@ typedef struct z_delete_options_t { * The timestamp of this message. */ struct z_timestamp_t *timestamp; +#if defined(UNSTABLE) + /** + * The delete operation reliability. + */ + enum z_reliability_t reliability; +#endif #if defined(UNSTABLE) /** * The allowed destination of this message. @@ -800,6 +806,12 @@ typedef struct z_put_options_t { * The timestamp of this message. */ struct z_timestamp_t *timestamp; +#if defined(UNSTABLE) + /** + * The put operation reliability. + */ + enum z_reliability_t reliability; +#endif #if defined(UNSTABLE) /** * The allowed destination of this message. @@ -1972,7 +1984,7 @@ z_result_t z_declare_subscriber(struct z_owned_subscriber_t *this_, const struct z_loaned_session_t *session, const struct z_loaned_keyexpr_t *key_expr, struct z_moved_closure_sample_t *callback, - struct z_subscriber_options_t *options); + struct z_subscriber_options_t *_options); /** * Sends request to delete data on specified key expression (used when working with Zenoh storages ). * @@ -3535,6 +3547,12 @@ ZENOHC_API uint8_t z_random_u8(void); #if (defined(SHARED_MEMORY) && defined(UNSTABLE)) ZENOHC_API void z_ref_shm_client_storage_global(z_owned_shm_client_storage_t *this_); #endif +/** + * Returns the default value for `reliability`. + */ +#if defined(UNSTABLE) +ZENOHC_API enum z_reliability_t z_reliability_default(void); +#endif /** * Constructs an owned shallow copy of reply in provided uninitialized memory location. */ @@ -3737,6 +3755,12 @@ ZENOHC_API const struct z_loaned_bytes_t *z_sample_payload(const struct z_loaned * Returns sample qos priority value. */ ZENOHC_API enum z_priority_t z_sample_priority(const struct z_loaned_sample_t *this_); +/** + * Returns the reliability setting the sample was delieverd with. + */ +#if defined(UNSTABLE) +ZENOHC_API enum z_reliability_t z_sample_reliability(const struct z_loaned_sample_t *this_); +#endif /** * Returns the sample source_info. */ diff --git a/src/commons.rs b/src/commons.rs index db3aebbb7..a148ad7b1 100644 --- a/src/commons.rs +++ b/src/commons.rs @@ -15,18 +15,19 @@ use std::{mem::MaybeUninit, ptr::null}; use libc::c_ulong; -use zenoh::{ - qos::{CongestionControl, Priority}, - query::{ConsolidationMode, QueryTarget}, - sample::{Sample, SampleKind}, - time::Timestamp, -}; #[cfg(feature = "unstable")] use zenoh::{ + pubsub::Reliability, query::ReplyKeyExpr, sample::{Locality, SourceInfo}, session::EntityGlobalId, }; +use zenoh::{ + qos::{CongestionControl, Priority}, + query::{ConsolidationMode, QueryTarget}, + sample::{Sample, SampleKind}, + time::Timestamp, +}; #[cfg(feature = "unstable")] use crate::transmute::IntoCType; @@ -186,6 +187,13 @@ pub extern "C" fn z_sample_congestion_control(this_: &z_loaned_sample_t) -> z_co this_.as_rust_type_ref().congestion_control().into() } +#[cfg(feature = "unstable")] +/// Returns the reliability setting the sample was delieverd with. +#[no_mangle] +pub extern "C" fn z_sample_reliability(this_: &z_loaned_sample_t) -> z_reliability_t { + this_.as_rust_type_ref().reliability().into() +} + /// Returns ``true`` if sample is valid, ``false`` if it is in gravestone state. #[no_mangle] pub extern "C" fn z_internal_sample_check(this_: &z_owned_sample_t) -> bool { @@ -256,6 +264,49 @@ pub extern "C" fn zc_locality_default() -> zc_locality_t { Locality::default().into() } +/// The publisher reliability. +/// NOTE: Currently `reliability` does not trigger any data retransmission on the wire. +/// It is rather used as a marker on the wire and it may be used to select the best link available (e.g. TCP for reliable data and UDP for best effort data). +#[cfg(feature = "unstable")] +#[allow(non_camel_case_types, clippy::upper_case_acronyms)] +#[repr(C)] +#[derive(Clone, Copy)] +pub enum z_reliability_t { + /// Defines reliability as ``BEST_EFFORT`` + BEST_EFFORT, + /// Defines reliability as ``RELIABLE`` + RELIABLE, +} + +#[cfg(feature = "unstable")] +impl From for z_reliability_t { + #[inline] + fn from(r: Reliability) -> Self { + match r { + Reliability::BestEffort => z_reliability_t::BEST_EFFORT, + Reliability::Reliable => z_reliability_t::RELIABLE, + } + } +} + +#[cfg(feature = "unstable")] +/// Returns the default value for `reliability`. +#[no_mangle] +pub extern "C" fn z_reliability_default() -> z_reliability_t { + Reliability::default().into() +} + +#[cfg(feature = "unstable")] +impl From for Reliability { + #[inline] + fn from(val: z_reliability_t) -> Self { + match val { + z_reliability_t::BEST_EFFORT => Reliability::BestEffort, + z_reliability_t::RELIABLE => Reliability::Reliable, + } + } +} + #[cfg(feature = "unstable")] /// Key expressions types to which Queryable should reply to. #[repr(C)] diff --git a/src/publisher.rs b/src/publisher.rs index a1a9dca89..7f2af457b 100644 --- a/src/publisher.rs +++ b/src/publisher.rs @@ -34,8 +34,9 @@ use crate::{ }; #[cfg(feature = "unstable")] use crate::{ - transmute::IntoCType, z_entity_global_id_t, zc_closure_matching_status_call, - zc_closure_matching_status_loan, zc_locality_default, zc_locality_t, + transmute::IntoCType, z_entity_global_id_t, z_reliability_default, z_reliability_t, + zc_closure_matching_status_call, zc_closure_matching_status_loan, zc_locality_default, + zc_locality_t, }; /// Options passed to the `z_declare_publisher()` function. @@ -49,6 +50,9 @@ pub struct z_publisher_options_t { pub priority: z_priority_t, /// If true, Zenoh will not wait to batch this message with others to reduce the bandwith. pub is_express: bool, + /// The publisher reliability. + #[cfg(feature = "unstable")] + pub reliability: z_reliability_t, #[cfg(feature = "unstable")] /// The allowed destination for this publisher. pub allowed_destination: zc_locality_t, @@ -63,6 +67,8 @@ pub extern "C" fn z_publisher_options_default(this_: &mut MaybeUninit, + /// The put operation reliability. + #[cfg(feature = "unstable")] + reliability: z_reliability_t, /// The allowed destination of this message. #[cfg(feature = "unstable")] pub allowed_destination: zc_locality_t, @@ -64,6 +67,8 @@ pub extern "C" fn z_put_options_default(this_: &mut MaybeUninit is_express: false, timestamp: None, #[cfg(feature = "unstable")] + reliability: z_reliability_default(), + #[cfg(feature = "unstable")] allowed_destination: zc_locality_default(), #[cfg(feature = "unstable")] source_info: None, @@ -95,22 +100,24 @@ pub extern "C" fn z_put( if let Some(encoding) = options.encoding.take() { put = put.encoding(encoding.take_rust_type()); }; - #[cfg(feature = "unstable")] - if let Some(source_info) = options.source_info.take() { - put = put.source_info(source_info.take_rust_type()); - }; if let Some(attachment) = options.attachment.take() { put = put.attachment(attachment.take_rust_type()); } if let Some(timestamp) = options.timestamp.as_ref() { put = put.timestamp(Some(timestamp.into_rust_type())); } - put = put.priority(options.priority.into()); - put = put.congestion_control(options.congestion_control.into()); - put = put.express(options.is_express); + put = put + .priority(options.priority.into()) + .congestion_control(options.congestion_control.into()) + .express(options.is_express); #[cfg(feature = "unstable")] { - put = put.allowed_destination(options.allowed_destination.into()); + put = put + .reliability(options.reliability.into()) + .allowed_destination(options.allowed_destination.into()); + if let Some(source_info) = options.source_info.take() { + put = put.source_info(source_info.take_rust_type()); + }; } } @@ -134,6 +141,9 @@ pub struct z_delete_options_t { pub is_express: bool, /// The timestamp of this message. pub timestamp: Option<&'static mut z_timestamp_t>, + /// The delete operation reliability. + #[cfg(feature = "unstable")] + pub reliability: z_reliability_t, /// The allowed destination of this message. #[cfg(feature = "unstable")] pub allowed_destination: zc_locality_t, @@ -149,6 +159,8 @@ pub unsafe extern "C" fn z_delete_options_default(this_: &mut MaybeUninit for z_reliability_t { - #[inline] - fn from(r: Reliability) -> Self { - match r { - Reliability::BestEffort => z_reliability_t::BEST_EFFORT, - Reliability::Reliable => z_reliability_t::RELIABLE, - } - } -} - -#[cfg(feature = "unstable")] -impl From for Reliability { - #[inline] - fn from(val: z_reliability_t) -> Self { - match val { - z_reliability_t::BEST_EFFORT => Reliability::BestEffort, - z_reliability_t::RELIABLE => Reliability::Reliable, - } - } -} - -pub use crate::opaque_types::{z_loaned_subscriber_t, z_moved_subscriber_t, z_owned_subscriber_t}; decl_c_type!( owned(z_owned_subscriber_t, option Subscriber<'static, ()>), loaned(z_loaned_subscriber_t), @@ -86,23 +49,14 @@ pub unsafe extern "C" fn z_subscriber_loan(this_: &z_owned_subscriber_t) -> &z_l #[allow(non_camel_case_types)] #[repr(C)] pub struct z_subscriber_options_t { - /// The subscription reliability. - #[cfg(feature = "unstable")] - pub reliability: z_reliability_t, /// Dummy field to avoid having fieldless struct - #[cfg(not(feature = "unstable"))] pub _0: u8, } /// Constructs the default value for `z_subscriber_options_t`. #[no_mangle] pub extern "C" fn z_subscriber_options_default(this_: &mut MaybeUninit) { - this_.write(z_subscriber_options_t { - #[cfg(feature = "unstable")] - reliability: Reliability::DEFAULT.into(), - #[cfg(not(feature = "unstable"))] - _0: 0, - }); + this_.write(z_subscriber_options_t { _0: 0 }); } /// Constructs and declares a subscriber for a given key expression. Dropping subscriber @@ -121,7 +75,7 @@ pub extern "C" fn z_declare_subscriber( session: &z_loaned_session_t, key_expr: &z_loaned_keyexpr_t, callback: &mut z_moved_closure_sample_t, - options: Option<&mut z_subscriber_options_t>, + _options: Option<&mut z_subscriber_options_t>, ) -> result::z_result_t { let this = this.as_rust_type_mut_uninit(); let session = session.as_rust_type_ref(); @@ -133,14 +87,6 @@ pub extern "C" fn z_declare_subscriber( let sample = sample.as_loaned_c_type_ref(); z_closure_sample_call(z_closure_sample_loan(&callback), sample) }); - #[cfg(not(feature = "unstable"))] - let _ = options; - #[cfg(feature = "unstable")] - let mut subscriber = subscriber; - #[cfg(feature = "unstable")] - if let Some(options) = options { - subscriber = subscriber.reliability(options.reliability.into()); - } match subscriber.wait() { Ok(sub) => { this.write(Some(sub));