Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move reliability to publisher #653

Merged
merged 3 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
727 changes: 377 additions & 350 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ Functions
.. doxygenfunction:: z_sample_priority
.. doxygenfunction:: z_sample_congestion_control
.. doxygenfunction:: z_sample_express
.. doxygenfunction:: z_sample_reliability


Timestamp
Expand Down Expand Up @@ -495,6 +496,7 @@ Types

.. doxygenenum:: z_congestion_control_t
.. doxygenenum:: z_priority_t
.. doxygenenum:: z_reliability_t

.. doxygenstruct:: z_put_options_t
:members:
Expand Down Expand Up @@ -530,6 +532,8 @@ Functions
.. doxygenfunction:: z_publisher_put_options_default
.. doxygenfunction:: z_publisher_delete_options_default

.. doxygenfunction:: z_reliability_default

.. doxygenfunction:: zc_closure_matching_status_check
.. doxygenfunction:: zc_closure_matching_status_null
.. doxygenfunction:: zc_closure_matching_status_drop
Expand All @@ -550,8 +554,6 @@ Types
.. doxygenstruct:: z_owned_closure_sample_t
:members:

.. doxygenenum:: z_reliability_t

.. doxygenstruct:: z_subscriber_options_t
:members:

Expand Down
50 changes: 40 additions & 10 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ typedef enum z_query_target_t {
Z_QUERY_TARGET_ALL_COMPLETE,
} z_query_target_t;
/**
* The subscription reliability.
* The publisher reliability.
* NOTE: Currently `reliability` does not trigger any data retransmission on the wire.
* It is rather used as a marker on the wire and it may be used to select the best link available (e.g. TCP for reliable data and UDP for best effort data).
*/
#if defined(UNSTABLE)
typedef enum z_reliability_t {
Expand Down Expand Up @@ -591,6 +593,12 @@ typedef struct z_publisher_options_t {
* If true, Zenoh will not wait to batch this message with others to reduce the bandwith.
*/
bool is_express;
#if defined(UNSTABLE)
/**
* The publisher reliability.
*/
enum z_reliability_t reliability;
#endif
#if defined(UNSTABLE)
/**
* The allowed destination for this publisher.
Expand All @@ -611,18 +619,10 @@ typedef struct z_queryable_options_t {
* Options passed to the `z_declare_subscriber()` function.
*/
typedef struct z_subscriber_options_t {
#if defined(UNSTABLE)
/**
* The subscription reliability.
*/
enum z_reliability_t reliability;
#endif
#if !defined(UNSTABLE)
/**
* Dummy field to avoid having fieldless struct
*/
uint8_t _0;
#endif
} z_subscriber_options_t;
/**
* Options passed to the `z_delete()` function.
Expand All @@ -644,6 +644,12 @@ typedef struct z_delete_options_t {
* The timestamp of this message.
*/
struct z_timestamp_t *timestamp;
#if defined(UNSTABLE)
/**
* The delete operation reliability.
*/
enum z_reliability_t reliability;
#endif
#if defined(UNSTABLE)
/**
* The allowed destination of this message.
Expand Down Expand Up @@ -794,6 +800,12 @@ typedef struct z_put_options_t {
* The timestamp of this message.
*/
struct z_timestamp_t *timestamp;
#if defined(UNSTABLE)
/**
* The put operation reliability.
*/
enum z_reliability_t reliability;
#endif
#if defined(UNSTABLE)
/**
* The allowed destination of this message.
Expand Down Expand Up @@ -1947,7 +1959,7 @@ z_result_t z_declare_subscriber(struct z_owned_subscriber_t *this_,
const struct z_loaned_session_t *session,
const struct z_loaned_keyexpr_t *key_expr,
struct z_moved_closure_sample_t *callback,
struct z_subscriber_options_t *options);
struct z_subscriber_options_t *_options);
/**
* Sends request to delete data on specified key expression (used when working with <a href="https://zenoh.io/docs/manual/abstractions/#storage"> Zenoh storages </a>).
*
Expand Down Expand Up @@ -2613,6 +2625,12 @@ ZENOHC_API enum z_whatami_t z_hello_whatami(const struct z_loaned_hello_t *this_
#if defined(UNSTABLE)
ZENOHC_API z_id_t z_hello_zid(const struct z_loaned_hello_t *this_);
#endif
/**
* Formats the `z_id_t` into 16-digit hex string (LSB-first order)
*/
#if defined(UNSTABLE)
ZENOHC_API void z_id_to_string(const z_id_t *zid, struct z_owned_string_t *dst);
#endif
/**
* Fetches the Zenoh IDs of all connected peers.
*
Expand Down Expand Up @@ -3504,6 +3522,12 @@ ZENOHC_API uint8_t z_random_u8(void);
#if (defined(SHARED_MEMORY) && defined(UNSTABLE))
ZENOHC_API void z_ref_shm_client_storage_global(z_owned_shm_client_storage_t *this_);
#endif
/**
* Returns the default value for `reliability`.
*/
#if defined(UNSTABLE)
ZENOHC_API enum z_reliability_t z_reliability_default(void);
#endif
/**
* Constructs an owned shallow copy of reply in provided uninitialized memory location.
*/
Expand Down Expand Up @@ -3706,6 +3730,12 @@ ZENOHC_API const struct z_loaned_bytes_t *z_sample_payload(const struct z_loaned
* Returns sample qos priority value.
*/
ZENOHC_API enum z_priority_t z_sample_priority(const struct z_loaned_sample_t *this_);
/**
* Returns the reliability setting the sample was delieverd with.
*/
#if defined(UNSTABLE)
ZENOHC_API enum z_reliability_t z_sample_reliability(const struct z_loaned_sample_t *this_);
#endif
/**
* Returns the sample source_info.
*/
Expand Down
26 changes: 13 additions & 13 deletions include/zenoh_macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -657,43 +657,43 @@ inline void z_call(const z_loaned_closure_sample_t* closure, const z_loaned_samp
z_closure_sample_call(closure, sample);
};

extern "C" typedef void (*z_closure_drop_callback_t)(void*);
extern "C" using z_closure_drop_callback_t = void(void*);

extern "C" typedef void (*z_closure_hello_callback_t)(const z_loaned_hello_t*, void*);
extern "C" using z_closure_hello_callback_t = void(const z_loaned_hello_t*, void*);
inline void z_closure(
z_owned_closure_hello_t* closure,
z_closure_hello_callback_t call,
z_closure_drop_callback_t drop,
z_closure_hello_callback_t* call,
z_closure_drop_callback_t* drop,
void *context) {
closure->context = context;
closure->drop = drop;
closure->call = call;
};
extern "C" typedef void (*z_closure_query_callback_t)(const z_loaned_query_t*, void*);
extern "C" using z_closure_query_callback_t = void(const z_loaned_query_t*, void*);
inline void z_closure(
z_owned_closure_query_t* closure,
z_closure_query_callback_t call,
z_closure_drop_callback_t drop,
z_closure_query_callback_t* call,
z_closure_drop_callback_t* drop,
void *context) {
closure->context = context;
closure->drop = drop;
closure->call = call;
};
extern "C" typedef void (*z_closure_reply_callback_t)(const z_loaned_reply_t*, void*);
extern "C" using z_closure_reply_callback_t = void(const z_loaned_reply_t*, void*);
inline void z_closure(
z_owned_closure_reply_t* closure,
z_closure_reply_callback_t call,
z_closure_drop_callback_t drop,
z_closure_reply_callback_t* call,
z_closure_drop_callback_t* drop,
void *context) {
closure->context = context;
closure->drop = drop;
closure->call = call;
};
extern "C" typedef void (*z_closure_sample_callback_t)(const z_loaned_sample_t*, void*);
extern "C" using z_closure_sample_callback_t = void(const z_loaned_sample_t*, void*);
inline void z_closure(
z_owned_closure_sample_t* closure,
z_closure_sample_callback_t call,
z_closure_drop_callback_t drop,
z_closure_sample_callback_t* call,
z_closure_drop_callback_t* drop,
void *context) {
closure->context = context;
closure->drop = drop;
Expand Down
63 changes: 57 additions & 6 deletions src/commons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@
use std::{mem::MaybeUninit, ptr::null};

use libc::c_ulong;
use zenoh::{
qos::{CongestionControl, Priority},
query::{ConsolidationMode, QueryTarget},
sample::{Sample, SampleKind},
time::Timestamp,
};
#[cfg(feature = "unstable")]
use zenoh::{
pubsub::Reliability,
query::ReplyKeyExpr,
sample::{Locality, SourceInfo},
session::EntityGlobalId,
};
use zenoh::{
qos::{CongestionControl, Priority},
query::{ConsolidationMode, QueryTarget},
sample::{Sample, SampleKind},
time::Timestamp,
};

#[cfg(feature = "unstable")]
use crate::transmute::IntoCType;
Expand Down Expand Up @@ -186,6 +187,13 @@ pub extern "C" fn z_sample_congestion_control(this_: &z_loaned_sample_t) -> z_co
this_.as_rust_type_ref().congestion_control().into()
}

#[cfg(feature = "unstable")]
/// Returns the reliability setting the sample was delieverd with.
#[no_mangle]
pub extern "C" fn z_sample_reliability(this_: &z_loaned_sample_t) -> z_reliability_t {
this_.as_rust_type_ref().reliability().into()
}

/// Returns ``true`` if sample is valid, ``false`` if it is in gravestone state.
#[no_mangle]
pub extern "C" fn z_internal_sample_check(this_: &z_owned_sample_t) -> bool {
Expand Down Expand Up @@ -256,6 +264,49 @@ pub extern "C" fn zc_locality_default() -> zc_locality_t {
Locality::default().into()
}

/// The publisher reliability.
/// NOTE: Currently `reliability` does not trigger any data retransmission on the wire.
/// It is rather used as a marker on the wire and it may be used to select the best link available (e.g. TCP for reliable data and UDP for best effort data).
#[cfg(feature = "unstable")]
#[allow(non_camel_case_types, clippy::upper_case_acronyms)]
#[repr(C)]
#[derive(Clone, Copy)]
pub enum z_reliability_t {
/// Defines reliability as ``BEST_EFFORT``
BEST_EFFORT,
/// Defines reliability as ``RELIABLE``
RELIABLE,
}

#[cfg(feature = "unstable")]
impl From<Reliability> for z_reliability_t {
#[inline]
fn from(r: Reliability) -> Self {
match r {
Reliability::BestEffort => z_reliability_t::BEST_EFFORT,
Reliability::Reliable => z_reliability_t::RELIABLE,
}
}
}

#[cfg(feature = "unstable")]
/// Returns the default value for `reliability`.
#[no_mangle]
pub extern "C" fn z_reliability_default() -> z_reliability_t {
Reliability::default().into()
}

#[cfg(feature = "unstable")]
impl From<z_reliability_t> for Reliability {
#[inline]
fn from(val: z_reliability_t) -> Self {
match val {
z_reliability_t::BEST_EFFORT => Reliability::BestEffort,
z_reliability_t::RELIABLE => Reliability::Reliable,
}
}
}

#[cfg(feature = "unstable")]
/// Key expressions types to which Queryable should reply to.
#[repr(C)]
Expand Down
14 changes: 11 additions & 3 deletions src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ use crate::{
};
#[cfg(feature = "unstable")]
use crate::{
transmute::IntoCType, z_entity_global_id_t, zc_closure_matching_status_call,
zc_closure_matching_status_loan, zc_locality_default, zc_locality_t,
transmute::IntoCType, z_entity_global_id_t, z_reliability_default, z_reliability_t,
zc_closure_matching_status_call, zc_closure_matching_status_loan, zc_locality_default,
zc_locality_t,
};

/// Options passed to the `z_declare_publisher()` function.
Expand All @@ -49,6 +50,9 @@ pub struct z_publisher_options_t {
pub priority: z_priority_t,
/// If true, Zenoh will not wait to batch this message with others to reduce the bandwith.
pub is_express: bool,
/// The publisher reliability.
#[cfg(feature = "unstable")]
pub reliability: z_reliability_t,
#[cfg(feature = "unstable")]
/// The allowed destination for this publisher.
pub allowed_destination: zc_locality_t,
Expand All @@ -63,6 +67,8 @@ pub extern "C" fn z_publisher_options_default(this_: &mut MaybeUninit<z_publishe
priority: Priority::default().into(),
is_express: false,
#[cfg(feature = "unstable")]
reliability: z_reliability_default(),
#[cfg(feature = "unstable")]
allowed_destination: zc_locality_default(),
});
}
Expand Down Expand Up @@ -103,7 +109,9 @@ pub extern "C" fn z_declare_publisher(
.express(options.is_express);
#[cfg(feature = "unstable")]
{
p = p.allowed_destination(options.allowed_destination.into());
p = p
.reliability(options.reliability.into())
.allowed_destination(options.allowed_destination.into());
}
if let Some(encoding) = options.encoding.take() {
p = p.encoding(encoding.take_rust_type());
Expand Down
Loading