Skip to content

Commit

Permalink
introduce owned buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
p-avital committed Feb 29, 2024
1 parent 5d7bb83 commit a52ef72
Show file tree
Hide file tree
Showing 16 changed files with 144 additions and 167 deletions.
19 changes: 17 additions & 2 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ typedef enum zcu_reply_keyexpr_t {
* and empty slices are represented using a possibly dangling pointer for `start`.
*/
typedef struct z_bytes_t {
size_t len;
const uint8_t *start;
size_t len;
} z_bytes_t;
/**
* The body of a loop over an attachment's key-value pairs.
Expand Down Expand Up @@ -189,6 +189,15 @@ typedef struct z_attachment_t {
const void *data;
z_attachment_iter_driver_t iteration_driver;
} z_attachment_t;
/**
* A buffer owned by Zenoh.
*/
typedef struct z_owned_buffer_t {
size_t _inner[5];
} z_owned_buffer_t;
typedef struct z_buffer_t {
size_t _inner;
} z_buffer_t;
/**
* A map of maybe-owned vector of bytes to owned vector of bytes.
*
Expand Down Expand Up @@ -862,7 +871,7 @@ typedef struct zc_owned_liveliness_get_options_t {
*/
typedef struct zc_owned_payload_t {
struct z_bytes_t payload;
size_t _owner[5];
struct z_owned_buffer_t _owner;
} zc_owned_payload_t;
typedef struct zc_owned_shmbuf_t {
size_t _0[9];
Expand Down Expand Up @@ -1027,6 +1036,12 @@ int8_t z_attachment_iterate(struct z_attachment_t this_,
* Returns the gravestone value for `z_attachment_t`.
*/
ZENOHC_API struct z_attachment_t z_attachment_null(void);
ZENOHC_API bool z_buffer_check(const struct z_owned_buffer_t *buffer);
ZENOHC_API struct z_owned_buffer_t z_buffer_clone(struct z_buffer_t buffer);
ZENOHC_API void z_buffer_drop(struct z_owned_buffer_t *buffer);
ZENOHC_API struct z_buffer_t z_buffer_loan(const struct z_owned_buffer_t *buffer);
ZENOHC_API struct z_owned_buffer_t z_buffer_null(void);
ZENOHC_API struct z_bytes_t z_buffer_payload(struct z_buffer_t buffer);
/**
* Returns ``true`` if `b` is initialized.
*/
Expand Down
10 changes: 10 additions & 0 deletions include/zenoh_macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
z_owned_hello_t : z_hello_loan, \
z_owned_str_t : z_str_loan, \
z_owned_query_t : z_query_loan, \
z_owned_buffer_t: z_buffer_loan, \
ze_owned_querying_subscriber_t : ze_querying_subscriber_loan \
)(&x)

Expand Down Expand Up @@ -42,6 +43,7 @@
z_owned_reply_channel_t * : z_reply_channel_drop, \
z_owned_query_channel_t * : z_query_channel_drop, \
z_owned_bytes_map_t * : z_bytes_map_drop, \
z_owned_buffer_t * : z_buffer_drop, \
zc_owned_payload_t * : zc_payload_drop, \
zc_owned_shmbuf_t * : zc_shmbuf_drop, \
zc_owned_shm_manager_t * : zc_shm_manager_drop, \
Expand Down Expand Up @@ -73,6 +75,7 @@
z_owned_reply_channel_closure_t * : z_reply_channel_closure_null, \
z_owned_reply_channel_t * : z_reply_channel_null, \
z_owned_bytes_map_t * : z_bytes_map_null, \
z_owned_buffer_t * : z_buffer_null, \
z_attachment_t * : z_attachment_null, \
zc_owned_payload_t * : zc_payload_null, \
zc_owned_shmbuf_t * : zc_shmbuf_null, \
Expand All @@ -98,6 +101,7 @@
z_owned_query_t : z_query_check, \
z_owned_str_t : z_str_check, \
z_owned_bytes_map_t : z_bytes_map_check, \
z_owned_buffer_t : z_buffer_check, \
z_attachment_t : z_attachment_check, \
zc_owned_payload_t : zc_payload_check, \
zc_owned_shmbuf_t : zc_shmbuf_check, \
Expand Down Expand Up @@ -141,6 +145,7 @@ template<> struct zenoh_loan_type<z_owned_pull_subscriber_t>{ typedef z_pull_sub
template<> struct zenoh_loan_type<z_owned_encoding_t>{ typedef z_encoding_t type; };
template<> struct zenoh_loan_type<z_owned_hello_t>{ typedef z_hello_t type; };
template<> struct zenoh_loan_type<z_owned_str_t>{ typedef const char* type; };
template<> struct zenoh_loan_type<z_owned_buffer_t>{ typedef z_buffer_t type; };
template<> struct zenoh_loan_type<ze_owned_querying_subscriber_t>{ typedef ze_querying_subscriber_t type; };

template<> inline z_session_t z_loan(const z_owned_session_t& x) { return z_session_loan(&x); }
Expand All @@ -153,6 +158,7 @@ template<> inline z_encoding_t z_loan(const z_owned_encoding_t& x) { return z_en
template<> inline z_hello_t z_loan(const z_owned_hello_t& x) { return z_hello_loan(&x); }
template<> inline z_query_t z_loan(const z_owned_query_t& x) { return z_query_loan(&x); }
template<> inline const char* z_loan(const z_owned_str_t& x) { return z_str_loan(&x); }
template<> inline z_buffer_t z_loan(const z_owned_buffer_t& x) { return z_buffer_loan(&x); }
template<> inline ze_querying_subscriber_t z_loan(const ze_owned_querying_subscriber_t& x) { return ze_querying_subscriber_loan(&x); }

template<class T> struct zenoh_drop_type { typedef T type; };
Expand All @@ -171,6 +177,7 @@ template<> struct zenoh_drop_type<z_owned_reply_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_hello_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_query_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_str_t> { typedef void type; };
template<> struct zenoh_drop_type<z_owned_buffer_t> { typedef void type; };
template<> struct zenoh_drop_type<zc_owned_payload_t> { typedef void type; };
template<> struct zenoh_drop_type<zc_owned_shmbuf_t> { typedef void type; };
template<> struct zenoh_drop_type<zc_owned_shm_manager_t> { typedef void type; };
Expand Down Expand Up @@ -200,6 +207,7 @@ template<> inline void z_drop(z_owned_reply_t* v) { z_reply_drop(v); }
template<> inline void z_drop(z_owned_hello_t* v) { z_hello_drop(v); }
template<> inline void z_drop(z_owned_query_t* v) { z_query_drop(v); }
template<> inline void z_drop(z_owned_str_t* v) { z_str_drop(v); }
template<> inline void z_drop(z_owned_buffer_t* v) { z_buffer_drop(v); }
template<> inline void z_drop(zc_owned_payload_t* v) { zc_payload_drop(v); }
template<> inline void z_drop(zc_owned_shmbuf_t* v) { zc_shmbuf_drop(v); }
template<> inline void z_drop(zc_owned_shm_manager_t* v) { zc_shm_manager_drop(v); }
Expand Down Expand Up @@ -229,6 +237,7 @@ inline void z_null(z_owned_reply_t& v) { v = z_reply_null(); }
inline void z_null(z_owned_hello_t& v) { v = z_hello_null(); }
inline void z_null(z_owned_query_t& v) { v = z_query_null(); }
inline void z_null(z_owned_str_t& v) { v = z_str_null(); }
inline void z_null(z_owned_buffer_t& v) { v = z_buffer_null(); }
inline void z_null(zc_owned_payload_t& v) { v = zc_payload_null(); }
inline void z_null(zc_owned_shmbuf_t& v) { v = zc_shmbuf_null(); }
inline void z_null(zc_owned_shm_manager_t& v) { v = zc_shm_manager_null(); }
Expand Down Expand Up @@ -263,6 +272,7 @@ inline bool z_check(const z_owned_reply_t& v) { return z_reply_check(&v); }
inline bool z_check(const z_owned_hello_t& v) { return z_hello_check(&v); }
inline bool z_check(const z_owned_query_t& v) { return z_query_check(&v); }
inline bool z_check(const z_owned_str_t& v) { return z_str_check(&v); }
inline bool z_check(const z_owned_buffer_t& v) { return z_buffer_check(&v); }
inline bool z_check(const z_owned_bytes_map_t& v) { return z_bytes_map_check(&v); }
inline bool z_check(const z_attachment_t& v) { return z_attachment_check(&v); }
inline bool z_check(const zc_owned_liveliness_token_t& v) { return zc_liveliness_token_check(&v); }
Expand Down
4 changes: 3 additions & 1 deletion src/attachment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ use zenoh::sample::{Attachment, AttachmentBuilder};
///
/// Returning `0` is treated as `continue`.
/// Returning any other value is treated as `break`.
#[allow(non_camel_case_types)]
pub type z_attachment_iter_body_t =
extern "C" fn(key: z_bytes_t, value: z_bytes_t, context: *mut c_void) -> i8;

/// The driver of a loop over an attachment's key-value pairs.
///
/// This function is expected to call `loop_body` once for each key-value pair
/// within `iterator`, passing `context`, and returning any non-zero value immediately (breaking iteration).
#[allow(non_camel_case_types)]
pub type z_attachment_iter_driver_t = Option<
extern "C" fn(
iterator: *const c_void,
Expand Down Expand Up @@ -124,7 +126,7 @@ pub struct z_owned_bytes_map_t {
_1: [usize; 4],
}

impl_guarded_transmute!(
impl_guarded_transmute!(noderefs
Option<HashMap<Cow<'static, [u8]>, Cow<'static, [u8]>>>,
z_owned_bytes_map_t
);
Expand Down
70 changes: 68 additions & 2 deletions src/collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,14 @@
// Contributors:
// ZettaScale Zenoh team, <[email protected]>
//

use libc::{c_char, size_t};
use zenoh::prelude::ZenohId;
use zenoh::{
buffers::{buffer::SplitBuffer, ZBuf},
prelude::ZenohId,
};

use crate::impl_guarded_transmute;

/// A contiguous view of bytes owned by some other entity.
///
Expand All @@ -21,8 +27,8 @@ use zenoh::prelude::ZenohId;
#[repr(C)]
#[derive(Clone, Copy, Debug)]
pub struct z_bytes_t {
pub len: size_t,
pub start: *const u8,
pub len: size_t,
}

impl z_bytes_t {
Expand Down Expand Up @@ -155,3 +161,63 @@ impl From<&[u8]> for z_bytes_t {
}
}
}

/// A buffer owned by Zenoh.
#[repr(C)]
pub struct z_owned_buffer_t {
_inner: [usize; 5],
}
impl_guarded_transmute!(Option<ZBuf>, z_owned_buffer_t);

#[no_mangle]
pub extern "C" fn z_buffer_null() -> z_owned_buffer_t {
None.into()
}
#[no_mangle]
pub extern "C" fn z_buffer_drop(buffer: &mut z_owned_buffer_t) {
core::mem::drop(buffer.take())
}

#[no_mangle]
pub extern "C" fn z_buffer_check(buffer: &z_owned_buffer_t) -> bool {
buffer.is_some()
}
#[no_mangle]
pub extern "C" fn z_buffer_loan(buffer: &z_owned_buffer_t) -> z_buffer_t {
buffer.as_ref().into()
}

#[repr(C)]
#[derive(Clone, Copy)]
pub struct z_buffer_t {
_inner: usize,
}
impl_guarded_transmute!(noderefs Option<&ZBuf>, z_buffer_t);
impl From<z_buffer_t> for Option<&'static ZBuf> {
fn from(value: z_buffer_t) -> Self {
unsafe { core::mem::transmute(value) }
}
}
impl From<Option<&ZBuf>> for z_buffer_t {
fn from(value: Option<&ZBuf>) -> Self {
unsafe { core::mem::transmute(value) }
}
}
#[no_mangle]
pub extern "C" fn z_buffer_clone(buffer: z_buffer_t) -> z_owned_buffer_t {
unsafe { Some(core::mem::transmute::<_, &ZBuf>(buffer).clone()).into() }
}

#[no_mangle]
pub extern "C" fn z_buffer_payload(buffer: z_buffer_t) -> z_bytes_t {
let Some(buffer): Option<&ZBuf> = buffer.into() else {
return z_bytes_null();
};
match buffer.contiguous() {
std::borrow::Cow::Borrowed(buffer) => buffer.into(),
std::borrow::Cow::Owned(_) => {
log::error!("A non-contiguous buffer reached user code, this is definitely a bug, please inform us at https://github.com/eclipse-zenoh/zenoh-c/issues/new");
z_bytes_null()
}
}
}
21 changes: 5 additions & 16 deletions src/commons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl From<Option<&Timestamp>> for z_timestamp_t {
#[repr(C)]
pub struct zc_owned_payload_t {
pub payload: z_bytes_t,
pub _owner: [usize; 5],
pub _owner: z_owned_buffer_t,
}
impl Default for zc_owned_payload_t {
fn default() -> Self {
Expand All @@ -130,7 +130,9 @@ impl zc_owned_payload_t {
}
let start = std::mem::replace(&mut self.payload.start, std::ptr::null());
let len = std::mem::replace(&mut self.payload.len, 0);
let mut buf: ZBuf = unsafe { std::mem::transmute(self._owner) };
let Some(mut buf) = self._owner.take() else {
return None;
};
{
let mut slices = buf.zslices_mut();
let slice = slices.next().unwrap();
Expand All @@ -153,7 +155,7 @@ impl zc_owned_payload_t {
if !z_bytes_check(&self.payload) {
return None;
}
unsafe { std::mem::transmute(&self._owner) }
self._owner.as_ref()
}
}
impl Drop for zc_owned_payload_t {
Expand Down Expand Up @@ -198,19 +200,6 @@ pub extern "C" fn zc_payload_null() -> zc_owned_payload_t {
pub struct z_qos_t(u8);

impl_guarded_transmute!(QoS, z_qos_t);
impl_guarded_transmute!(z_qos_t, QoS);

impl From<QoS> for z_qos_t {
fn from(qos: QoS) -> Self {
qos.transmute()
}
}

impl From<z_qos_t> for QoS {
fn from(qos: z_qos_t) -> QoS {
qos.transmute()
}
}

/// Returns message priority.
#[no_mangle]
Expand Down
7 changes: 1 addition & 6 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use libc::{c_char, c_uint};
use std::ffi::CStr;
use zenoh::config::{Config, ValidatedMap, WhatAmI};

use crate::{impl_guarded_transmute, z_owned_str_t, z_str_null, GuardedTransmute};
use crate::{impl_guarded_transmute, z_owned_str_t, z_str_null};

#[no_mangle]
pub static Z_ROUTER: c_uint = WhatAmI::Router as c_uint;
Expand Down Expand Up @@ -76,11 +76,6 @@ pub struct z_config_t(*const z_owned_config_t);
pub struct z_owned_config_t(*mut ());
impl_guarded_transmute!(Option<Box<Config>>, z_owned_config_t);

impl From<Option<Box<Config>>> for z_owned_config_t {
fn from(v: Option<Box<Config>>) -> Self {
v.transmute()
}
}
/// Returns a :c:type:`z_config_t` loaned from `s`.
#[no_mangle]
pub extern "C" fn z_config_loan(s: &z_owned_config_t) -> z_config_t {
Expand Down
2 changes: 1 addition & 1 deletion src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub struct z_owned_reply_t([u64; 30]);
#[repr(C, align(8))]
pub struct z_owned_reply_t([u64; 19]);

impl_guarded_transmute!(ReplyInner, z_owned_reply_t);
impl_guarded_transmute!(noderefs ReplyInner, z_owned_reply_t);

impl From<ReplyInner> for z_owned_reply_t {
fn from(mut val: ReplyInner) -> Self {
Expand Down
20 changes: 2 additions & 18 deletions src/keyexpr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,27 +62,11 @@ pub struct z_owned_keyexpr_t([u32; 5]);

impl_guarded_transmute!(Option<KeyExpr<'static>>, z_owned_keyexpr_t);

impl From<Option<KeyExpr<'static>>> for z_owned_keyexpr_t {
fn from(val: Option<KeyExpr<'static>>) -> Self {
val.transmute()
}
}
impl From<KeyExpr<'static>> for z_owned_keyexpr_t {
fn from(val: KeyExpr<'static>) -> Self {
Some(val).into()
}
}
impl Deref for z_owned_keyexpr_t {
type Target = Option<KeyExpr<'static>>;
fn deref(&self) -> &Self::Target {
unsafe { std::mem::transmute(self) }
}
}
impl DerefMut for z_owned_keyexpr_t {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { std::mem::transmute(self) }
}
}
impl z_owned_keyexpr_t {
pub fn null() -> Self {
None::<KeyExpr>.into()
Expand Down Expand Up @@ -156,8 +140,8 @@ pub struct z_keyexpr_t([u64; 4]);
#[repr(C, align(4))]
pub struct z_keyexpr_t([u32; 5]);

impl_guarded_transmute!(Option<KeyExpr<'_>>, z_keyexpr_t);
impl_guarded_transmute!(z_keyexpr_t, z_owned_keyexpr_t);
impl_guarded_transmute!(noderefs Option<KeyExpr<'_>>, z_keyexpr_t);
impl_guarded_transmute!(noderefs z_keyexpr_t, z_owned_keyexpr_t);

impl<'a> From<KeyExpr<'a>> for z_keyexpr_t {
fn from(val: KeyExpr<'a>) -> Self {
Expand Down
22 changes: 21 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,26 @@ trait GuardedTransmute<D> {
#[macro_export]
macro_rules! impl_guarded_transmute {
($src_type:ty, $dst_type:ty) => {
impl_guarded_transmute!(noderefs $src_type, $dst_type);
impl From<$src_type> for $dst_type {
fn from(value: $src_type) -> $dst_type {
unsafe { core::mem::transmute(value) }
}
}
impl core::ops::Deref for $dst_type {
type Target = $src_type;
fn deref(&self) -> &$src_type {
unsafe { core::mem::transmute(self) }
}
}
impl core::ops::DerefMut for $dst_type {
fn deref_mut(&mut self) -> &mut $src_type {
unsafe { core::mem::transmute(self) }
}
}

};
(noderefs $src_type:ty, $dst_type:ty) => {
const _: () = {
let src = std::mem::align_of::<$src_type>();
let dst = std::mem::align_of::<$dst_type>();
Expand All @@ -77,7 +97,7 @@ macro_rules! impl_guarded_transmute {
});
}
};
impl $crate::GuardedTransmute<$dst_type> for $src_type {
impl $crate::GuardedTransmute<$dst_type> for $src_type {
fn transmute(self) -> $dst_type {
unsafe { std::mem::transmute::<$src_type, $dst_type>(self) }
}
Expand Down
Loading

0 comments on commit a52ef72

Please sign in to comment.