diff --git a/iceoryx2/src/lib.rs b/iceoryx2/src/lib.rs index 56d8d9a84..c396d6e07 100644 --- a/iceoryx2/src/lib.rs +++ b/iceoryx2/src/lib.rs @@ -287,15 +287,9 @@ pub mod port; pub(crate) mod raw_sample; -/// The interface for the payload that is received by a [`crate::port::subscriber::Subscriber`]. -pub mod payload; - /// The payload that is received by a [`crate::port::subscriber::Subscriber`]. pub mod sample; -/// The interface for the payload that is sent by a [`crate::port::publisher::Publisher`]. -pub mod payload_mut; - /// The payload that is sent by a [`crate::port::publisher::Publisher`]. pub mod sample_mut; diff --git a/iceoryx2/src/payload.rs b/iceoryx2/src/payload.rs deleted file mode 100644 index 5c88ca34a..000000000 --- a/iceoryx2/src/payload.rs +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright (c) 2024 Contributors to the Eclipse Foundation -// -// See the NOTICE file(s) distributed with this work for additional -// information regarding copyright ownership. -// -// This program and the accompanying materials are made available under the -// terms of the Apache Software License 2.0 which is available at -// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license -// which is available at https://opensource.org/licenses/MIT. -// -// SPDX-License-Identifier: Apache-2.0 OR MIT - -//! # Example -//! -//! ``` -//! use iceoryx2::prelude::*; -//! # fn main() -> Result<(), Box> { -//! # let service_name = ServiceName::new("My/Funk/ServiceName")?; -//! # let service = zero_copy::Service::new(&service_name) -//! # .publish_subscribe() -//! # .open_or_create::()?; -//! # let subscriber = service.subscriber().create()?; -//! -//! let mut received_samples: Vec>> = vec![]; -//! -//! while let Some(sample) = subscriber.receive()? { -//! println!("received: {:?}", *sample); -//! println!("header timestamp {:?}, publisher id {:?}", -//! sample.header().time_stamp(), sample.header().publisher_id()); -//! received_samples.push(Box::new(sample)); -//! } -//! -//! # Ok(()) -//! # } -//! ``` -//! -//! See also [`crate::sample::Sample`]. - -use crate::service::header::publish_subscribe::Header; - -/// It stores the payload and is acquired by the [`crate::port::subscriber::Subscriber`] whenever -/// it receives new data from a [`crate::port::publisher::Publisher`] via -/// [`crate::port::subscribe::Subscribe::receive()`]. -pub trait Payload { - /// Returns a reference to the payload of the sample - fn payload(&self) -> &MessageType; - - /// Returns a reference to the header of the sample. - fn header(&self) -> &Header; -} diff --git a/iceoryx2/src/payload_mut.rs b/iceoryx2/src/payload_mut.rs deleted file mode 100644 index 10cc93e6f..000000000 --- a/iceoryx2/src/payload_mut.rs +++ /dev/null @@ -1,216 +0,0 @@ -// Copyright (c) 2024 Contributors to the Eclipse Foundation -// -// See the NOTICE file(s) distributed with this work for additional -// information regarding copyright ownership. -// -// This program and the accompanying materials are made available under the -// terms of the Apache Software License 2.0 which is available at -// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license -// which is available at https://opensource.org/licenses/MIT. -// -// SPDX-License-Identifier: Apache-2.0 OR MIT - -//! # Example -//! -//! ## Sample Write Function -//! -//! ``` -//! use iceoryx2::prelude::*; -//! -//! fn write_sample_data>(mut sample: S) -> impl PayloadMut { -//! sample.write_payload(123456) -//! } -//! -//! # fn main() -> Result<(), Box> { -//! # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); -//! # -//! let service = zero_copy::Service::new(&service_name) -//! .publish_subscribe() -//! .open_or_create::()?; -//! -//! let publisher = service.publisher().create()?; -//! -//! let sample = publisher.loan_uninit()?; -//! let sample = write_sample_data(sample); -//! -//! sample.send()?; -//! -//! # Ok(()) -//! # } -//! ``` -//! See also, [`crate::sample_mut::SampleMut`]. - -use crate::{ - port::update_connections::ConnectionFailure, service::header::publish_subscribe::Header, -}; - -pub(crate) mod internal { - use iceoryx2_cal::zero_copy_connection::PointerOffset; - - pub trait PayloadMgmt { - fn offset_to_chunk(&self) -> PointerOffset; - } -} - -/// Acquired by a [`crate::port::publisher::Publisher`] via -/// [`crate::port::publish::DefaultLoan::loan()`]. It stores the payload that will be sent -/// to all connected [`crate::port::subscriber::Subscriber`]s. If the [`PayloadMut`] is not sent -/// it will release the loaned memory when going out of scope. -pub trait PayloadMut: internal::PayloadMgmt { - /// Returns a reference to the header of the sample. - /// - /// # Example - /// - /// ``` - /// use iceoryx2::prelude::*; - /// - /// # fn main() -> Result<(), Box> { - /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); - /// # - /// # let service = zero_copy::Service::new(&service_name) - /// # .publish_subscribe() - /// # .open_or_create::()?; - /// # let publisher = service.publisher().create()?; - /// - /// let sample = publisher.loan()?; - /// println!("Sample Publisher Origin {:?}", sample.header().publisher_id()); - /// - /// # Ok(()) - /// # } - /// ``` - fn header(&self) -> &Header; - - /// Returns a reference to the payload of the sample. - /// - /// # Notes - /// - /// The generic parameter `MessageType` can be packed into [`core::mem::MaybeUninit`], depending - /// which API is used to obtain the sample. Obtaining a reference is safe for either type. - /// - /// # Example - /// - /// ``` - /// use iceoryx2::prelude::*; - /// - /// # fn main() -> Result<(), Box> { - /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); - /// # - /// # let service = zero_copy::Service::new(&service_name) - /// # .publish_subscribe() - /// # .open_or_create::()?; - /// # let publisher = service.publisher().create()?; - /// - /// let sample = publisher.loan()?; - /// println!("Sample current payload {}", sample.payload()); - /// - /// # Ok(()) - /// # } - /// ``` - fn payload(&self) -> &MessageType; - - /// Returns a mutable reference to the payload of the sample. - /// - /// # Notes - /// - /// The generic parameter `MessageType` can be packed into [`core::mem::MaybeUninit`], depending - /// which API is used to obtain the sample. Obtaining a reference is safe for either type. - /// - /// # Example - /// - /// ``` - /// use iceoryx2::prelude::*; - /// - /// # fn main() -> Result<(), Box> { - /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); - /// # - /// # let service = zero_copy::Service::new(&service_name) - /// # .publish_subscribe() - /// # .open_or_create::()?; - /// # let publisher = service.publisher().create()?; - /// - /// let mut sample = publisher.loan()?; - /// *sample.payload_mut() = 4567; - /// - /// # Ok(()) - /// # } - /// ``` - fn payload_mut(&mut self) -> &mut MessageType; - - /// Send a previously loaned [`crate::port::publish::UninitLoan::loan_uninit()`] or - /// [`crate::port::publish::DefaultLoan::loan()`] [`PayloadMut`] to all connected - /// [`crate::port::subscriber::Subscriber`]s of the service. - /// - /// The payload of the [`PayloadMut`] must be initialized before it can be sent. Have a look - /// at [`UninitPayloadMut::write_payload()`] and [`UninitPayloadMut::assume_init()`] - /// for more details. - /// - /// On success the number of [`crate::port::subscriber::Subscriber`]s that received - /// the data is returned, otherwise a [`ConnectionFailure`] describing the failure. - fn send(self) -> Result; -} - -/// Acquired by a [`crate::port::publisher::Publisher`] via -/// [`crate::port::publish::UninitLoan::loan_uninit()`]. It stores the payload that will be sent -/// to all connected [`crate::port::subscriber::Subscriber`]s. If the [`PayloadMut`] is not sent -/// it will release the loaned memory when going out of scope. -/// -/// The generic parameter `MessageType` is packed into [`core::mem::MaybeUninit`] -pub trait UninitPayloadMut { - type InitializedSample: PayloadMut; - - /// Writes the payload to the sample and labels the sample as initialized - /// - /// # Example - /// - /// ``` - /// use iceoryx2::prelude::*; - /// # fn main() -> Result<(), Box> { - /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); - /// # - /// # let service = zero_copy::Service::new(&service_name) - /// # .publish_subscribe() - /// # .open_or_create::()?; - /// # - /// # let publisher = service.publisher().create()?; - /// - /// let sample = publisher.loan_uninit()?; - /// let sample = sample.write_payload(1234); - /// - /// sample.send()?; - /// - /// # Ok(()) - /// # } - /// ``` - fn write_payload(self, value: MessageType) -> Self::InitializedSample; - - /// Extracts the value of the [`core::mem::MaybeUninit`] container and labels the sample as initialized - /// - /// # Safety - /// - /// The caller must ensure that [`core::mem::MaybeUninit`] really is initialized. Calling this when - /// the content is not fully initialized causes immediate undefined behavior. - /// - /// # Example - /// - /// ``` - /// use iceoryx2::prelude::*; - /// # fn main() -> Result<(), Box> { - /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); - /// # - /// # let service = zero_copy::Service::new(&service_name) - /// # .publish_subscribe() - /// # .open_or_create::()?; - /// # - /// # let publisher = service.publisher().create()?; - /// - /// let mut sample = publisher.loan_uninit()?; - /// sample.payload_mut().write(1234); - /// let sample = unsafe { sample.assume_init() }; - /// - /// sample.send()?; - /// - /// # Ok(()) - /// # } - /// ``` - unsafe fn assume_init(self) -> Self::InitializedSample; -} diff --git a/iceoryx2/src/port/listen.rs b/iceoryx2/src/port/listen.rs deleted file mode 100644 index 168680706..000000000 --- a/iceoryx2/src/port/listen.rs +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright (c) 2023 - 2024 Contributors to the Eclipse Foundation -// -// See the NOTICE file(s) distributed with this work for additional -// information regarding copyright ownership. -// -// This program and the accompanying materials are made available under the -// terms of the Apache Software License 2.0 which is available at -// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license -// which is available at https://opensource.org/licenses/MIT. -// -// SPDX-License-Identifier: Apache-2.0 OR MIT - -//! # Example -//! -//! ``` -//! use iceoryx2::prelude::*; -//! use std::boxed::Box; -//! # fn main() -> Result<(), Box> { -//! let event_name = ServiceName::new("MyEventName")?; -//! let event_ipc = zero_copy::Service::new(&event_name) -//! .event() -//! .open_or_create()?; -//! -//! let event_local = process_local::Service::new(&event_name) -//! .event() -//! .open_or_create()?; -//! -//! let mut listeners: Vec> = vec![]; -//! -//! listeners.push(Box::new(event_ipc.listener().create()?)); -//! listeners.push(Box::new(event_local.listener().create()?)); -//! -//! for listener in &mut listeners { -//! for event_id in listener.try_wait()? { -//! println!("event was triggered with id: {:?}", event_id); -//! } -//! } -//! -//! # Ok(()) -//! # } -//! ``` -//! -//! See also [`crate::port::listener::Listener`] - -use std::time::Duration; - -use iceoryx2_cal::event::ListenerWaitError; - -use super::event_id::EventId; - -/// Defines the failures that can occur when a [`Listen`]er is created with the -/// [`crate::service::port_factory::listener::PortFactoryListener`]. -#[derive(Debug, PartialEq, Eq, Copy, Clone)] -pub enum ListenerCreateError { - ExceedsMaxSupportedListeners, - ResourceCreationFailed, -} - -impl std::fmt::Display for ListenerCreateError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::write!(f, "{}::{:?}", std::stringify!(Self), self) - } -} - -impl std::error::Error for ListenerCreateError {} - -/// The interface of the receiving endpoint of an event based communication. -pub trait Listen { - /// Returns the cached [`EventId`]s. Whenever [`Listen::try_wait()`], - /// [`Listen::timed_wait()`] or [`Listen::blocking_wait()`] is called the cache is reset - /// and filled with the events that where signaled since the last call. This cache can be - /// accessed until a new wait call resets and fills it again. - fn cache(&self) -> &[EventId]; - - /// Non-blocking wait for new [`EventId`]s. If no [`EventId`]s were notified the returned slice - /// is empty. On error it returns [`ListenerWaitError`] is returned which describes the error - /// in detail. - fn try_wait(&mut self) -> Result<&[EventId], ListenerWaitError>; - - /// Blocking wait for new [`EventId`]s until either an [`EventId`] was received or the timeout - /// has passed. If no [`EventId`]s were notified the returned slice - /// is empty. On error it returns [`ListenerWaitError`] is returned which describes the error - /// in detail. - fn timed_wait(&mut self, timeout: Duration) -> Result<&[EventId], ListenerWaitError>; - - /// Blocking wait for new [`EventId`]s until either an [`EventId`]. - /// Sporadic wakeups can occur and if no [`EventId`]s were notified the returned slice - /// is empty. On error it returns [`ListenerWaitError`] is returned which describes the error - /// in detail. - fn blocking_wait(&mut self) -> Result<&[EventId], ListenerWaitError>; -} diff --git a/iceoryx2/src/port/listener.rs b/iceoryx2/src/port/listener.rs index 914adaf00..f8ad88d0f 100644 --- a/iceoryx2/src/port/listener.rs +++ b/iceoryx2/src/port/listener.rs @@ -44,7 +44,22 @@ use std::rc::Rc; use std::time::Duration; use super::event_id::EventId; -use super::listen::{Listen, ListenerCreateError}; + +/// Defines the failures that can occur when a [`Listener`] is created with the +/// [`crate::service::port_factory::listener::PortFactoryListener`]. +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub enum ListenerCreateError { + ExceedsMaxSupportedListeners, + ResourceCreationFailed, +} + +impl std::fmt::Display for ListenerCreateError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::write!(f, "{}::{:?}", std::stringify!(Self), self) + } +} + +impl std::error::Error for ListenerCreateError {} /// Represents the receiving endpoint of an event based communication. #[derive(Debug)] @@ -116,21 +131,30 @@ impl Listener { Ok(()) } -} -impl Listen for Listener { - fn cache(&self) -> &[EventId] { + /// Returns the cached [`EventId`]s. Whenever [`Listener::try_wait()`], + /// [`Listener::timed_wait()`] or [`Listener::blocking_wait()`] is called the cache is reset + /// and filled with the events that where signaled since the last call. This cache can be + /// accessed until a new wait call resets and fills it again. + pub fn cache(&self) -> &[EventId] { &self.cache } - fn try_wait(&mut self) -> Result<&[EventId], ListenerWaitError> { + /// Non-blocking wait for new [`EventId`]s. If no [`EventId`]s were notified the returned slice + /// is empty. On error it returns [`ListenerWaitError`] is returned which describes the error + /// in detail. + pub fn try_wait(&mut self) -> Result<&[EventId], ListenerWaitError> { self.cache.clear(); self.fill_cache()?; Ok(self.cache()) } - fn timed_wait(&mut self, timeout: Duration) -> Result<&[EventId], ListenerWaitError> { + /// Blocking wait for new [`EventId`]s until either an [`EventId`] was received or the timeout + /// has passed. If no [`EventId`]s were notified the returned slice + /// is empty. On error it returns [`ListenerWaitError`] is returned which describes the error + /// in detail. + pub fn timed_wait(&mut self, timeout: Duration) -> Result<&[EventId], ListenerWaitError> { use iceoryx2_cal::event::Listener; self.cache.clear(); @@ -145,7 +169,11 @@ impl Listen for Listener { Ok(self.cache()) } - fn blocking_wait(&mut self) -> Result<&[EventId], ListenerWaitError> { + /// Blocking wait for new [`EventId`]s until either an [`EventId`]. + /// Sporadic wakeups can occur and if no [`EventId`]s were notified the returned slice + /// is empty. On error it returns [`ListenerWaitError`] is returned which describes the error + /// in detail. + pub fn blocking_wait(&mut self) -> Result<&[EventId], ListenerWaitError> { use iceoryx2_cal::event::Listener; self.cache.clear(); diff --git a/iceoryx2/src/port/mod.rs b/iceoryx2/src/port/mod.rs index 42903f37e..39ecad4b1 100644 --- a/iceoryx2/src/port/mod.rs +++ b/iceoryx2/src/port/mod.rs @@ -18,22 +18,14 @@ pub(crate) mod details; /// Defines the event id used to identify the source of an event. pub mod event_id; -/// The interface of the receiving endpoint for event based communication -pub mod listen; /// Receiving endpoint (port) for event based communication pub mod listener; /// Sending endpoint (port) for event based communication pub mod notifier; -/// The interface of the sending endpoint for event based communication -pub mod notify; /// Defines port specific unique ids. Used to identify source/destination while communicating. pub mod port_identifiers; -/// The interface of the sending endpoint for publish-subscribe based communication -pub mod publish; /// Sending endpoint (port) for publish-subscribe based communication pub mod publisher; -/// The interface of the receiving endpoint for publish-subscribe based communication -pub mod subscribe; /// Receiving endpoint (port) for publish-subscribe based communication pub mod subscriber; /// Interface to perform cyclic updates to the ports. Required to deliver history to new @@ -46,11 +38,6 @@ use crate::service; /// Defines the action a port shall take when an internal failure occurs. Can happen when the /// system is corrupted and files are modified by non-iceoryx2 instances. Is used as return value of /// the [`DegrationCallback`] to define a custom behavior. -/// -/// Can be set with: -/// -/// * [`publisher::Publisher::set_degration_callback()`] -/// * [`subscriber::Subscriber::set_degration_callback()`] #[derive(Debug, PartialEq, Eq, Copy, Clone)] pub enum DegrationAction { /// Ignore the degration completely diff --git a/iceoryx2/src/port/notifier.rs b/iceoryx2/src/port/notifier.rs index 2873d952e..363329cf7 100644 --- a/iceoryx2/src/port/notifier.rs +++ b/iceoryx2/src/port/notifier.rs @@ -37,6 +37,7 @@ //! //! See also [`crate::port::notifier::Notifier`] +use super::{event_id::EventId, port_identifiers::UniqueListenerId}; use crate::{ port::port_identifiers::UniqueNotifierId, service::{self, naming_scheme::event_concept_name}, @@ -47,11 +48,34 @@ use iceoryx2_cal::named_concept::NamedConceptBuilder; use iceoryx2_cal::{dynamic_storage::DynamicStorage, event::NotifierBuilder}; use std::{cell::UnsafeCell, rc::Rc}; -use super::{ - event_id::EventId, - notify::{NotifierConnectionUpdateFailure, NotifierCreateError, Notify}, - port_identifiers::UniqueListenerId, -}; +/// Failures that can occur when a new [`Notifier`] is created with the +/// [`crate::service::port_factory::notifier::PortFactoryNotifier`]. +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub enum NotifierCreateError { + ExceedsMaxSupportedNotifiers, +} + +impl std::fmt::Display for NotifierCreateError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::write!(f, "{}::{:?}", std::stringify!(Self), self) + } +} + +impl std::error::Error for NotifierCreateError {} + +/// Defines the failures that can occur while a [`Notifier::notify()`] call. +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub enum NotifierConnectionUpdateFailure { + OnlyPartialUpdate, +} + +impl std::fmt::Display for NotifierConnectionUpdateFailure { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::write!(f, "{}::{:?}", std::stringify!(Self), self) + } +} + +impl std::error::Error for NotifierConnectionUpdateFailure {} #[derive(Debug, Default)] struct ListenerConnections { @@ -206,14 +230,22 @@ impl Notifier { Ok(()) } -} -impl Notify for Notifier { - fn notify(&self) -> Result { + /// Notifies all [`crate::port::listener::Listener`] connected to the service with the default + /// event id provided on creation. + /// On success the number of + /// [`crate::port::listener::Listener`]s that were notified otherwise it returns + /// [`NotifierConnectionUpdateFailure`]. + pub fn notify(&self) -> Result { self.notify_with_custom_event_id(self.default_event_id) } - fn notify_with_custom_event_id( + /// Notifies all [`crate::port::listener::Listener`] connected to the service with a custom + /// [`EventId`]. + /// On success the number of + /// [`crate::port::listener::Listener`]s that were notified otherwise it returns + /// [`NotifierConnectionUpdateFailure`]. + pub fn notify_with_custom_event_id( &self, value: EventId, ) -> Result { diff --git a/iceoryx2/src/port/notify.rs b/iceoryx2/src/port/notify.rs deleted file mode 100644 index 83b7f5790..000000000 --- a/iceoryx2/src/port/notify.rs +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright (c) 2024 Contributors to the Eclipse Foundation -// -// See the NOTICE file(s) distributed with this work for additional -// information regarding copyright ownership. -// -// This program and the accompanying materials are made available under the -// terms of the Apache Software License 2.0 which is available at -// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license -// which is available at https://opensource.org/licenses/MIT. -// -// SPDX-License-Identifier: Apache-2.0 OR MIT - -//! # Example -//! -//! ``` -//! use iceoryx2::prelude::*; -//! use std::boxed::Box; -//! # fn main() -> Result<(), Box> { -//! let event_name = ServiceName::new("MyEventName")?; -//! let event_ipc = zero_copy::Service::new(&event_name) -//! .event() -//! .open_or_create()?; -//! -//! let event_local = process_local::Service::new(&event_name) -//! .event() -//! .open_or_create()?; -//! -//! let mut notifiers: Vec> = vec![]; -//! -//! notifiers.push(Box::new(event_ipc.notifier().default_event_id(EventId::new(123)).create()?)); -//! notifiers.push(Box::new(event_local.notifier().default_event_id(EventId::new(456)).create()?)); -//! -//! for notifier in ¬ifiers { -//! notifier.notify()?; -//! } -//! -//! # Ok(()) -//! # } -//! ``` -//! -//! See also [`crate::port::notifier::Notifier`] - -use super::event_id::EventId; - -/// Failures that can occur when a new [`Notify`]er is created with the -/// [`crate::service::port_factory::notifier::PortFactoryNotifier`]. -#[derive(Debug, PartialEq, Eq, Copy, Clone)] -pub enum NotifierCreateError { - ExceedsMaxSupportedNotifiers, -} - -impl std::fmt::Display for NotifierCreateError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::write!(f, "{}::{:?}", std::stringify!(Self), self) - } -} - -impl std::error::Error for NotifierCreateError {} - -/// Defines the failures that can occur while a [`Notify::notify()`] call. -#[derive(Debug, PartialEq, Eq, Copy, Clone)] -pub enum NotifierConnectionUpdateFailure { - OnlyPartialUpdate, -} - -impl std::fmt::Display for NotifierConnectionUpdateFailure { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::write!(f, "{}::{:?}", std::stringify!(Self), self) - } -} - -impl std::error::Error for NotifierConnectionUpdateFailure {} - -/// The interface of the sending endpoint of an event based communication. -pub trait Notify { - /// Notifies all [`crate::port::listener::Listener`] connected to the service with the default - /// event id provided on creation. - /// On success the number of - /// [`crate::port::listener::Listener`]s that were notified otherwise it returns - /// [`NotifierConnectionUpdateFailure`]. - fn notify(&self) -> Result; - - /// Notifies all [`crate::port::listener::Listener`] connected to the service with a custom - /// [`EventId`]. - /// On success the number of - /// [`crate::port::listener::Listener`]s that were notified otherwise it returns - /// [`NotifierConnectionUpdateFailure`]. - fn notify_with_custom_event_id( - &self, - value: EventId, - ) -> Result; -} diff --git a/iceoryx2/src/port/publish.rs b/iceoryx2/src/port/publish.rs deleted file mode 100644 index 23c209701..000000000 --- a/iceoryx2/src/port/publish.rs +++ /dev/null @@ -1,208 +0,0 @@ -// Copyright (c) 2024 Contributors to the Eclipse Foundation -// -// See the NOTICE file(s) distributed with this work for additional -// information regarding copyright ownership. -// -// This program and the accompanying materials are made available under the -// terms of the Apache Software License 2.0 which is available at -// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license -// which is available at https://opensource.org/licenses/MIT. -// -// SPDX-License-Identifier: Apache-2.0 OR MIT - -//! # Example -//! -//! ``` -//! use iceoryx2::prelude::*; -//! -//! # fn main() -> Result<(), Box> { -//! # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); -//! # -//! let pubsub_ipc = zero_copy::Service::new(&service_name) -//! .publish_subscribe() -//! .open_or_create::()?; -//! -//! let pubsub_local = process_local::Service::new(&service_name) -//! .publish_subscribe() -//! .open_or_create::()?; -//! -//! let mut publishers: Vec>> = vec![]; -//! -//! publishers.push(Box::new(pubsub_ipc.publisher().create()?)); -//! publishers.push(Box::new(pubsub_local.publisher().create()?)); -//! -//! for publisher in publishers { -//! publisher.send_copy(1234); -//! } -//! -//! # Ok(()) -//! # } -//! ``` -//! -//! See also, [`crate::port::publisher::Publisher`]. - -use crate::port::update_connections::UpdateConnections; -use std::{fmt::Debug, mem::MaybeUninit}; - -use iceoryx2_bb_elementary::enum_gen; - -use crate::sample_mut::SampleMut; - -use super::update_connections::ConnectionFailure; - -/// Defines a failure that can occur when a [`Publish`] is created with -/// [`crate::service::port_factory::publisher::PortFactoryPublisher`]. -#[derive(Debug, PartialEq, Eq, Copy, Clone)] -pub enum PublisherCreateError { - ExceedsMaxSupportedPublishers, - UnableToCreateDataSegment, -} - -impl std::fmt::Display for PublisherCreateError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::write!(f, "{}::{:?}", std::stringify!(Self), self) - } -} - -impl std::error::Error for PublisherCreateError {} - -/// Defines a failure that can occur in [`DefaultLoan::loan()`] and [`UninitLoan::loan_uninit()`] -/// or is part of [`PublisherSendError`] emitted in [`SendCopy::send_copy()`]. -#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)] -pub enum PublisherLoanError { - OutOfMemory, - ExceedsMaxLoanedChunks, - InternalFailure, -} - -impl std::fmt::Display for PublisherLoanError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::write!(f, "{}::{:?}", std::stringify!(Self), self) - } -} - -impl std::error::Error for PublisherLoanError {} - -enum_gen! { - /// Failure that can be emitted when a [`crate::sample::Sample`] is sent via [`Publisher::send()`]. - PublisherSendError - mapping: - PublisherLoanError to LoanError, - ConnectionFailure to ConnectionError -} - -impl std::fmt::Display for PublisherSendError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::write!(f, "{}::{:?}", std::stringify!(Self), self) - } -} - -impl std::error::Error for PublisherSendError {} - -pub(crate) mod internal { - use std::fmt::Debug; - - use iceoryx2_cal::zero_copy_connection::PointerOffset; - - use crate::port::update_connections::ConnectionFailure; - - pub(crate) trait PublishMgmt: Debug { - fn return_loaned_sample(&self, distance_to_chunk: PointerOffset); - fn send_impl(&self, address_to_chunk: usize) -> Result; - } -} - -/// Interface of the sending endpoint of a publish-subscriber based communication. -pub trait Publish: - DefaultLoan + UninitLoan + UpdateConnections + SendCopy -{ -} - -/// Copies the payload into the shared memory and sends it. -pub trait SendCopy { - /// Copies the input `value` into a [`crate::sample_mut::SampleMut`] and delivers it. - /// On success it returns the number of [`crate::port::subscriber::Subscriber`]s that received - /// the data, otherwise a [`PublisherSendError`] describing the failure. - /// - /// # Example - /// - /// ``` - /// use iceoryx2::prelude::*; - /// # fn main() -> Result<(), Box> { - /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); - /// # - /// # let service = zero_copy::Service::new(&service_name) - /// # .publish_subscribe() - /// # .open_or_create::()?; - /// # - /// # let publisher = service.publisher().create()?; - /// - /// publisher.send_copy(1234)?; - /// # Ok(()) - /// # } - /// ``` - fn send_copy(&self, value: MessageType) -> Result; -} - -/// Allows loaning of uninitialized shared memory that can be used for storing the payload of the message. -pub trait UninitLoan { - /// Loans/allocates a [`crate::sample_mut::SampleMut`] from the underlying data segment of the [`Publish`]er. - /// The user has to initialize the payload before it can be sent. - /// - /// On failure it returns [`PublisherLoanError`] describing the failure. - /// - /// # Example - /// - /// ``` - /// use iceoryx2::prelude::*; - /// # fn main() -> Result<(), Box> { - /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); - /// # - /// # let service = zero_copy::Service::new(&service_name) - /// # .publish_subscribe() - /// # .open_or_create::()?; - /// # - /// # let publisher = service.publisher().create()?; - /// - /// let sample = publisher.loan_uninit()?; - /// let sample = sample.write_payload(42); // alternatively `sample.payload_mut()` can be use to access the `MaybeUninit` - /// - /// sample.send()?; - /// - /// # Ok(()) - /// # } - /// ``` - fn loan_uninit(&self) -> Result>, PublisherLoanError>; -} - -/// Allows loaning shared memory that can be used for storing the payload of the message. -pub trait DefaultLoan { - /// Loans/allocates a [`crate::sample_mut::SampleMut`] from the underlying data segment of the [`Publish`] - /// and initialize it with the default value. This can be a performance hit and [`UninitLoan::loan_uninit`] - /// can be used to loan a [`core::mem::MaybeUninit`]. - /// - /// On failure it returns [`PublisherLoanError`] describing the failure. - /// - /// # Example - /// - /// ``` - /// use iceoryx2::prelude::*; - /// # fn main() -> Result<(), Box> { - /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); - /// # - /// # let service = zero_copy::Service::new(&service_name) - /// # .publish_subscribe() - /// # .open_or_create::()?; - /// # - /// # let publisher = service.publisher().create()?; - /// - /// let mut sample = publisher.loan()?; - /// *sample.payload_mut() = 42; - /// - /// sample.send()?; - /// - /// # Ok(()) - /// # } - /// ``` - fn loan(&self) -> Result, PublisherLoanError>; -} diff --git a/iceoryx2/src/port/publisher.rs b/iceoryx2/src/port/publisher.rs index 919899fb4..6adde0b67 100644 --- a/iceoryx2/src/port/publisher.rs +++ b/iceoryx2/src/port/publisher.rs @@ -58,20 +58,14 @@ use std::cell::UnsafeCell; use std::fmt::Debug; use std::rc::Rc; -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; use std::{alloc::Layout, marker::PhantomData, mem::MaybeUninit}; use super::port_identifiers::{UniquePublisherId, UniqueSubscriberId}; -use super::publish::internal::PublishMgmt; -use super::publish::{ - DefaultLoan, Publish, PublisherCreateError, PublisherLoanError, PublisherSendError, SendCopy, - UninitLoan, -}; use crate::message::Message; -use crate::payload_mut::{internal::PayloadMgmt, PayloadMut, UninitPayloadMut}; use crate::port::details::subscriber_connections::*; use crate::port::update_connections::{ConnectionFailure, UpdateConnections}; -use crate::port::{DegrationAction, DegrationCallback}; +use crate::port::DegrationAction; use crate::raw_sample::RawSampleMut; use crate::service; use crate::service::config_scheme::data_segment_config; @@ -82,126 +76,205 @@ use crate::service::static_config::publish_subscribe; use crate::{config, sample_mut::SampleMut}; use iceoryx2_bb_container::queue::Queue; use iceoryx2_bb_elementary::allocator::AllocationError; +use iceoryx2_bb_elementary::enum_gen; use iceoryx2_bb_lock_free::mpmc::container::{ContainerHandle, ContainerState}; use iceoryx2_bb_log::{fail, fatal_panic, warn}; use iceoryx2_cal::dynamic_storage::DynamicStorage; use iceoryx2_cal::named_concept::NamedConceptBuilder; -use iceoryx2_cal::shared_memory::{SharedMemory, SharedMemoryBuilder, SharedMemoryCreateError}; +use iceoryx2_cal::shared_memory::{ + SharedMemory, SharedMemoryBuilder, SharedMemoryCreateError, ShmPointer, +}; use iceoryx2_cal::shm_allocator::pool_allocator::PoolAllocator; use iceoryx2_cal::shm_allocator::{self, PointerOffset, ShmAllocationError}; use iceoryx2_cal::zero_copy_connection::{ ZeroCopyConnection, ZeroCopyCreationError, ZeroCopySendError, ZeroCopySender, }; -/// Sending endpoint of a publish-subscriber based communication. +/// Defines a failure that can occur when a [`Publisher`] is created with +/// [`crate::service::port_factory::publisher::PortFactoryPublisher`]. +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub enum PublisherCreateError { + ExceedsMaxSupportedPublishers, + UnableToCreateDataSegment, +} + +impl std::fmt::Display for PublisherCreateError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::write!(f, "{}::{:?}", std::stringify!(Self), self) + } +} + +impl std::error::Error for PublisherCreateError {} + +/// Defines a failure that can occur in [`Publisher::loan()`] and [`Publisher::loan_uninit()`] +/// or is part of [`PublisherSendError`] emitted in [`Publisher::send_copy()`]. +#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)] +pub enum PublisherLoanError { + OutOfMemory, + ExceedsMaxLoanedChunks, + InternalFailure, +} + +impl std::fmt::Display for PublisherLoanError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::write!(f, "{}::{:?}", std::stringify!(Self), self) + } +} + +impl std::error::Error for PublisherLoanError {} + +enum_gen! { + /// Failure that can be emitted when a [`crate::sample::Sample`] is sent via [`Publisher::send()`]. + PublisherSendError + entry: + ConnectionBrokenSincePublisherNoLongerExists + mapping: + PublisherLoanError to LoanError, + ConnectionFailure to ConnectionError +} + +impl std::fmt::Display for PublisherSendError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::write!(f, "{}::{:?}", std::stringify!(Self), self) + } +} + +impl std::error::Error for PublisherSendError {} + #[derive(Debug)] -pub struct Publisher<'a, Service: service::Service, MessageType: Debug> { +pub(crate) struct DataSegment { + sample_reference_counter: Vec, + memory: Service::SharedMemory, + message_size: usize, + message_type_layout: Layout, port_id: UniquePublisherId, - pub(crate) sample_reference_counter: Vec, - pub(crate) data_segment: Service::SharedMemory, config: LocalPublisherConfig, dynamic_storage: Rc, subscriber_connections: SubscriberConnections, subscriber_list_state: UnsafeCell>, history: Option>>, - service: &'a Service, - degration_callback: Option>, + static_config: crate::service::static_config::StaticConfig, loan_counter: AtomicUsize, - dynamic_publisher_handle: ContainerHandle, - _phantom_message_type: PhantomData, + is_active: AtomicBool, } -impl<'a, Service: service::Service, MessageType: Debug> Drop - for Publisher<'a, Service, MessageType> -{ - fn drop(&mut self) { - self.dynamic_storage - .get() - .publish_subscribe() - .release_publisher_handle(self.dynamic_publisher_handle) +impl DataSegment { + fn sample_index(&self, distance_to_chunk: usize) -> usize { + distance_to_chunk / self.message_size } -} -impl<'a, Service: service::Service, MessageType: Debug> Publisher<'a, Service, MessageType> { - pub(crate) fn new( - service: &'a Service, - static_config: &publish_subscribe::StaticConfig, - config: &LocalPublisherConfig, - ) -> Result { - let msg = "Unable to create Publisher port"; - let origin = "Publisher::new()"; - let port_id = UniquePublisherId::new(); - let subscriber_list = &service - .state() - .dynamic_storage - .get() - .publish_subscribe() - .subscribers; + fn allocate(&self, layout: Layout) -> Result { + self.retrieve_returned_samples(); - let dynamic_storage = Rc::clone(&service.state().dynamic_storage); - let number_of_samples = service - .state() - .static_config - .messaging_pattern - .required_amount_of_samples_per_data_segment(config.max_loaned_samples); + let msg = "Unable to allocate Sample"; + let ptr = self.memory.allocate(layout)?; + if self.sample_reference_counter[self.sample_index(ptr.offset.value())] + .fetch_add(1, Ordering::Relaxed) + != 0 + { + fatal_panic!(from self, + "{} since the allocated sample is already in use! This should never happen!", msg); + } - let data_segment = fail!(from origin, when Self::create_data_segment(port_id, service.state().global_config.as_ref(), number_of_samples), - with PublisherCreateError::UnableToCreateDataSegment, - "{} since the data segment could not be acquired.", msg); + Ok(ptr) + } - // !MUST! be the last task otherwise a publisher is added to the dynamic config without the - // creation of all required resources - let dynamic_publisher_handle = match service - .state() - .dynamic_storage - .get() - .publish_subscribe() - .add_publisher_id(port_id) + fn borrow_sample(&self, distance_to_chunk: usize) { + self.sample_reference_counter[self.sample_index(distance_to_chunk)] + .fetch_add(1, Ordering::Relaxed); + } + + fn release_sample(&self, distance_to_chunk: PointerOffset) { + if self.sample_reference_counter[self.sample_index(distance_to_chunk.value())] + .fetch_sub(1, Ordering::Relaxed) + == 1 { - Some(unique_index) => unique_index, - None => { - fail!(from origin, with PublisherCreateError::ExceedsMaxSupportedPublishers, - "{} since it would exceed the maximum supported amount of publishers of {}.", - msg, service.state().static_config.publish_subscribe().max_publishers); + unsafe { + fatal_panic!(from self, when self.memory + .deallocate( + distance_to_chunk, + self.message_type_layout, + ), "Internal logic error. The sample should always contain a valid memory chunk from the provided allocator."); + }; + } + } + + fn retrieve_returned_samples(&self) { + for i in 0..self.subscriber_connections.len() { + match self.subscriber_connections.get(i) { + Some(ref connection) => loop { + match connection.sender.reclaim() { + Ok(Some(ptr_dist)) => { + self.release_sample(ptr_dist); + } + Ok(None) => break, + Err(e) => { + warn!(from self, "Unable to reclaim samples from connection {:?} due to {:?}. This may lead to a situation where no more samples will be delivered to this connection.", connection, e) + } + } + }, + None => (), } - }; + } + } - let new_self = Self { - port_id, - subscriber_connections: SubscriberConnections::new( - subscriber_list.capacity(), - &service.state().global_config, - port_id, - static_config, - ), - dynamic_storage, - data_segment, - config: *config, - sample_reference_counter: { - let mut v = Vec::with_capacity(number_of_samples); - for _ in 0..number_of_samples { - v.push(AtomicU64::new(0)); + pub(crate) fn return_loaned_sample(&self, distance_to_chunk: PointerOffset) { + self.release_sample(distance_to_chunk); + self.loan_counter.fetch_sub(1, Ordering::Relaxed); + } + + fn add_sample_to_history(&self, address_to_chunk: usize) { + match &self.history { + None => (), + Some(history) => { + let history = unsafe { &mut *history.get() }; + self.borrow_sample(address_to_chunk); + match unsafe { history.push_with_overflow(address_to_chunk) } { + None => (), + Some(old) => self.release_sample(PointerOffset::new(old)), } - v - }, - subscriber_list_state: unsafe { UnsafeCell::new(subscriber_list.get_state()) }, - history: match static_config.history_size == 0 { - true => None, - false => Some(UnsafeCell::new(Queue::new(static_config.history_size))), - }, - service, - degration_callback: None, - loan_counter: AtomicUsize::new(0), - dynamic_publisher_handle, - _phantom_message_type: PhantomData, + } + } + } + + fn deliver_sample(&self, address_to_chunk: usize) -> usize { + self.retrieve_returned_samples(); + + let deliver_call = match self.config.unable_to_deliver_strategy { + UnableToDeliverStrategy::Block => { + ::Sender::blocking_send + } + UnableToDeliverStrategy::DiscardSample => { + ::Sender::try_send + } }; - if let Err(e) = new_self.populate_subscriber_channels() { - warn!(from new_self, "The new Publisher port is unable to connect to every Subscriber port, caused by {:?}.", e); - } + let mut number_of_recipients = 0; + for i in 0..self.subscriber_connections.len() { + match self.subscriber_connections.get(i) { + Some(ref connection) => { + match deliver_call(&connection.sender, PointerOffset::new(address_to_chunk)) { + Err(ZeroCopySendError::ReceiveBufferFull) => { + /* causes no problem + * blocking_send => can never happen + * try_send => we tried and expect that the buffer is full + * */ + } + Ok(overflow) => { + self.borrow_sample(address_to_chunk); + number_of_recipients += 1; - Ok(new_self) + if let Some(old) = overflow { + self.release_sample(old) + } + } + } + } + None => (), + } + } + number_of_recipients } fn populate_subscriber_channels(&self) -> Result<(), ZeroCopyCreationError> { @@ -223,14 +296,14 @@ impl<'a, Service: service::Service, MessageType: Debug> Publisher<'a, Service, M match self.subscriber_connections.create(i, *subscriber_id) { Ok(false) => (), Ok(true) => match &self.subscriber_connections.get(i) { - Some(connection) => self.deliver_history(connection), + Some(connection) => self.deliver_sample_history(connection), None => { fatal_panic!(from self, "This should never happen! Unable to acquire previously created subscriber connection.") } }, - Err(e) => match &self.degration_callback { + Err(e) => match &self.config.degration_callback { Some(c) => match c.call( - self.service.state().static_config.clone(), + self.static_config.clone(), self.port_id, *subscriber_id, ) { @@ -256,7 +329,22 @@ impl<'a, Service: service::Service, MessageType: Debug> Publisher<'a, Service, M Ok(()) } - fn deliver_history(&self, connection: &Connection) { + fn update_connections(&self) -> Result<(), ConnectionFailure> { + if unsafe { + self.dynamic_storage + .get() + .publish_subscribe() + .subscribers + .update_state(&mut *self.subscriber_list_state.get()) + } { + fail!(from self, when self.populate_subscriber_channels(), + "Connections were updated only partially since at least one connection to a Subscriber port failed."); + } + + Ok(()) + } + + fn deliver_sample_history(&self, connection: &Connection) { match &self.history { None => (), Some(history) => { @@ -265,10 +353,7 @@ impl<'a, Service: service::Service, MessageType: Debug> Publisher<'a, Service, M let ptr_distance = unsafe { history.get_unchecked(i) }; match connection.sender.try_send(PointerOffset::new(ptr_distance)) { - Ok(_) => { - self.sample_reference_counter[Self::sample_index(ptr_distance)] - .fetch_add(1, Ordering::Relaxed); - } + Ok(_) => self.borrow_sample(ptr_distance), Err(e) => { warn!(from self, "Failed to deliver history to new subscriber via {:?} due to {:?}", connection, e); } @@ -278,8 +363,122 @@ impl<'a, Service: service::Service, MessageType: Debug> Publisher<'a, Service, M } } - fn sample_index(distance_to_chunk: usize) -> usize { - distance_to_chunk / std::mem::size_of::>() + pub(crate) fn send_sample(&self, address_to_chunk: usize) -> Result { + let msg = "Unable to send sample"; + if !self.is_active.load(Ordering::Relaxed) { + fail!(from self, with PublisherSendError::ConnectionBrokenSincePublisherNoLongerExists, + "{} since the connections could not be updated.", msg); + } + + fail!(from self, when self.update_connections(), + "{} since the connections could not be updated.", msg); + + self.add_sample_to_history(address_to_chunk); + Ok(self.deliver_sample(address_to_chunk)) + } +} + +/// Sending endpoint of a publish-subscriber based communication. +#[derive(Debug)] +pub struct Publisher { + pub(crate) data_segment: Rc>, + dynamic_publisher_handle: ContainerHandle, + _phantom_message_type: PhantomData, +} + +impl Drop for Publisher { + fn drop(&mut self) { + self.data_segment + .dynamic_storage + .get() + .publish_subscribe() + .release_publisher_handle(self.dynamic_publisher_handle) + } +} + +impl Publisher { + pub(crate) fn new( + service: &Service, + static_config: &publish_subscribe::StaticConfig, + config: LocalPublisherConfig, + ) -> Result { + let msg = "Unable to create Publisher port"; + let origin = "Publisher::new()"; + let port_id = UniquePublisherId::new(); + let subscriber_list = &service + .state() + .dynamic_storage + .get() + .publish_subscribe() + .subscribers; + + let dynamic_storage = Rc::clone(&service.state().dynamic_storage); + let number_of_samples = service + .state() + .static_config + .messaging_pattern + .required_amount_of_samples_per_data_segment(config.max_loaned_samples); + + let data_segment = fail!(from origin, when Self::create_data_segment(port_id, service.state().global_config.as_ref(), number_of_samples), + with PublisherCreateError::UnableToCreateDataSegment, + "{} since the data segment could not be acquired.", msg); + + // !MUST! be the last task otherwise a publisher is added to the dynamic config without the + // creation of all required resources + let dynamic_publisher_handle = match service + .state() + .dynamic_storage + .get() + .publish_subscribe() + .add_publisher_id(port_id) + { + Some(unique_index) => unique_index, + None => { + fail!(from origin, with PublisherCreateError::ExceedsMaxSupportedPublishers, + "{} since it would exceed the maximum supported amount of publishers of {}.", + msg, service.state().static_config.publish_subscribe().max_publishers); + } + }; + + let new_self = Self { + data_segment: Rc::new(DataSegment { + is_active: AtomicBool::new(true), + memory: data_segment, + message_size: std::mem::size_of::>(), + message_type_layout: Layout::new::(), + sample_reference_counter: { + let mut v = Vec::with_capacity(number_of_samples); + for _ in 0..number_of_samples { + v.push(AtomicU64::new(0)); + } + v + }, + dynamic_storage, + port_id, + subscriber_connections: SubscriberConnections::new( + subscriber_list.capacity(), + &service.state().global_config, + port_id, + static_config, + ), + config, + subscriber_list_state: unsafe { UnsafeCell::new(subscriber_list.get_state()) }, + history: match static_config.history_size == 0 { + true => None, + false => Some(UnsafeCell::new(Queue::new(static_config.history_size))), + }, + static_config: service.state().static_config.clone(), + loan_counter: AtomicUsize::new(0), + }), + dynamic_publisher_handle, + _phantom_message_type: PhantomData, + }; + + if let Err(e) = new_self.data_segment.populate_subscriber_channels() { + warn!(from new_self, "The new Publisher port is unable to connect to every Subscriber port, caused by {:?}.", e); + } + + Ok(new_self) } fn create_data_segment( @@ -303,179 +502,76 @@ impl<'a, Service: service::Service, MessageType: Debug> Publisher<'a, Service, M "Unable to create the data segment.")) } - fn add_to_history(&self, address_to_chunk: usize) { - match &self.history { - None => (), - Some(history) => { - let history = unsafe { &mut *history.get() }; - self.sample_reference_counter[Self::sample_index(address_to_chunk)] - .fetch_add(1, Ordering::Relaxed); - match unsafe { history.push_with_overflow(address_to_chunk) } { - None => (), - Some(old) => self.release_sample(PointerOffset::new(old)), - } - } - } - } - - fn deliver_sample(&self, address_to_chunk: usize) -> usize { - let deliver_call = match self.config.unable_to_deliver_strategy { - UnableToDeliverStrategy::Block => { - ::Sender::blocking_send - } - UnableToDeliverStrategy::DiscardSample => { - ::Sender::try_send - } - }; - - let mut number_of_recipients = 0; - for i in 0..self.subscriber_connections.len() { - match self.subscriber_connections.get(i) { - Some(ref connection) => { - match deliver_call(&connection.sender, PointerOffset::new(address_to_chunk)) { - Err(ZeroCopySendError::ReceiveBufferFull) => { - /* causes no problem - * blocking_send => can never happen - * try_send => we tried and expect that the buffer is full - * */ - } - Ok(overflow) => { - self.sample_reference_counter[Self::sample_index(address_to_chunk)] - .fetch_add(1, Ordering::Relaxed); - number_of_recipients += 1; - - if let Some(old) = overflow { - self.release_sample(old) - } - } - } - } - None => (), - } - } - number_of_recipients - } - - fn release_sample(&self, distance_to_chunk: PointerOffset) { - if self.sample_reference_counter[Self::sample_index(distance_to_chunk.value())] - .fetch_sub(1, Ordering::Relaxed) - == 1 - { - unsafe { - fatal_panic!(from self, when self.data_segment - .deallocate( - distance_to_chunk, - Layout::new::(), - ), "Internal logic error. The sample should always contain a valid memory chunk from the provided allocator."); - }; - } - } - - fn retrieve_returned_samples(&self) { - for i in 0..self.subscriber_connections.len() { - match self.subscriber_connections.get(i) { - Some(ref connection) => loop { - match connection.sender.reclaim() { - Ok(Some(ptr_dist)) => { - let sample_index = Self::sample_index(ptr_dist.value()); - - if self.sample_reference_counter[sample_index] - .fetch_sub(1, Ordering::Relaxed) - == 1 - { - unsafe { - fatal_panic!(from self, when self.data_segment - .deallocate( - ptr_dist, - Layout::new::>(), - ), "This should never happen! Failed to deallocate the reclaimed ptr. Either the data was corrupted or an invalid ptr was returned.") - }; - } - } - Ok(None) => break, - Err(e) => { - warn!(from self, "Unable to reclaim samples from connection {:?} due to {:?}. This may lead to a situation where no more samples will be delivered to this connection.", connection, e) - } - } - }, - None => (), - } - } - } - - /// Sets the [`DegrationCallback`] of the [`Publisher`]. Whenever a connection to a - /// [`crate::port::subscriber::Subscriber`] is corrupted or a seems to be dead, this callback - /// is called and depending on the returned [`DegrationAction`] measures will be taken. - pub fn set_degration_callback< - 'publisher, - F: Fn( - service::static_config::StaticConfig, - UniquePublisherId, - UniqueSubscriberId, - ) -> DegrationAction - + 'a, - >( - &mut self, - callback: Option, - ) { - match callback { - Some(c) => self.degration_callback = Some(DegrationCallback::new(c)), - None => self.degration_callback = None, - } - } -} -impl<'a, Service: service::Service, MessageType: Debug + Default> Publish - for Publisher<'a, Service, MessageType> -{ -} - -impl<'a, Service: service::Service, MessageType: Debug> UpdateConnections - for Publisher<'a, Service, MessageType> -{ - fn update_connections(&self) -> Result<(), ConnectionFailure> { - if unsafe { - self.dynamic_storage - .get() - .publish_subscribe() - .subscribers - .update_state(&mut *self.subscriber_list_state.get()) - } { - fail!(from self, when self.populate_subscriber_channels(), - "Connections were updated only partially since at least one connection to a Subscriber port failed."); - } - - Ok(()) - } -} - -impl<'a, Service: service::Service, MessageType: Debug> SendCopy - for Publisher<'a, Service, MessageType> -{ - fn send_copy(&self, value: MessageType) -> Result { + /// Copies the input `value` into a [`crate::sample_mut::SampleMut`] and delivers it. + /// On success it returns the number of [`crate::port::subscriber::Subscriber`]s that received + /// the data, otherwise a [`PublisherSendError`] describing the failure. + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// # fn main() -> Result<(), Box> { + /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); + /// # + /// # let service = zero_copy::Service::new(&service_name) + /// # .publish_subscribe() + /// # .open_or_create::()?; + /// # + /// # let publisher = service.publisher().create()?; + /// + /// publisher.send_copy(1234)?; + /// # Ok(()) + /// # } + /// ``` + pub fn send_copy(&self, value: MessageType) -> Result { let msg = "Unable to send copy of message"; let mut sample = fail!(from self, when self.loan_uninit(), "{} since the loan of a sample failed.", msg); sample.payload_mut().write(value); Ok( - fail!(from self, when self.send_impl(sample.offset_to_chunk().value()), + fail!(from self, when self.data_segment.send_sample(sample.offset_to_chunk.value()), "{} since the underlying send operation failed.", msg), ) } -} -impl<'a, Service: service::Service, MessageType: Debug> UninitLoan - for Publisher<'a, Service, MessageType> -{ - fn loan_uninit(&self) -> Result>, PublisherLoanError> { + /// Loans/allocates a [`crate::sample_mut::SampleMut`] from the underlying data segment of the [`Publisher`]. + /// The user has to initialize the payload before it can be sent. + /// + /// On failure it returns [`PublisherLoanError`] describing the failure. + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// # fn main() -> Result<(), Box> { + /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); + /// # + /// # let service = zero_copy::Service::new(&service_name) + /// # .publish_subscribe() + /// # .open_or_create::()?; + /// # + /// # let publisher = service.publisher().create()?; + /// + /// let sample = publisher.loan_uninit()?; + /// let sample = sample.write_payload(42); // alternatively `sample.payload_mut()` can be use to access the `MaybeUninit` + /// + /// sample.send()?; + /// + /// # Ok(()) + /// # } + /// ``` + pub fn loan_uninit( + &self, + ) -> Result, Service>, PublisherLoanError> { let msg = "Unable to loan Sample"; - self.retrieve_returned_samples(); - - if self.loan_counter.load(Ordering::Relaxed) >= self.config.max_loaned_samples { + if self.data_segment.loan_counter.load(Ordering::Relaxed) + >= self.data_segment.config.max_loaned_samples + { fail!(from self, with PublisherLoanError::ExceedsMaxLoanedChunks, "{} since already {} samples were loaned and it would exceed the maximum of parallel loans of {}. Release or send a loaned sample to loan another sample.", - msg, self.loan_counter.load(Ordering::Relaxed), self.config.max_loaned_samples); + msg, self.data_segment.loan_counter.load(Ordering::Relaxed), self.data_segment.config.max_loaned_samples); } match self @@ -483,20 +579,12 @@ impl<'a, Service: service::Service, MessageType: Debug> UninitLoan .allocate(Layout::new::>()) { Ok(chunk) => { - if self.sample_reference_counter[Self::sample_index(chunk.offset.value())] - .fetch_add(1, Ordering::Relaxed) - != 0 - { - fatal_panic!(from self, - "{} since the allocated sample is already in use! This should never happen!", msg); - } - let message = chunk.data_ptr as *mut MaybeUninit>>; let sample = unsafe { (*message).write(Message { - header: Header::new(self.port_id), + header: Header::new(self.data_segment.port_id), data: MaybeUninit::uninit(), }); RawSampleMut::new_unchecked( @@ -504,8 +592,10 @@ impl<'a, Service: service::Service, MessageType: Debug> UninitLoan ) }; - self.loan_counter.fetch_add(1, Ordering::Relaxed); - Ok(SampleMut::new(self, sample, chunk.offset)) + self.data_segment + .loan_counter + .fetch_add(1, Ordering::Relaxed); + Ok(SampleMut::new(&self.data_segment, sample, chunk.offset)) } Err(ShmAllocationError::AllocationError(AllocationError::OutOfMemory)) => { fail!(from self, with PublisherLoanError::OutOfMemory, @@ -523,28 +613,43 @@ impl<'a, Service: service::Service, MessageType: Debug> UninitLoan } } -impl<'a, Service: service::Service, MessageType: Debug> PublishMgmt - for Publisher<'a, Service, MessageType> -{ - fn return_loaned_sample(&self, distance_to_chunk: PointerOffset) { - self.release_sample(distance_to_chunk); - self.loan_counter.fetch_sub(1, Ordering::Relaxed); - } - - fn send_impl(&self, address_to_chunk: usize) -> Result { - fail!(from self, when self.update_connections(), - "Unable to send sample since the connections could not be updated."); - - self.add_to_history(address_to_chunk); - self.retrieve_returned_samples(); - Ok(self.deliver_sample(address_to_chunk)) +impl Publisher { + /// Loans/allocates a [`crate::sample_mut::SampleMut`] from the underlying data segment of the [`Publisher`] + /// and initialize it with the default value. This can be a performance hit and [`Publisher::loan_uninit`] + /// can be used to loan a [`core::mem::MaybeUninit`]. + /// + /// On failure it returns [`PublisherLoanError`] describing the failure. + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// # fn main() -> Result<(), Box> { + /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); + /// # + /// # let service = zero_copy::Service::new(&service_name) + /// # .publish_subscribe() + /// # .open_or_create::()?; + /// # + /// # let publisher = service.publisher().create()?; + /// + /// let mut sample = publisher.loan()?; + /// *sample.payload_mut() = 42; + /// + /// sample.send()?; + /// + /// # Ok(()) + /// # } + /// ``` + pub fn loan(&self) -> Result, PublisherLoanError> { + Ok(self.loan_uninit()?.write_payload(MessageType::default())) } } -impl<'a, Service: service::Service, MessageType: Default + Debug> DefaultLoan - for Publisher<'a, Service, MessageType> +impl UpdateConnections + for Publisher { - fn loan(&self) -> Result, PublisherLoanError> { - Ok(self.loan_uninit()?.write_payload(MessageType::default())) + fn update_connections(&self) -> Result<(), ConnectionFailure> { + self.data_segment.update_connections() } } diff --git a/iceoryx2/src/port/subscribe.rs b/iceoryx2/src/port/subscribe.rs deleted file mode 100644 index 5e572fe00..000000000 --- a/iceoryx2/src/port/subscribe.rs +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright (c) 2024 Contributors to the Eclipse Foundation -// -// See the NOTICE file(s) distributed with this work for additional -// information regarding copyright ownership. -// -// This program and the accompanying materials are made available under the -// terms of the Apache Software License 2.0 which is available at -// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license -// which is available at https://opensource.org/licenses/MIT. -// -// SPDX-License-Identifier: Apache-2.0 OR MIT - -//! # Example -//! -//! ``` -//! use iceoryx2::prelude::*; -//! -//! # fn main() -> Result<(), Box> { -//! let service_name = ServiceName::new("My/Funk/ServiceName")?; -//! let pubsub_ipc = zero_copy::Service::new(&service_name) -//! .publish_subscribe() -//! .open_or_create::()?; -//! -//! let pubsub_local = process_local::Service::new(&service_name) -//! .publish_subscribe() -//! .open_or_create::()?; -//! -//! let mut subscribers: Vec>> = vec![]; -//! subscribers.push(Box::new( pubsub_ipc.subscriber().create()?)); -//! subscribers.push(Box::new( pubsub_local.subscriber().create()?)); -//! -//! for subscriber in subscribers { -//! while let Some(sample) = subscriber.receive()? { -//! println!("received: {:?}", *sample); -//! } -//! } -//! -//! # Ok(()) -//! # } -//! ``` - -use std::fmt::Debug; - -use crate::sample::Sample; - -use crate::port::update_connections::ConnectionFailure; - -/// Defines the failure that can occur when receiving data with [`Subscribe::receive()`]. -#[derive(Debug, PartialEq, Eq, Copy, Clone)] -pub enum SubscriberReceiveError { - ExceedsMaxBorrowedSamples, - ConnectionFailure(ConnectionFailure), -} - -impl std::fmt::Display for SubscriberReceiveError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::write!(f, "{}::{:?}", std::stringify!(Self), self) - } -} - -impl std::error::Error for SubscriberReceiveError {} - -/// Describes the failures when a new [`Subscribe`] is created via the -/// [`crate::service::port_factory::subscriber::PortFactorySubscriber`]. -#[derive(Debug, PartialEq, Eq, Copy, Clone)] -pub enum SubscriberCreateError { - ExceedsMaxSupportedSubscribers, -} - -impl std::fmt::Display for SubscriberCreateError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::write!(f, "{}::{:?}", std::stringify!(Self), self) - } -} - -impl std::error::Error for SubscriberCreateError {} - -pub(crate) mod internal { - use std::fmt::Debug; - - pub(crate) trait SubscribeMgmt: Debug { - fn release_sample(&self, channel_id: usize, sample: usize); - } -} - -/// The interface of the receiving endpoint of a publish-subscribe communication. -pub trait Subscribe { - /// Receives a [`crate::sample::Sample`] from [`crate::port::publisher::Publisher`]. If no sample could be - /// received [`None`] is returned. If a failure occurs [`SubscriberReceiveError`] is returned. - fn receive(&self) -> Result>, SubscriberReceiveError>; - - /// Explicitly updates all connections to the [`crate::port::publisher::Publisher`]s. This is - /// required to be called whenever a new [`crate::port::publisher::Publisher`] connected to - /// the service. It is done implicitly whenever [`Subscribe::receive()`] - /// is called. - fn update_connections(&self) -> Result<(), ConnectionFailure>; -} diff --git a/iceoryx2/src/port/subscriber.rs b/iceoryx2/src/port/subscriber.rs index b01614dad..b1f81a65c 100644 --- a/iceoryx2/src/port/subscriber.rs +++ b/iceoryx2/src/port/subscriber.rs @@ -37,11 +37,12 @@ use std::marker::PhantomData; use std::rc::Rc; use iceoryx2_bb_lock_free::mpmc::container::{ContainerHandle, ContainerState}; -use iceoryx2_bb_log::{fail, fatal_panic, warn}; +use iceoryx2_bb_log::{fail, warn}; use iceoryx2_cal::dynamic_storage::DynamicStorage; use iceoryx2_cal::{shared_memory::*, zero_copy_connection::*}; use crate::port::DegrationAction; +use crate::service::port_factory::subscriber::SubscriberConfig; use crate::service::static_config::publish_subscribe::StaticConfig; use crate::{ message::Message, raw_sample::RawSample, sample::Sample, service, @@ -50,27 +51,60 @@ use crate::{ use super::details::publisher_connections::{Connection, PublisherConnections}; use super::port_identifiers::{UniquePublisherId, UniqueSubscriberId}; -use super::subscribe::internal::SubscribeMgmt; -use super::subscribe::{Subscribe, SubscriberCreateError, SubscriberReceiveError}; use super::update_connections::ConnectionFailure; -use super::DegrationCallback; + +/// Defines the failure that can occur when receiving data with [`Subscriber::receive()`]. +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub enum SubscriberReceiveError { + ExceedsMaxBorrowedSamples, + ConnectionFailure(ConnectionFailure), +} + +impl std::fmt::Display for SubscriberReceiveError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::write!(f, "{}::{:?}", std::stringify!(Self), self) + } +} + +impl std::error::Error for SubscriberReceiveError {} + +/// Describes the failures when a new [`Subscriber`] is created via the +/// [`crate::service::port_factory::subscriber::PortFactorySubscriber`]. +#[derive(Debug, PartialEq, Eq, Copy, Clone)] +pub enum SubscriberCreateError { + ExceedsMaxSupportedSubscribers, +} + +impl std::fmt::Display for SubscriberCreateError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::write!(f, "{}::{:?}", std::stringify!(Self), self) + } +} + +impl std::error::Error for SubscriberCreateError {} + +pub(crate) mod internal { + use std::fmt::Debug; + + pub(crate) trait SubscribeMgmt: Debug { + fn release_sample(&self, channel_id: usize, sample: usize); + } +} /// The receiving endpoint of a publish-subscribe communication. #[derive(Debug)] -pub struct Subscriber<'a, Service: service::Service, MessageType: Debug> { +pub struct Subscriber { dynamic_subscriber_handle: ContainerHandle, - publisher_connections: PublisherConnections, + publisher_connections: Rc>, dynamic_storage: Rc, - service: &'a Service, - degration_callback: Option>, + static_config: crate::service::static_config::StaticConfig, + config: SubscriberConfig, publisher_list_state: UnsafeCell>, _phantom_message_type: PhantomData, } -impl<'a, Service: service::Service, MessageType: Debug> Drop - for Subscriber<'a, Service, MessageType> -{ +impl Drop for Subscriber { fn drop(&mut self) { self.dynamic_storage .get() @@ -79,10 +113,11 @@ impl<'a, Service: service::Service, MessageType: Debug> Drop } } -impl<'a, Service: service::Service, MessageType: Debug> Subscriber<'a, Service, MessageType> { +impl Subscriber { pub(crate) fn new( - service: &'a Service, + service: &Service, static_config: &StaticConfig, + config: SubscriberConfig, ) -> Result { let msg = "Failed to create Subscriber port"; let origin = "Subscriber::new()"; @@ -114,17 +149,17 @@ impl<'a, Service: service::Service, MessageType: Debug> Subscriber<'a, Service, }; let new_self = Self { - publisher_connections: PublisherConnections::new( + config, + publisher_connections: Rc::new(PublisherConnections::new( publisher_list.capacity(), port_id, &service.state().global_config, static_config, - ), + )), dynamic_storage, publisher_list_state: UnsafeCell::new(unsafe { publisher_list.get_state() }), dynamic_subscriber_handle, - service, - degration_callback: None, + static_config: service.state().static_config.clone(), _phantom_message_type: PhantomData, }; @@ -150,13 +185,13 @@ impl<'a, Service: service::Service, MessageType: Debug> Subscriber<'a, Service, match index { Some(publisher_id) => match self.publisher_connections.create(i, *publisher_id) { Ok(()) => (), - Err(e) => match &self.degration_callback { + Err(e) => match &self.config.degration_callback { None => { warn!(from self, "Unable to establish connection to new publisher {:?}.", publisher_id) } Some(c) => { match c.call( - self.service.state().static_config.clone(), + self.static_config.clone(), *publisher_id, self.publisher_connections.subscriber_id(), ) { @@ -178,11 +213,11 @@ impl<'a, Service: service::Service, MessageType: Debug> Subscriber<'a, Service, Ok(()) } - fn receive_from_connection<'subscriber>( - &'subscriber self, + fn receive_from_connection( + &self, channel_id: usize, connection: &mut Connection, - ) -> Result>, SubscriberReceiveError> { + ) -> Result>, SubscriberReceiveError> { let msg = "Unable to receive another sample"; match connection.receiver.receive() { Ok(data) => match data { @@ -191,7 +226,7 @@ impl<'a, Service: service::Service, MessageType: Debug> Subscriber<'a, Service, let absolute_address = relative_addr.value() + connection.data_segment.allocator_data_start_address(); Ok(Some(Sample { - subscriber: self, + publisher_connections: Rc::clone(&self.publisher_connections), channel_id, ptr: unsafe { RawSample::new_unchecked( @@ -209,31 +244,9 @@ impl<'a, Service: service::Service, MessageType: Debug> Subscriber<'a, Service, } } - /// Sets the [`DegrationCallback`] of the [`Subscriber`]. Whenever a connection to a - /// [`crate::port::publisher::Publisher`] is corrupted or a seems to be dead, this callback - /// is called and depending on the returned [`DegrationAction`] measures will be taken. - pub fn set_degration_callback< - F: Fn( - service::static_config::StaticConfig, - UniquePublisherId, - UniqueSubscriberId, - ) -> DegrationAction - + 'a, - >( - &mut self, - callback: Option, - ) { - match callback { - Some(c) => self.degration_callback = Some(DegrationCallback::new(c)), - None => self.degration_callback = None, - } - } -} - -impl<'a, Service: service::Service, MessageType: Debug> Subscribe - for Subscriber<'a, Service, MessageType> -{ - fn receive(&self) -> Result>, SubscriberReceiveError> { + /// Receives a [`crate::sample::Sample`] from [`crate::port::publisher::Publisher`]. If no sample could be + /// received [`None`] is returned. If a failure occurs [`SubscriberReceiveError`] is returned. + pub fn receive(&self) -> Result>, SubscriberReceiveError> { if let Err(e) = self.update_connections() { fail!(from self, with SubscriberReceiveError::ConnectionFailure(e), @@ -254,7 +267,11 @@ impl<'a, Service: service::Service, MessageType: Debug> Subscribe Ok(None) } - fn update_connections(&self) -> Result<(), ConnectionFailure> { + /// Explicitly updates all connections to the [`crate::port::publisher::Publisher`]s. This is + /// required to be called whenever a new [`crate::port::publisher::Publisher`] connected to + /// the service. It is done implicitly whenever [`Subscriber::receive()`] + /// is called. + pub fn update_connections(&self) -> Result<(), ConnectionFailure> { if unsafe { self.dynamic_storage .get() @@ -269,25 +286,3 @@ impl<'a, Service: service::Service, MessageType: Debug> Subscribe Ok(()) } } - -impl<'a, Service: service::Service, MessageType: Debug> SubscribeMgmt - for Subscriber<'a, Service, MessageType> -{ - fn release_sample(&self, channel_id: usize, sample: usize) { - match self.publisher_connections.get(channel_id) { - Some(c) => { - let distance = sample - c.data_segment.allocator_data_start_address(); - match c.receiver.release(PointerOffset::new(distance)) { - Ok(()) => (), - Err(ZeroCopyReleaseError::RetrieveBufferFull) => { - fatal_panic!(from self, when c.receiver.release(PointerOffset::new(distance)), - "This should never happen! The publishers retrieve channel is full and the sample cannot be returned."); - } - } - } - None => { - warn!(from self, "Unable to release sample since the connection is broken. The sample will be discarded and has to be reclaimed manually by the publisher."); - } - } - } -} diff --git a/iceoryx2/src/port/update_connections.rs b/iceoryx2/src/port/update_connections.rs index 6353fc38e..587dea0af 100644 --- a/iceoryx2/src/port/update_connections.rs +++ b/iceoryx2/src/port/update_connections.rs @@ -33,8 +33,8 @@ impl std::error::Error for ConnectionFailure {} pub trait UpdateConnections { /// Explicitly updates all connections to the [`crate::port::subscriber::Subscriber`]s. This is /// required to be called whenever a new [`crate::port::subscriber::Subscriber`] connected to - /// the service. It is done implicitly whenever [`crate::payload_mut::PayloadMut::send()`] or - /// [`crate::port::publish::SendCopy::send_copy()`] is called. + /// the service. It is done implicitly whenever [`crate::sample_mut::SampleMut::send()`] or + /// [`crate::port::publisher::Publisher::send_copy()`] is called. /// When a [`crate::port::subscriber::Subscriber`] is connected that requires a history this /// call will deliver it. /// diff --git a/iceoryx2/src/prelude.rs b/iceoryx2/src/prelude.rs index 208573573..0985f669b 100644 --- a/iceoryx2/src/prelude.rs +++ b/iceoryx2/src/prelude.rs @@ -12,10 +12,5 @@ pub use crate::iox2::Iox2; pub use crate::iox2::Iox2Event; -pub use crate::payload::Payload; -pub use crate::payload_mut::{PayloadMut, UninitPayloadMut}; -pub use crate::port::{ - event_id::EventId, listen::Listen, notify::Notify, publish::DefaultLoan, publish::Publish, - publish::SendCopy, publish::UninitLoan, subscribe::Subscribe, -}; +pub use crate::port::event_id::EventId; pub use crate::service::{process_local, service_name::ServiceName, zero_copy, Service}; diff --git a/iceoryx2/src/sample.rs b/iceoryx2/src/sample.rs index ac9f21d72..fc9e573fc 100644 --- a/iceoryx2/src/sample.rs +++ b/iceoryx2/src/sample.rs @@ -33,42 +33,63 @@ //! //! See also [`crate::sample::Sample`]. +use std::rc::Rc; use std::{fmt::Debug, ops::Deref}; -use crate::port::subscribe::internal::SubscribeMgmt; +use iceoryx2_bb_log::{fatal_panic, warn}; +use iceoryx2_cal::shared_memory::SharedMemory; +use iceoryx2_cal::zero_copy_connection::{PointerOffset, ZeroCopyReceiver, ZeroCopyReleaseError}; + +use crate::port::details::publisher_connections::PublisherConnections; +use crate::raw_sample::RawSample; use crate::service::header::publish_subscribe::Header; -use crate::{payload::Payload, raw_sample::RawSample}; /// It stores the payload and is acquired by the [`crate::port::subscriber::Subscriber`] whenever /// it receives new data from a [`crate::port::publisher::Publisher`] via -/// [`crate::port::subscribe::Subscribe::receive()`]. +/// [`crate::port::subscriber::Subscriber::receive()`]. #[derive(Debug)] -pub struct Sample<'subscriber, MessageType: Debug> { - pub(crate) subscriber: &'subscriber dyn SubscribeMgmt, +pub struct Sample { + pub(crate) publisher_connections: Rc>, pub(crate) ptr: RawSample, pub(crate) channel_id: usize, } -impl Deref for Sample<'_, MessageType> { +impl Deref for Sample { type Target = MessageType; fn deref(&self) -> &Self::Target { self.ptr.as_data_ref() } } -impl Drop for Sample<'_, MessageType> { +impl Drop for Sample { fn drop(&mut self) { - self.subscriber - .release_sample(self.channel_id, self.ptr.as_ptr() as usize); + match self.publisher_connections.get(self.channel_id) { + Some(c) => { + let distance = + self.ptr.as_ptr() as usize - c.data_segment.allocator_data_start_address(); + match c.receiver.release(PointerOffset::new(distance)) { + Ok(()) => (), + Err(ZeroCopyReleaseError::RetrieveBufferFull) => { + fatal_panic!(from self, when c.receiver.release(PointerOffset::new(distance)), + "This should never happen! The publishers retrieve channel is full and the sample cannot be returned."); + } + } + } + None => { + warn!(from self, "Unable to release sample since the connection is broken. The sample will be discarded and has to be reclaimed manually by the publisher."); + } + } } } -impl<'subscriber, MessageType: Debug> Payload for Sample<'subscriber, MessageType> { - fn payload(&self) -> &MessageType { +impl Sample { + /// Returns a reference to the payload of the sample + pub fn payload(&self) -> &MessageType { self.ptr.as_data_ref() } - fn header(&self) -> &Header { + /// Returns a reference to the header of the sample. + pub fn header(&self) -> &Header { self.ptr.as_header_ref() } } diff --git a/iceoryx2/src/sample_mut.rs b/iceoryx2/src/sample_mut.rs index 8bf415632..d72b81361 100644 --- a/iceoryx2/src/sample_mut.rs +++ b/iceoryx2/src/sample_mut.rs @@ -37,17 +37,16 @@ //! See also, [`crate::sample_mut::SampleMut`]. use crate::{ - payload_mut::{internal::PayloadMgmt, PayloadMut, UninitPayloadMut}, - port::{publish::internal::PublishMgmt, update_connections::ConnectionFailure}, + port::publisher::{DataSegment, PublisherSendError}, raw_sample::RawSampleMut, service::header::publish_subscribe::Header, }; use iceoryx2_cal::shared_memory::*; -use std::{fmt::Debug, mem::MaybeUninit}; +use std::{fmt::Debug, mem::MaybeUninit, rc::Rc}; /// Acquired by a [`crate::port::publisher::Publisher`] via -/// [`crate::port::publish::DefaultLoan::loan()`] or -/// [`crate::port::publish::UninitLoan::loan_uninit()`]. It stores the payload that will be sent +/// [`crate::port::publisher::Publisher::loan()`] or +/// [`crate::port::publisher::Publisher::loan_uninit()`]. It stores the payload that will be sent /// to all connected [`crate::port::subscriber::Subscriber`]s. If the [`SampleMut`] is not sent /// it will release the loaned memory when going out of scope. /// @@ -59,76 +58,204 @@ use std::{fmt::Debug, mem::MaybeUninit}; /// The generic parameter `M` is either a `MessageType` or a [`core::mem::MaybeUninit`], depending /// which API is used to obtain the sample. #[derive(Debug)] -pub struct SampleMut<'publisher, M: Debug> { - pub(crate) publisher: &'publisher dyn PublishMgmt, - ptr: RawSampleMut, - offset_to_chunk: PointerOffset, +pub struct SampleMut { + data_segment: Rc>, + ptr: RawSampleMut, + pub(crate) offset_to_chunk: PointerOffset, } -impl Drop for SampleMut<'_, M> { +impl Drop + for SampleMut +{ fn drop(&mut self) { - self.publisher.return_loaned_sample(self.offset_to_chunk); + self.data_segment.return_loaned_sample(self.offset_to_chunk); } } -impl<'publisher, MessageType: Debug> SampleMut<'publisher, MaybeUninit> { +impl + SampleMut, Service> +{ pub(crate) fn new( - publisher: &'publisher dyn PublishMgmt, + data_segment: &Rc>, ptr: RawSampleMut>, offset_to_chunk: PointerOffset, ) -> Self { - // SAFETY: the transmute is not nice but safe since MaybeUninit is #[repr(transparent)} to the inner type - let publisher = unsafe { std::mem::transmute(publisher) }; - Self { - publisher, + data_segment: Rc::clone(data_segment), ptr, offset_to_chunk, } } } -impl<'publisher, MessageType: Debug> PayloadMgmt for SampleMut<'publisher, MessageType> { - fn offset_to_chunk(&self) -> PointerOffset { - self.offset_to_chunk - } -} - -impl<'publisher, MessageType: Debug> UninitPayloadMut - for SampleMut<'publisher, MaybeUninit> +impl + SampleMut, Service> { - type InitializedSample = SampleMut<'publisher, MessageType>; - - fn write_payload(mut self, value: MessageType) -> SampleMut<'publisher, MessageType> { + /// Writes the payload to the sample and labels the sample as initialized + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// # fn main() -> Result<(), Box> { + /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); + /// # + /// # let service = zero_copy::Service::new(&service_name) + /// # .publish_subscribe() + /// # .open_or_create::()?; + /// # + /// # let publisher = service.publisher().create()?; + /// + /// let sample = publisher.loan_uninit()?; + /// let sample = sample.write_payload(1234); + /// + /// sample.send()?; + /// + /// # Ok(()) + /// # } + /// ``` + pub fn write_payload(mut self, value: MessageType) -> SampleMut { self.payload_mut().write(value); // SAFETY: this is safe since the payload was initialized on the line above unsafe { self.assume_init() } } - unsafe fn assume_init(self) -> SampleMut<'publisher, MessageType> { + /// Extracts the value of the [`core::mem::MaybeUninit`] container and labels the sample as initialized + /// + /// # Safety + /// + /// The caller must ensure that [`core::mem::MaybeUninit`] really is initialized. Calling this when + /// the content is not fully initialized causes immediate undefined behavior. + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// # fn main() -> Result<(), Box> { + /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); + /// # + /// # let service = zero_copy::Service::new(&service_name) + /// # .publish_subscribe() + /// # .open_or_create::()?; + /// # + /// # let publisher = service.publisher().create()?; + /// + /// let mut sample = publisher.loan_uninit()?; + /// sample.payload_mut().write(1234); + /// let sample = unsafe { sample.assume_init() }; + /// + /// sample.send()?; + /// + /// # Ok(()) + /// # } + /// ``` + pub unsafe fn assume_init(self) -> SampleMut { // the transmute is not nice but safe since MaybeUninit is #[repr(transparent)] to the inner type std::mem::transmute(self) } } impl< - 'publisher, M: Debug, // `M` is either a `MessageType` or a `MaybeUninit` - > PayloadMut for SampleMut<'publisher, M> + Service: crate::service::Service, + > SampleMut { - fn header(&self) -> &Header { + /// Returns a reference to the header of the sample. + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// + /// # fn main() -> Result<(), Box> { + /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); + /// # + /// # let service = zero_copy::Service::new(&service_name) + /// # .publish_subscribe() + /// # .open_or_create::()?; + /// # let publisher = service.publisher().create()?; + /// + /// let sample = publisher.loan()?; + /// println!("Sample Publisher Origin {:?}", sample.header().publisher_id()); + /// + /// # Ok(()) + /// # } + /// ``` + pub fn header(&self) -> &Header { self.ptr.as_header_ref() } - fn payload(&self) -> &M { + /// Returns a reference to the payload of the sample. + /// + /// # Notes + /// + /// The generic parameter `MessageType` can be packed into [`core::mem::MaybeUninit`], depending + /// which API is used to obtain the sample. Obtaining a reference is safe for either type. + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// + /// # fn main() -> Result<(), Box> { + /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); + /// # + /// # let service = zero_copy::Service::new(&service_name) + /// # .publish_subscribe() + /// # .open_or_create::()?; + /// # let publisher = service.publisher().create()?; + /// + /// let sample = publisher.loan()?; + /// println!("Sample current payload {}", sample.payload()); + /// + /// # Ok(()) + /// # } + /// ``` + pub fn payload(&self) -> &M { self.ptr.as_data_ref() } - fn payload_mut(&mut self) -> &mut M { + /// Returns a mutable reference to the payload of the sample. + /// + /// # Notes + /// + /// The generic parameter `MessageType` can be packed into [`core::mem::MaybeUninit`], depending + /// which API is used to obtain the sample. Obtaining a reference is safe for either type. + /// + /// # Example + /// + /// ``` + /// use iceoryx2::prelude::*; + /// + /// # fn main() -> Result<(), Box> { + /// # let service_name = ServiceName::new("My/Funk/ServiceName").unwrap(); + /// # + /// # let service = zero_copy::Service::new(&service_name) + /// # .publish_subscribe() + /// # .open_or_create::()?; + /// # let publisher = service.publisher().create()?; + /// + /// let mut sample = publisher.loan()?; + /// *sample.payload_mut() = 4567; + /// + /// # Ok(()) + /// # } + /// ``` + pub fn payload_mut(&mut self) -> &mut M { self.ptr.as_data_mut() } - fn send(self) -> Result { - self.publisher.send_impl(self.offset_to_chunk.value()) + /// Send a previously loaned [`crate::port::publisher::Publisher::loan_uninit()`] or + /// [`crate::port::publisher::Publisher::loan()`] [`SampleMut`] to all connected + /// [`crate::port::subscriber::Subscriber`]s of the service. + /// + /// The payload of the [`SampleMut`] must be initialized before it can be sent. Have a look + /// at [`SampleMut::write_payload()`] and [`SampleMut::assume_init()`] + /// for more details. + /// + /// On success the number of [`crate::port::subscriber::Subscriber`]s that received + /// the data is returned, otherwise a [`PublisherSendError`] describing the failure. + pub fn send(self) -> Result { + self.data_segment.send_sample(self.offset_to_chunk.value()) } } diff --git a/iceoryx2/src/service/port_factory/listener.rs b/iceoryx2/src/service/port_factory/listener.rs index c55ae1c0b..9a0344b58 100644 --- a/iceoryx2/src/service/port_factory/listener.rs +++ b/iceoryx2/src/service/port_factory/listener.rs @@ -29,7 +29,7 @@ use std::fmt::Debug; use iceoryx2_bb_log::fail; -use crate::port::{listen::ListenerCreateError, listener::Listener}; +use crate::port::{listener::Listener, listener::ListenerCreateError}; use crate::service; use super::event::PortFactory; diff --git a/iceoryx2/src/service/port_factory/notifier.rs b/iceoryx2/src/service/port_factory/notifier.rs index cdce0764f..d3c3ead98 100644 --- a/iceoryx2/src/service/port_factory/notifier.rs +++ b/iceoryx2/src/service/port_factory/notifier.rs @@ -30,7 +30,7 @@ //! ``` use std::fmt::Debug; -use crate::port::{event_id::EventId, notifier::Notifier, notify::NotifierCreateError}; +use crate::port::{event_id::EventId, notifier::Notifier, notifier::NotifierCreateError}; use iceoryx2_bb_log::fail; use crate::service; @@ -55,7 +55,7 @@ impl<'factory, Service: service::Service> PortFactoryNotifier<'factory, Service> } /// Sets a default [`EventId`] for the [`Notifier`] that is used in - /// [`crate::port::notify::Notify::notify()`] + /// [`Notifier::notify()`] pub fn default_event_id(mut self, value: EventId) -> Self { self.default_event_id = value; self diff --git a/iceoryx2/src/service/port_factory/publish_subscribe.rs b/iceoryx2/src/service/port_factory/publish_subscribe.rs index 4f8114a7a..7c693eef2 100644 --- a/iceoryx2/src/service/port_factory/publish_subscribe.rs +++ b/iceoryx2/src/service/port_factory/publish_subscribe.rs @@ -123,7 +123,7 @@ impl PortFactory PortFactorySubscriber { - PortFactorySubscriber { factory: self } + PortFactorySubscriber::new(self) } /// Returns a [`PortFactoryPublisher`] to create a new diff --git a/iceoryx2/src/service/port_factory/publisher.rs b/iceoryx2/src/service/port_factory/publisher.rs index eabbdea1e..32494ef2e 100644 --- a/iceoryx2/src/service/port_factory/publisher.rs +++ b/iceoryx2/src/service/port_factory/publisher.rs @@ -36,11 +36,19 @@ use iceoryx2_bb_log::fail; use serde::{de::Visitor, Deserialize, Serialize}; use super::publish_subscribe::PortFactory; -use crate::{port::publish::PublisherCreateError, port::publisher::Publisher, service}; +use crate::{ + port::{ + port_identifiers::{UniquePublisherId, UniqueSubscriberId}, + publisher::Publisher, + publisher::PublisherCreateError, + DegrationAction, DegrationCallback, + }, + service, +}; /// Defines the strategy the [`Publisher`] shall pursue in -/// [`crate::payload_mut::PayloadMut::send()`] or -/// [`crate::port::publish::SendCopy::send_copy()`] when the buffer of a +/// [`crate::sample_mut::SampleMut::send()`] or +/// [`Publisher::send_copy()`] when the buffer of a /// [`crate::port::subscriber::Subscriber`] is full and the service does not overflow. #[derive(Debug, Eq, PartialEq, Clone, Copy)] pub enum UnableToDeliverStrategy { @@ -93,10 +101,11 @@ impl<'de> Deserialize<'de> for UnableToDeliverStrategy { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug)] pub(crate) struct LocalPublisherConfig { pub(crate) max_loaned_samples: usize, pub(crate) unable_to_deliver_strategy: UnableToDeliverStrategy, + pub(crate) degration_callback: Option>, } /// Factory to create a new [`Publisher`] port/endpoint for @@ -114,6 +123,7 @@ impl<'factory, Service: service::Service, MessageType: Debug> pub(crate) fn new(factory: &'factory PortFactory) -> Self { Self { config: LocalPublisherConfig { + degration_callback: None, max_loaned_samples: factory .service .state() @@ -134,8 +144,8 @@ impl<'factory, Service: service::Service, MessageType: Debug> } /// Defines how many [`crate::sample_mut::SampleMut`] the [`Publisher`] can loan with - /// [`crate::port::publish::DefaultLoan::loan()`] or - /// [`crate::port::publish::UninitLoan::loan_uninit()`] in parallel. + /// [`Publisher::loan()`] or + /// [`Publisher::loan_uninit()`] in parallel. pub fn max_loaned_samples(mut self, value: usize) -> Self { self.config.max_loaned_samples = value; self @@ -147,10 +157,33 @@ impl<'factory, Service: service::Service, MessageType: Debug> self } + /// Sets the [`DegrationCallback`] of the [`Publisher`]. Whenever a connection to a + /// [`crate::port::subscriber::Subscriber`] is corrupted or it seems to be dead, this callback + /// is called and depending on the returned [`DegrationAction`] measures will be taken. + pub fn set_degration_callback< + F: Fn( + service::static_config::StaticConfig, + UniquePublisherId, + UniqueSubscriberId, + ) -> DegrationAction + + 'static, + >( + mut self, + callback: Option, + ) -> Self { + match callback { + Some(c) => self.config.degration_callback = Some(DegrationCallback::new(c)), + None => self.config.degration_callback = None, + } + + self + } + /// Creates a new [`Publisher`] or returns a [`PublisherCreateError`] on failure. - pub fn create(self) -> Result, PublisherCreateError> { + pub fn create(self) -> Result, PublisherCreateError> { + let origin = format!("{:?}", self); Ok( - fail!(from self, when Publisher::new(&self.factory.service, self.factory.service.state().static_config.publish_subscribe(), &self.config), + fail!(from origin, when Publisher::new(&self.factory.service, self.factory.service.state().static_config.publish_subscribe(), self.config), "Failed to create new Publisher port."), ) } diff --git a/iceoryx2/src/service/port_factory/subscriber.rs b/iceoryx2/src/service/port_factory/subscriber.rs index 16d3d9bc5..1e804d439 100644 --- a/iceoryx2/src/service/port_factory/subscriber.rs +++ b/iceoryx2/src/service/port_factory/subscriber.rs @@ -33,27 +33,70 @@ use std::fmt::Debug; use iceoryx2_bb_log::fail; -use crate::{port::subscribe::SubscriberCreateError, port::subscriber::Subscriber, service}; +use crate::{ + port::{ + port_identifiers::{UniquePublisherId, UniqueSubscriberId}, + subscriber::{Subscriber, SubscriberCreateError}, + DegrationAction, DegrationCallback, + }, + service, +}; use super::publish_subscribe::PortFactory; +#[derive(Debug)] +pub(crate) struct SubscriberConfig { + pub(crate) degration_callback: Option>, +} + /// Factory to create a new [`Subscriber`] port/endpoint for /// [`MessagingPattern::PublishSubscribe`](crate::service::messaging_pattern::MessagingPattern::PublishSubscribe) based /// communication. #[derive(Debug)] pub struct PortFactorySubscriber<'factory, Service: service::Service, MessageType: Debug> { + config: SubscriberConfig, pub(crate) factory: &'factory PortFactory, } impl<'factory, Service: service::Service, MessageType: Debug> PortFactorySubscriber<'factory, Service, MessageType> { + pub(crate) fn new(factory: &'factory PortFactory) -> Self { + Self { + config: SubscriberConfig { + degration_callback: None, + }, + factory, + } + } + + /// Sets the [`DegrationCallback`] of the [`Subscriber`]. Whenever a connection to a + /// [`crate::port::subscriber::Subscriber`] is corrupted or it seems to be dead, this callback + /// is called and depending on the returned [`DegrationAction`] measures will be taken. + pub fn set_degration_callback< + F: Fn( + service::static_config::StaticConfig, + UniquePublisherId, + UniqueSubscriberId, + ) -> DegrationAction + + 'static, + >( + mut self, + callback: Option, + ) -> Self { + match callback { + Some(c) => self.config.degration_callback = Some(DegrationCallback::new(c)), + None => self.config.degration_callback = None, + } + + self + } + /// Creates a new [`Subscriber`] or returns a [`SubscriberCreateError`] on failure. - pub fn create( - &self, - ) -> Result, SubscriberCreateError> { + pub fn create(self) -> Result, SubscriberCreateError> { + let origin = format!("{:?}", self); Ok( - fail!(from self, when Subscriber::new(&self.factory.service, self.factory.service.state().static_config.publish_subscribe()), + fail!(from origin, when Subscriber::new(&self.factory.service, self.factory.service.state().static_config.publish_subscribe(), self.config), "Failed to create new Subscriber port."), ) } diff --git a/iceoryx2/tests/publisher_tests.rs b/iceoryx2/tests/publisher_tests.rs index 1836cc27e..2e095515b 100644 --- a/iceoryx2/tests/publisher_tests.rs +++ b/iceoryx2/tests/publisher_tests.rs @@ -14,9 +14,7 @@ mod publisher { use std::time::{Duration, Instant}; - use iceoryx2::payload_mut::UninitPayloadMut; - use iceoryx2::port::publish::PublisherLoanError; - use iceoryx2::prelude::*; + use iceoryx2::port::publisher::PublisherLoanError; use iceoryx2::service::port_factory::publisher::UnableToDeliverStrategy; use iceoryx2::service::{service_name::ServiceName, Service}; use iceoryx2_bb_posix::barrier::{BarrierBuilder, BarrierHandle}; diff --git a/iceoryx2/tests/service_publish_subscribe_tests.rs b/iceoryx2/tests/service_publish_subscribe_tests.rs index 8697b22ef..8c51d87e8 100644 --- a/iceoryx2/tests/service_publish_subscribe_tests.rs +++ b/iceoryx2/tests/service_publish_subscribe_tests.rs @@ -13,8 +13,8 @@ #[generic_tests::define] mod service_publish_subscribe { use iceoryx2::config::Config; - use iceoryx2::port::publish::{PublisherCreateError, PublisherLoanError}; - use iceoryx2::port::subscribe::SubscriberCreateError; + use iceoryx2::port::publisher::{PublisherCreateError, PublisherLoanError}; + use iceoryx2::port::subscriber::SubscriberCreateError; use iceoryx2::port::update_connections::UpdateConnections; use iceoryx2::prelude::*; use iceoryx2::service::builder::publish_subscribe::PublishSubscribeCreateError;