diff --git a/iceoryx2/src/lib.rs b/iceoryx2/src/lib.rs index b307df87f..c396d6e07 100644 --- a/iceoryx2/src/lib.rs +++ b/iceoryx2/src/lib.rs @@ -290,9 +290,6 @@ pub(crate) mod raw_sample; /// The payload that is received by a [`crate::port::subscriber::Subscriber`]. pub mod sample; -/// The interface for the payload that is sent by a [`crate::port::publisher::Publisher`]. -pub mod payload_mut; - /// The payload that is sent by a [`crate::port::publisher::Publisher`]. pub mod sample_mut; diff --git a/iceoryx2/src/payload_mut.rs b/iceoryx2/src/payload_mut.rs deleted file mode 100644 index 10cc93e6f..000000000 --- a/iceoryx2/src/payload_mut.rs +++ /dev/null @@ -1,216 +0,0 @@ -// Copyright (c) 2024 Contributors to the Eclipse Foundation -// -// See the NOTICE file(s) distributed with this work for additional -// information regarding copyright ownership. -// -// This program and the accompanying materials are made available under the -// terms of the Apache Software License 2.0 which is available at -// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license -// which is available at https://opensource.org/licenses/MIT. -// -// SPDX-License-Identifier: Apache-2.0 OR MIT - -//! # Example -//! -//! ## Sample Write Function -//! -//! ``` -//! use iceoryx2::prelude::*; -//! -//! fn write_sample_data>(mut sample: S) -> impl PayloadMut { -//! sample.write_payload(123456) -//! } -//! -//! # fn main() -> Result<(), Box> { -//! # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); -//! # -//! let service = zero_copy::Service::new(&service_name) -//! .publish_subscribe() -//! .open_or_create::()?; -//! -//! let publisher = service.publisher().create()?; -//! -//! let sample = publisher.loan_uninit()?; -//! let sample = write_sample_data(sample); -//! -//! sample.send()?; -//! -//! # Ok(()) -//! # } -//! ``` -//! See also, [`crate::sample_mut::SampleMut`]. - -use crate::{ - port::update_connections::ConnectionFailure, service::header::publish_subscribe::Header, -}; - -pub(crate) mod internal { - use iceoryx2_cal::zero_copy_connection::PointerOffset; - - pub trait PayloadMgmt { - fn offset_to_chunk(&self) -> PointerOffset; - } -} - -/// Acquired by a [`crate::port::publisher::Publisher`] via -/// [`crate::port::publish::DefaultLoan::loan()`]. It stores the payload that will be sent -/// to all connected [`crate::port::subscriber::Subscriber`]s. If the [`PayloadMut`] is not sent -/// it will release the loaned memory when going out of scope. -pub trait PayloadMut: internal::PayloadMgmt { - /// Returns a reference to the header of the sample. - /// - /// # Example - /// - /// ``` - /// use iceoryx2::prelude::*; - /// - /// # fn main() -> Result<(), Box> { - /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); - /// # - /// # let service = zero_copy::Service::new(&service_name) - /// # .publish_subscribe() - /// # .open_or_create::()?; - /// # let publisher = service.publisher().create()?; - /// - /// let sample = publisher.loan()?; - /// println!("Sample Publisher Origin {:?}", sample.header().publisher_id()); - /// - /// # Ok(()) - /// # } - /// ``` - fn header(&self) -> &Header; - - /// Returns a reference to the payload of the sample. - /// - /// # Notes - /// - /// The generic parameter `MessageType` can be packed into [`core::mem::MaybeUninit`], depending - /// which API is used to obtain the sample. Obtaining a reference is safe for either type. - /// - /// # Example - /// - /// ``` - /// use iceoryx2::prelude::*; - /// - /// # fn main() -> Result<(), Box> { - /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); - /// # - /// # let service = zero_copy::Service::new(&service_name) - /// # .publish_subscribe() - /// # .open_or_create::()?; - /// # let publisher = service.publisher().create()?; - /// - /// let sample = publisher.loan()?; - /// println!("Sample current payload {}", sample.payload()); - /// - /// # Ok(()) - /// # } - /// ``` - fn payload(&self) -> &MessageType; - - /// Returns a mutable reference to the payload of the sample. - /// - /// # Notes - /// - /// The generic parameter `MessageType` can be packed into [`core::mem::MaybeUninit`], depending - /// which API is used to obtain the sample. Obtaining a reference is safe for either type. - /// - /// # Example - /// - /// ``` - /// use iceoryx2::prelude::*; - /// - /// # fn main() -> Result<(), Box> { - /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); - /// # - /// # let service = zero_copy::Service::new(&service_name) - /// # .publish_subscribe() - /// # .open_or_create::()?; - /// # let publisher = service.publisher().create()?; - /// - /// let mut sample = publisher.loan()?; - /// *sample.payload_mut() = 4567; - /// - /// # Ok(()) - /// # } - /// ``` - fn payload_mut(&mut self) -> &mut MessageType; - - /// Send a previously loaned [`crate::port::publish::UninitLoan::loan_uninit()`] or - /// [`crate::port::publish::DefaultLoan::loan()`] [`PayloadMut`] to all connected - /// [`crate::port::subscriber::Subscriber`]s of the service. - /// - /// The payload of the [`PayloadMut`] must be initialized before it can be sent. Have a look - /// at [`UninitPayloadMut::write_payload()`] and [`UninitPayloadMut::assume_init()`] - /// for more details. - /// - /// On success the number of [`crate::port::subscriber::Subscriber`]s that received - /// the data is returned, otherwise a [`ConnectionFailure`] describing the failure. - fn send(self) -> Result; -} - -/// Acquired by a [`crate::port::publisher::Publisher`] via -/// [`crate::port::publish::UninitLoan::loan_uninit()`]. It stores the payload that will be sent -/// to all connected [`crate::port::subscriber::Subscriber`]s. If the [`PayloadMut`] is not sent -/// it will release the loaned memory when going out of scope. -/// -/// The generic parameter `MessageType` is packed into [`core::mem::MaybeUninit`] -pub trait UninitPayloadMut { - type InitializedSample: PayloadMut; - - /// Writes the payload to the sample and labels the sample as initialized - /// - /// # Example - /// - /// ``` - /// use iceoryx2::prelude::*; - /// # fn main() -> Result<(), Box> { - /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); - /// # - /// # let service = zero_copy::Service::new(&service_name) - /// # .publish_subscribe() - /// # .open_or_create::()?; - /// # - /// # let publisher = service.publisher().create()?; - /// - /// let sample = publisher.loan_uninit()?; - /// let sample = sample.write_payload(1234); - /// - /// sample.send()?; - /// - /// # Ok(()) - /// # } - /// ``` - fn write_payload(self, value: MessageType) -> Self::InitializedSample; - - /// Extracts the value of the [`core::mem::MaybeUninit`] container and labels the sample as initialized - /// - /// # Safety - /// - /// The caller must ensure that [`core::mem::MaybeUninit`] really is initialized. Calling this when - /// the content is not fully initialized causes immediate undefined behavior. - /// - /// # Example - /// - /// ``` - /// use iceoryx2::prelude::*; - /// # fn main() -> Result<(), Box> { - /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); - /// # - /// # let service = zero_copy::Service::new(&service_name) - /// # .publish_subscribe() - /// # .open_or_create::()?; - /// # - /// # let publisher = service.publisher().create()?; - /// - /// let mut sample = publisher.loan_uninit()?; - /// sample.payload_mut().write(1234); - /// let sample = unsafe { sample.assume_init() }; - /// - /// sample.send()?; - /// - /// # Ok(()) - /// # } - /// ``` - unsafe fn assume_init(self) -> Self::InitializedSample; -} diff --git a/iceoryx2/src/port/mod.rs b/iceoryx2/src/port/mod.rs index 875ecc1e2..39ecad4b1 100644 --- a/iceoryx2/src/port/mod.rs +++ b/iceoryx2/src/port/mod.rs @@ -38,11 +38,6 @@ use crate::service; /// Defines the action a port shall take when an internal failure occurs. Can happen when the /// system is corrupted and files are modified by non-iceoryx2 instances. Is used as return value of /// the [`DegrationCallback`] to define a custom behavior. -/// -/// Can be set with: -/// -/// * [`publisher::Publisher::set_degration_callback()`] -/// * [`subscriber::Subscriber::set_degration_callback()`] #[derive(Debug, PartialEq, Eq, Copy, Clone)] pub enum DegrationAction { /// Ignore the degration completely diff --git a/iceoryx2/src/port/publisher.rs b/iceoryx2/src/port/publisher.rs index 715984277..c6755fc7b 100644 --- a/iceoryx2/src/port/publisher.rs +++ b/iceoryx2/src/port/publisher.rs @@ -58,12 +58,11 @@ use std::cell::UnsafeCell; use std::fmt::Debug; use std::rc::Rc; -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; use std::{alloc::Layout, marker::PhantomData, mem::MaybeUninit}; use super::port_identifiers::{UniquePublisherId, UniqueSubscriberId}; use crate::message::Message; -use crate::payload_mut::{internal::PayloadMgmt, PayloadMut, UninitPayloadMut}; use crate::port::details::subscriber_connections::*; use crate::port::update_connections::{ConnectionFailure, UpdateConnections}; use crate::port::DegrationAction; @@ -91,7 +90,7 @@ use iceoryx2_cal::zero_copy_connection::{ ZeroCopyConnection, ZeroCopyCreationError, ZeroCopySendError, ZeroCopySender, }; -/// Defines a failure that can occur when a [`Publish`] is created with +/// Defines a failure that can occur when a [`Publisher`] is created with /// [`crate::service::port_factory::publisher::PortFactoryPublisher`]. #[derive(Debug, PartialEq, Eq, Copy, Clone)] pub enum PublisherCreateError { @@ -107,8 +106,8 @@ impl std::fmt::Display for PublisherCreateError { impl std::error::Error for PublisherCreateError {} -/// Defines a failure that can occur in [`DefaultLoan::loan()`] and [`UninitLoan::loan_uninit()`] -/// or is part of [`PublisherSendError`] emitted in [`SendCopy::send_copy()`]. +/// Defines a failure that can occur in [`Publisher::loan()`] and [`Publisher::loan_uninit()`] +/// or is part of [`PublisherSendError`] emitted in [`Publisher::send_copy()`]. #[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)] pub enum PublisherLoanError { OutOfMemory, @@ -127,6 +126,8 @@ impl std::error::Error for PublisherLoanError {} enum_gen! { /// Failure that can be emitted when a [`crate::sample::Sample`] is sent via [`Publisher::send()`]. PublisherSendError + entry: + ConnectionBrokenSincePublisherNoLongerExists mapping: PublisherLoanError to LoanError, ConnectionFailure to ConnectionError @@ -155,6 +156,7 @@ pub(crate) struct DataSegment { history: Option>>, static_config: crate::service::static_config::StaticConfig, loan_counter: AtomicUsize, + is_active: AtomicBool, } impl DataSegment { @@ -360,9 +362,15 @@ impl DataSegment { } } - pub(crate) fn send_sample(&self, address_to_chunk: usize) -> Result { + pub(crate) fn send_sample(&self, address_to_chunk: usize) -> Result { + let msg = "Unable to send sample"; + if !self.is_active.load(Ordering::Relaxed) { + fail!(from self, with PublisherSendError::ConnectionBrokenSincePublisherNoLongerExists, + "{} since the connections could not be updated.", msg); + } + fail!(from self, when self.update_connections(), - "Unable to send sample since the connections could not be updated."); + "{} since the connections could not be updated.", msg); self.add_sample_to_history(address_to_chunk); Ok(self.deliver_sample(address_to_chunk)) @@ -433,6 +441,7 @@ impl Publisher>(), message_type_layout: Layout::new::(), @@ -520,12 +529,12 @@ impl Publisher Publisher Publisher { - /// Loans/allocates a [`crate::sample_mut::SampleMut`] from the underlying data segment of the [`Publish`] - /// and initialize it with the default value. This can be a performance hit and [`UninitLoan::loan_uninit`] + /// Loans/allocates a [`crate::sample_mut::SampleMut`] from the underlying data segment of the [`Publisher`] + /// and initialize it with the default value. This can be a performance hit and [`Publisher::loan_uninit`] /// can be used to loan a [`core::mem::MaybeUninit`]. /// /// On failure it returns [`PublisherLoanError`] describing the failure. diff --git a/iceoryx2/src/port/update_connections.rs b/iceoryx2/src/port/update_connections.rs index 6353fc38e..587dea0af 100644 --- a/iceoryx2/src/port/update_connections.rs +++ b/iceoryx2/src/port/update_connections.rs @@ -33,8 +33,8 @@ impl std::error::Error for ConnectionFailure {} pub trait UpdateConnections { /// Explicitly updates all connections to the [`crate::port::subscriber::Subscriber`]s. This is /// required to be called whenever a new [`crate::port::subscriber::Subscriber`] connected to - /// the service. It is done implicitly whenever [`crate::payload_mut::PayloadMut::send()`] or - /// [`crate::port::publish::SendCopy::send_copy()`] is called. + /// the service. It is done implicitly whenever [`crate::sample_mut::SampleMut::send()`] or + /// [`crate::port::publisher::Publisher::send_copy()`] is called. /// When a [`crate::port::subscriber::Subscriber`] is connected that requires a history this /// call will deliver it. /// diff --git a/iceoryx2/src/prelude.rs b/iceoryx2/src/prelude.rs index 9e23b4e88..0985f669b 100644 --- a/iceoryx2/src/prelude.rs +++ b/iceoryx2/src/prelude.rs @@ -12,6 +12,5 @@ pub use crate::iox2::Iox2; pub use crate::iox2::Iox2Event; -pub use crate::payload_mut::{PayloadMut, UninitPayloadMut}; pub use crate::port::event_id::EventId; pub use crate::service::{process_local, service_name::ServiceName, zero_copy, Service}; diff --git a/iceoryx2/src/sample_mut.rs b/iceoryx2/src/sample_mut.rs index ecb8a5fd1..d72b81361 100644 --- a/iceoryx2/src/sample_mut.rs +++ b/iceoryx2/src/sample_mut.rs @@ -37,8 +37,7 @@ //! See also, [`crate::sample_mut::SampleMut`]. use crate::{ - payload_mut::{internal::PayloadMgmt, PayloadMut, UninitPayloadMut}, - port::{publisher::DataSegment, update_connections::ConnectionFailure}, + port::publisher::{DataSegment, PublisherSendError}, raw_sample::RawSampleMut, service::header::publish_subscribe::Header, }; @@ -46,8 +45,8 @@ use iceoryx2_cal::shared_memory::*; use std::{fmt::Debug, mem::MaybeUninit, rc::Rc}; /// Acquired by a [`crate::port::publisher::Publisher`] via -/// [`crate::port::publish::DefaultLoan::loan()`] or -/// [`crate::port::publish::UninitLoan::loan_uninit()`]. It stores the payload that will be sent +/// [`crate::port::publisher::Publisher::loan()`] or +/// [`crate::port::publisher::Publisher::loan_uninit()`]. It stores the payload that will be sent /// to all connected [`crate::port::subscriber::Subscriber`]s. If the [`SampleMut`] is not sent /// it will release the loaned memory when going out of scope. /// @@ -62,7 +61,7 @@ use std::{fmt::Debug, mem::MaybeUninit, rc::Rc}; pub struct SampleMut { data_segment: Rc>, ptr: RawSampleMut, - offset_to_chunk: PointerOffset, + pub(crate) offset_to_chunk: PointerOffset, } impl Drop @@ -89,26 +88,68 @@ impl } } -impl PayloadMgmt - for SampleMut -{ - fn offset_to_chunk(&self) -> PointerOffset { - self.offset_to_chunk - } -} - -impl UninitPayloadMut - for SampleMut, Service> +impl + SampleMut, Service> { - type InitializedSample = SampleMut; - - fn write_payload(mut self, value: MessageType) -> SampleMut { + /// Writes the payload to the sample and labels the sample as initialized + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// # fn main() -> Result<(), Box> { + /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); + /// # + /// # let service = zero_copy::Service::new(&service_name) + /// # .publish_subscribe() + /// # .open_or_create::()?; + /// # + /// # let publisher = service.publisher().create()?; + /// + /// let sample = publisher.loan_uninit()?; + /// let sample = sample.write_payload(1234); + /// + /// sample.send()?; + /// + /// # Ok(()) + /// # } + /// ``` + pub fn write_payload(mut self, value: MessageType) -> SampleMut { self.payload_mut().write(value); // SAFETY: this is safe since the payload was initialized on the line above unsafe { self.assume_init() } } - unsafe fn assume_init(self) -> SampleMut { + /// Extracts the value of the [`core::mem::MaybeUninit`] container and labels the sample as initialized + /// + /// # Safety + /// + /// The caller must ensure that [`core::mem::MaybeUninit`] really is initialized. Calling this when + /// the content is not fully initialized causes immediate undefined behavior. + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// # fn main() -> Result<(), Box> { + /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); + /// # + /// # let service = zero_copy::Service::new(&service_name) + /// # .publish_subscribe() + /// # .open_or_create::()?; + /// # + /// # let publisher = service.publisher().create()?; + /// + /// let mut sample = publisher.loan_uninit()?; + /// sample.payload_mut().write(1234); + /// let sample = unsafe { sample.assume_init() }; + /// + /// sample.send()?; + /// + /// # Ok(()) + /// # } + /// ``` + pub unsafe fn assume_init(self) -> SampleMut { // the transmute is not nice but safe since MaybeUninit is #[repr(transparent)] to the inner type std::mem::transmute(self) } @@ -117,21 +158,104 @@ impl UninitPayloadMut` Service: crate::service::Service, - > PayloadMut for SampleMut + > SampleMut { - fn header(&self) -> &Header { + /// Returns a reference to the header of the sample. + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// + /// # fn main() -> Result<(), Box> { + /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); + /// # + /// # let service = zero_copy::Service::new(&service_name) + /// # .publish_subscribe() + /// # .open_or_create::()?; + /// # let publisher = service.publisher().create()?; + /// + /// let sample = publisher.loan()?; + /// println!("Sample Publisher Origin {:?}", sample.header().publisher_id()); + /// + /// # Ok(()) + /// # } + /// ``` + pub fn header(&self) -> &Header { self.ptr.as_header_ref() } - fn payload(&self) -> &M { + /// Returns a reference to the payload of the sample. + /// + /// # Notes + /// + /// The generic parameter `MessageType` can be packed into [`core::mem::MaybeUninit`], depending + /// which API is used to obtain the sample. Obtaining a reference is safe for either type. + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// + /// # fn main() -> Result<(), Box> { + /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); + /// # + /// # let service = zero_copy::Service::new(&service_name) + /// # .publish_subscribe() + /// # .open_or_create::()?; + /// # let publisher = service.publisher().create()?; + /// + /// let sample = publisher.loan()?; + /// println!("Sample current payload {}", sample.payload()); + /// + /// # Ok(()) + /// # } + /// ``` + pub fn payload(&self) -> &M { self.ptr.as_data_ref() } - fn payload_mut(&mut self) -> &mut M { + /// Returns a mutable reference to the payload of the sample. + /// + /// # Notes + /// + /// The generic parameter `MessageType` can be packed into [`core::mem::MaybeUninit`], depending + /// which API is used to obtain the sample. Obtaining a reference is safe for either type. + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// + /// # fn main() -> Result<(), Box> { + /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); + /// # + /// # let service = zero_copy::Service::new(&service_name) + /// # .publish_subscribe() + /// # .open_or_create::()?; + /// # let publisher = service.publisher().create()?; + /// + /// let mut sample = publisher.loan()?; + /// *sample.payload_mut() = 4567; + /// + /// # Ok(()) + /// # } + /// ``` + pub fn payload_mut(&mut self) -> &mut M { self.ptr.as_data_mut() } - fn send(self) -> Result { + /// Send a previously loaned [`crate::port::publisher::Publisher::loan_uninit()`] or + /// [`crate::port::publisher::Publisher::loan()`] [`SampleMut`] to all connected + /// [`crate::port::subscriber::Subscriber`]s of the service. + /// + /// The payload of the [`SampleMut`] must be initialized before it can be sent. Have a look + /// at [`SampleMut::write_payload()`] and [`SampleMut::assume_init()`] + /// for more details. + /// + /// On success the number of [`crate::port::subscriber::Subscriber`]s that received + /// the data is returned, otherwise a [`PublisherSendError`] describing the failure. + pub fn send(self) -> Result { self.data_segment.send_sample(self.offset_to_chunk.value()) } } diff --git a/iceoryx2/src/service/port_factory/publisher.rs b/iceoryx2/src/service/port_factory/publisher.rs index 7ac4cbf93..32494ef2e 100644 --- a/iceoryx2/src/service/port_factory/publisher.rs +++ b/iceoryx2/src/service/port_factory/publisher.rs @@ -47,8 +47,8 @@ use crate::{ }; /// Defines the strategy the [`Publisher`] shall pursue in -/// [`crate::payload_mut::PayloadMut::send()`] or -/// [`crate::port::publish::SendCopy::send_copy()`] when the buffer of a +/// [`crate::sample_mut::SampleMut::send()`] or +/// [`Publisher::send_copy()`] when the buffer of a /// [`crate::port::subscriber::Subscriber`] is full and the service does not overflow. #[derive(Debug, Eq, PartialEq, Clone, Copy)] pub enum UnableToDeliverStrategy { @@ -144,8 +144,8 @@ impl<'factory, Service: service::Service, MessageType: Debug> } /// Defines how many [`crate::sample_mut::SampleMut`] the [`Publisher`] can loan with - /// [`crate::port::publish::DefaultLoan::loan()`] or - /// [`crate::port::publish::UninitLoan::loan_uninit()`] in parallel. + /// [`Publisher::loan()`] or + /// [`Publisher::loan_uninit()`] in parallel. pub fn max_loaned_samples(mut self, value: usize) -> Self { self.config.max_loaned_samples = value; self diff --git a/iceoryx2/tests/publisher_tests.rs b/iceoryx2/tests/publisher_tests.rs index ad1ed1c8e..2e095515b 100644 --- a/iceoryx2/tests/publisher_tests.rs +++ b/iceoryx2/tests/publisher_tests.rs @@ -14,9 +14,7 @@ mod publisher { use std::time::{Duration, Instant}; - use iceoryx2::payload_mut::UninitPayloadMut; use iceoryx2::port::publisher::PublisherLoanError; - use iceoryx2::prelude::*; use iceoryx2::service::port_factory::publisher::UnableToDeliverStrategy; use iceoryx2::service::{service_name::ServiceName, Service}; use iceoryx2_bb_posix::barrier::{BarrierBuilder, BarrierHandle};