Skip to content

Commit

Permalink
Merge branch 'main' into add-z-bytes-slice-iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisBiryukov91 authored Sep 6, 2024
2 parents 1a15faf + 390d102 commit 5b208d7
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 87 deletions.
6 changes: 4 additions & 2 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ Functions
.. doxygenfunction:: z_sample_priority
.. doxygenfunction:: z_sample_congestion_control
.. doxygenfunction:: z_sample_express
.. doxygenfunction:: z_sample_reliability


Timestamp
Expand Down Expand Up @@ -495,6 +496,7 @@ Types

.. doxygenenum:: z_congestion_control_t
.. doxygenenum:: z_priority_t
.. doxygenenum:: z_reliability_t

.. doxygenstruct:: z_put_options_t
:members:
Expand Down Expand Up @@ -530,6 +532,8 @@ Functions
.. doxygenfunction:: z_publisher_put_options_default
.. doxygenfunction:: z_publisher_delete_options_default

.. doxygenfunction:: z_reliability_default

.. doxygenfunction:: zc_closure_matching_status_check
.. doxygenfunction:: zc_closure_matching_status_null
.. doxygenfunction:: zc_closure_matching_status_drop
Expand All @@ -550,8 +554,6 @@ Types
.. doxygenstruct:: z_owned_closure_sample_t
:members:

.. doxygenenum:: z_reliability_t

.. doxygenstruct:: z_subscriber_options_t
:members:

Expand Down
44 changes: 34 additions & 10 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ typedef enum z_query_target_t {
Z_QUERY_TARGET_ALL_COMPLETE,
} z_query_target_t;
/**
* The subscription reliability.
* The publisher reliability.
* NOTE: Currently `reliability` does not trigger any data retransmission on the wire.
* It is rather used as a marker on the wire and it may be used to select the best link available (e.g. TCP for reliable data and UDP for best effort data).
*/
#if defined(UNSTABLE)
typedef enum z_reliability_t {
Expand Down Expand Up @@ -597,6 +599,12 @@ typedef struct z_publisher_options_t {
* If true, Zenoh will not wait to batch this message with others to reduce the bandwith.
*/
bool is_express;
#if defined(UNSTABLE)
/**
* The publisher reliability.
*/
enum z_reliability_t reliability;
#endif
#if defined(UNSTABLE)
/**
* The allowed destination for this publisher.
Expand All @@ -617,18 +625,10 @@ typedef struct z_queryable_options_t {
* Options passed to the `z_declare_subscriber()` function.
*/
typedef struct z_subscriber_options_t {
#if defined(UNSTABLE)
/**
* The subscription reliability.
*/
enum z_reliability_t reliability;
#endif
#if !defined(UNSTABLE)
/**
* Dummy field to avoid having fieldless struct
*/
uint8_t _0;
#endif
} z_subscriber_options_t;
/**
* Options passed to the `z_delete()` function.
Expand All @@ -650,6 +650,12 @@ typedef struct z_delete_options_t {
* The timestamp of this message.
*/
struct z_timestamp_t *timestamp;
#if defined(UNSTABLE)
/**
* The delete operation reliability.
*/
enum z_reliability_t reliability;
#endif
#if defined(UNSTABLE)
/**
* The allowed destination of this message.
Expand Down Expand Up @@ -800,6 +806,12 @@ typedef struct z_put_options_t {
* The timestamp of this message.
*/
struct z_timestamp_t *timestamp;
#if defined(UNSTABLE)
/**
* The put operation reliability.
*/
enum z_reliability_t reliability;
#endif
#if defined(UNSTABLE)
/**
* The allowed destination of this message.
Expand Down Expand Up @@ -1972,7 +1984,7 @@ z_result_t z_declare_subscriber(struct z_owned_subscriber_t *this_,
const struct z_loaned_session_t *session,
const struct z_loaned_keyexpr_t *key_expr,
struct z_moved_closure_sample_t *callback,
struct z_subscriber_options_t *options);
struct z_subscriber_options_t *_options);
/**
* Sends request to delete data on specified key expression (used when working with <a href="https://zenoh.io/docs/manual/abstractions/#storage"> Zenoh storages </a>).
*
Expand Down Expand Up @@ -3535,6 +3547,12 @@ ZENOHC_API uint8_t z_random_u8(void);
#if (defined(SHARED_MEMORY) && defined(UNSTABLE))
ZENOHC_API void z_ref_shm_client_storage_global(z_owned_shm_client_storage_t *this_);
#endif
/**
* Returns the default value for `reliability`.
*/
#if defined(UNSTABLE)
ZENOHC_API enum z_reliability_t z_reliability_default(void);
#endif
/**
* Constructs an owned shallow copy of reply in provided uninitialized memory location.
*/
Expand Down Expand Up @@ -3737,6 +3755,12 @@ ZENOHC_API const struct z_loaned_bytes_t *z_sample_payload(const struct z_loaned
* Returns sample qos priority value.
*/
ZENOHC_API enum z_priority_t z_sample_priority(const struct z_loaned_sample_t *this_);
/**
* Returns the reliability setting the sample was delieverd with.
*/
#if defined(UNSTABLE)
ZENOHC_API enum z_reliability_t z_sample_reliability(const struct z_loaned_sample_t *this_);
#endif
/**
* Returns the sample source_info.
*/
Expand Down
63 changes: 57 additions & 6 deletions src/commons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@
use std::{mem::MaybeUninit, ptr::null};

use libc::c_ulong;
use zenoh::{
qos::{CongestionControl, Priority},
query::{ConsolidationMode, QueryTarget},
sample::{Sample, SampleKind},
time::Timestamp,
};
#[cfg(feature = "unstable")]
use zenoh::{
pubsub::Reliability,
query::ReplyKeyExpr,
sample::{Locality, SourceInfo},
session::EntityGlobalId,
};
use zenoh::{
qos::{CongestionControl, Priority},
query::{ConsolidationMode, QueryTarget},
sample::{Sample, SampleKind},
time::Timestamp,
};

#[cfg(feature = "unstable")]
use crate::transmute::IntoCType;
Expand Down Expand Up @@ -186,6 +187,13 @@ pub extern "C" fn z_sample_congestion_control(this_: &z_loaned_sample_t) -> z_co
this_.as_rust_type_ref().congestion_control().into()
}

#[cfg(feature = "unstable")]
/// Returns the reliability setting the sample was delieverd with.
#[no_mangle]
pub extern "C" fn z_sample_reliability(this_: &z_loaned_sample_t) -> z_reliability_t {
this_.as_rust_type_ref().reliability().into()
}

/// Returns ``true`` if sample is valid, ``false`` if it is in gravestone state.
#[no_mangle]
pub extern "C" fn z_internal_sample_check(this_: &z_owned_sample_t) -> bool {
Expand Down Expand Up @@ -256,6 +264,49 @@ pub extern "C" fn zc_locality_default() -> zc_locality_t {
Locality::default().into()
}

/// The publisher reliability.
/// NOTE: Currently `reliability` does not trigger any data retransmission on the wire.
/// It is rather used as a marker on the wire and it may be used to select the best link available (e.g. TCP for reliable data and UDP for best effort data).
#[cfg(feature = "unstable")]
#[allow(non_camel_case_types, clippy::upper_case_acronyms)]
#[repr(C)]
#[derive(Clone, Copy)]
pub enum z_reliability_t {
/// Defines reliability as ``BEST_EFFORT``
BEST_EFFORT,
/// Defines reliability as ``RELIABLE``
RELIABLE,
}

#[cfg(feature = "unstable")]
impl From<Reliability> for z_reliability_t {
#[inline]
fn from(r: Reliability) -> Self {
match r {
Reliability::BestEffort => z_reliability_t::BEST_EFFORT,
Reliability::Reliable => z_reliability_t::RELIABLE,
}
}
}

#[cfg(feature = "unstable")]
/// Returns the default value for `reliability`.
#[no_mangle]
pub extern "C" fn z_reliability_default() -> z_reliability_t {
Reliability::default().into()
}

#[cfg(feature = "unstable")]
impl From<z_reliability_t> for Reliability {
#[inline]
fn from(val: z_reliability_t) -> Self {
match val {
z_reliability_t::BEST_EFFORT => Reliability::BestEffort,
z_reliability_t::RELIABLE => Reliability::Reliable,
}
}
}

#[cfg(feature = "unstable")]
/// Key expressions types to which Queryable should reply to.
#[repr(C)]
Expand Down
14 changes: 11 additions & 3 deletions src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ use crate::{
};
#[cfg(feature = "unstable")]
use crate::{
transmute::IntoCType, z_entity_global_id_t, zc_closure_matching_status_call,
zc_closure_matching_status_loan, zc_locality_default, zc_locality_t,
transmute::IntoCType, z_entity_global_id_t, z_reliability_default, z_reliability_t,
zc_closure_matching_status_call, zc_closure_matching_status_loan, zc_locality_default,
zc_locality_t,
};

/// Options passed to the `z_declare_publisher()` function.
Expand All @@ -49,6 +50,9 @@ pub struct z_publisher_options_t {
pub priority: z_priority_t,
/// If true, Zenoh will not wait to batch this message with others to reduce the bandwith.
pub is_express: bool,
/// The publisher reliability.
#[cfg(feature = "unstable")]
pub reliability: z_reliability_t,
#[cfg(feature = "unstable")]
/// The allowed destination for this publisher.
pub allowed_destination: zc_locality_t,
Expand All @@ -63,6 +67,8 @@ pub extern "C" fn z_publisher_options_default(this_: &mut MaybeUninit<z_publishe
priority: Priority::default().into(),
is_express: false,
#[cfg(feature = "unstable")]
reliability: z_reliability_default(),
#[cfg(feature = "unstable")]
allowed_destination: zc_locality_default(),
});
}
Expand Down Expand Up @@ -103,7 +109,9 @@ pub extern "C" fn z_declare_publisher(
.express(options.is_express);
#[cfg(feature = "unstable")]
{
p = p.allowed_destination(options.allowed_destination.into());
p = p
.reliability(options.reliability.into())
.allowed_destination(options.allowed_destination.into());
}
if let Some(encoding) = options.encoding.take() {
p = p.encoding(encoding.take_rust_type());
Expand Down
32 changes: 23 additions & 9 deletions src/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ pub struct z_put_options_t {
pub is_express: bool,
/// The timestamp of this message.
pub timestamp: Option<&'static mut z_timestamp_t>,
/// The put operation reliability.
#[cfg(feature = "unstable")]
reliability: z_reliability_t,
/// The allowed destination of this message.
#[cfg(feature = "unstable")]
pub allowed_destination: zc_locality_t,
Expand All @@ -64,6 +67,8 @@ pub extern "C" fn z_put_options_default(this_: &mut MaybeUninit<z_put_options_t>
is_express: false,
timestamp: None,
#[cfg(feature = "unstable")]
reliability: z_reliability_default(),
#[cfg(feature = "unstable")]
allowed_destination: zc_locality_default(),
#[cfg(feature = "unstable")]
source_info: None,
Expand Down Expand Up @@ -95,22 +100,24 @@ pub extern "C" fn z_put(
if let Some(encoding) = options.encoding.take() {
put = put.encoding(encoding.take_rust_type());
};
#[cfg(feature = "unstable")]
if let Some(source_info) = options.source_info.take() {
put = put.source_info(source_info.take_rust_type());
};
if let Some(attachment) = options.attachment.take() {
put = put.attachment(attachment.take_rust_type());
}
if let Some(timestamp) = options.timestamp.as_ref() {
put = put.timestamp(Some(timestamp.into_rust_type()));
}
put = put.priority(options.priority.into());
put = put.congestion_control(options.congestion_control.into());
put = put.express(options.is_express);
put = put
.priority(options.priority.into())
.congestion_control(options.congestion_control.into())
.express(options.is_express);
#[cfg(feature = "unstable")]
{
put = put.allowed_destination(options.allowed_destination.into());
put = put
.reliability(options.reliability.into())
.allowed_destination(options.allowed_destination.into());
if let Some(source_info) = options.source_info.take() {
put = put.source_info(source_info.take_rust_type());
};
}
}

Expand All @@ -134,6 +141,9 @@ pub struct z_delete_options_t {
pub is_express: bool,
/// The timestamp of this message.
pub timestamp: Option<&'static mut z_timestamp_t>,
/// The delete operation reliability.
#[cfg(feature = "unstable")]
pub reliability: z_reliability_t,
/// The allowed destination of this message.
#[cfg(feature = "unstable")]
pub allowed_destination: zc_locality_t,
Expand All @@ -149,6 +159,8 @@ pub unsafe extern "C" fn z_delete_options_default(this_: &mut MaybeUninit<z_dele
is_express: false,
timestamp: None,
#[cfg(feature = "unstable")]
reliability: z_reliability_default(),
#[cfg(feature = "unstable")]
allowed_destination: zc_locality_default(),
});
}
Expand Down Expand Up @@ -181,7 +193,9 @@ pub extern "C" fn z_delete(

#[cfg(feature = "unstable")]
{
del = del.allowed_destination(options.allowed_destination.into());
del = del
.reliability(options.reliability.into())
.allowed_destination(options.allowed_destination.into());
}
}

Expand Down
Loading

0 comments on commit 5b208d7

Please sign in to comment.