Skip to content

Commit

Permalink
[#100] Degration action is set via port builder
Browse files Browse the repository at this point in the history
  • Loading branch information
elfenpiff committed Feb 11, 2024
1 parent c377c03 commit b26131a
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 57 deletions.
31 changes: 5 additions & 26 deletions iceoryx2/src/port/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use crate::message::Message;
use crate::payload_mut::{internal::PayloadMgmt, PayloadMut, UninitPayloadMut};
use crate::port::details::subscriber_connections::*;
use crate::port::update_connections::{ConnectionFailure, UpdateConnections};
use crate::port::{DegrationAction, DegrationCallback};
use crate::port::DegrationAction;
use crate::raw_sample::RawSampleMut;
use crate::service;
use crate::service::config_scheme::data_segment_config;
Expand Down Expand Up @@ -109,7 +109,6 @@ pub(crate) struct DataSegment<Service: service::Service> {
subscriber_list_state: UnsafeCell<ContainerState<UniqueSubscriberId>>,
history: Option<UnsafeCell<Queue<usize>>>,
static_config: crate::service::static_config::StaticConfig,
degration_callback: Option<DegrationCallback<'static>>,
loan_counter: AtomicUsize,
}

Expand Down Expand Up @@ -254,7 +253,7 @@ impl<Service: service::Service> DataSegment<Service> {
fatal_panic!(from self, "This should never happen! Unable to acquire previously created subscriber connection.")
}
},
Err(e) => match &self.degration_callback {
Err(e) => match &self.config.degration_callback {
Some(c) => match c.call(
self.static_config.clone(),
self.port_id,
Expand Down Expand Up @@ -347,7 +346,7 @@ impl<Service: service::Service, MessageType: Debug> Publisher<Service, MessageTy
pub(crate) fn new(
service: &Service,
static_config: &publish_subscribe::StaticConfig,
config: &LocalPublisherConfig,
config: LocalPublisherConfig,
) -> Result<Self, PublisherCreateError> {
let msg = "Unable to create Publisher port";
let origin = "Publisher::new()";
Expand Down Expand Up @@ -407,14 +406,13 @@ impl<Service: service::Service, MessageType: Debug> Publisher<Service, MessageTy
port_id,
static_config,
),
config: *config,
config,
subscriber_list_state: unsafe { UnsafeCell::new(subscriber_list.get_state()) },
history: match static_config.history_size == 0 {
true => None,
false => Some(UnsafeCell::new(Queue::new(static_config.history_size))),
},
static_config: service.state().static_config.clone(),
degration_callback: None,
loan_counter: AtomicUsize::new(0),
},
dynamic_publisher_handle,
Expand Down Expand Up @@ -448,27 +446,8 @@ impl<Service: service::Service, MessageType: Debug> Publisher<Service, MessageTy
.create(&allocator_config),
"Unable to create the data segment."))
}

/// Sets the [`DegrationCallback`] of the [`Publisher`]. Whenever a connection to a
/// [`crate::port::subscriber::Subscriber`] is corrupted or a seems to be dead, this callback
/// is called and depending on the returned [`DegrationAction`] measures will be taken.
pub fn set_degration_callback<
F: Fn(
service::static_config::StaticConfig,
UniquePublisherId,
UniqueSubscriberId,
) -> DegrationAction
+ 'static,
>(
&mut self,
callback: Option<F>,
) {
match callback {
Some(c) => self.data_segment.degration_callback = Some(DegrationCallback::new(c)),
None => self.data_segment.degration_callback = None,
}
}
}

impl<Service: service::Service, MessageType: Debug + Default> Publish<MessageType>
for Publisher<Service, MessageType>
{
Expand Down
29 changes: 5 additions & 24 deletions iceoryx2/src/port/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use iceoryx2_cal::dynamic_storage::DynamicStorage;
use iceoryx2_cal::{shared_memory::*, zero_copy_connection::*};

use crate::port::DegrationAction;
use crate::service::port_factory::subscriber::SubscriberConfig;
use crate::service::static_config::publish_subscribe::StaticConfig;
use crate::{
message::Message, raw_sample::RawSample, sample::Sample, service,
Expand All @@ -51,7 +52,6 @@ use crate::{
use super::details::publisher_connections::{Connection, PublisherConnections};
use super::port_identifiers::{UniquePublisherId, UniqueSubscriberId};
use super::update_connections::ConnectionFailure;
use super::DegrationCallback;

/// Defines the failure that can occur when receiving data with [`Subscriber::receive()`].
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
Expand Down Expand Up @@ -98,7 +98,7 @@ pub struct Subscriber<Service: service::Service, MessageType: Debug> {
publisher_connections: Rc<PublisherConnections<Service>>,
dynamic_storage: Rc<Service::DynamicStorage>,
static_config: crate::service::static_config::StaticConfig,
degration_callback: Option<DegrationCallback<'static>>,
config: SubscriberConfig,

publisher_list_state: UnsafeCell<ContainerState<UniquePublisherId>>,
_phantom_message_type: PhantomData<MessageType>,
Expand All @@ -117,6 +117,7 @@ impl<Service: service::Service, MessageType: Debug> Subscriber<Service, MessageT
pub(crate) fn new(
service: &Service,
static_config: &StaticConfig,
config: SubscriberConfig,
) -> Result<Self, SubscriberCreateError> {
let msg = "Failed to create Subscriber port";
let origin = "Subscriber::new()";
Expand Down Expand Up @@ -148,6 +149,7 @@ impl<Service: service::Service, MessageType: Debug> Subscriber<Service, MessageT
};

let new_self = Self {
config,
publisher_connections: Rc::new(PublisherConnections::new(
publisher_list.capacity(),
port_id,
Expand All @@ -158,7 +160,6 @@ impl<Service: service::Service, MessageType: Debug> Subscriber<Service, MessageT
publisher_list_state: UnsafeCell::new(unsafe { publisher_list.get_state() }),
dynamic_subscriber_handle,
static_config: service.state().static_config.clone(),
degration_callback: None,
_phantom_message_type: PhantomData,
};

Expand All @@ -184,7 +185,7 @@ impl<Service: service::Service, MessageType: Debug> Subscriber<Service, MessageT
match index {
Some(publisher_id) => match self.publisher_connections.create(i, *publisher_id) {
Ok(()) => (),
Err(e) => match &self.degration_callback {
Err(e) => match &self.config.degration_callback {
None => {
warn!(from self, "Unable to establish connection to new publisher {:?}.", publisher_id)
}
Expand Down Expand Up @@ -243,26 +244,6 @@ impl<Service: service::Service, MessageType: Debug> Subscriber<Service, MessageT
}
}

/// Sets the [`DegrationCallback`] of the [`Subscriber`]. Whenever a connection to a
/// [`crate::port::publisher::Publisher`] is corrupted or a seems to be dead, this callback
/// is called and depending on the returned [`DegrationAction`] measures will be taken.
pub fn set_degration_callback<
F: Fn(
service::static_config::StaticConfig,
UniquePublisherId,
UniqueSubscriberId,
) -> DegrationAction
+ 'static,
>(
&mut self,
callback: Option<F>,
) {
match callback {
Some(c) => self.degration_callback = Some(DegrationCallback::new(c)),
None => self.degration_callback = None,
}
}

/// Receives a [`crate::sample::Sample`] from [`crate::port::publisher::Publisher`]. If no sample could be
/// received [`None`] is returned. If a failure occurs [`SubscriberReceiveError`] is returned.
pub fn receive(&self) -> Result<Option<Sample<MessageType, Service>>, SubscriberReceiveError> {
Expand Down
2 changes: 1 addition & 1 deletion iceoryx2/src/service/port_factory/publish_subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl<Service: service::Service, MessageType: Debug> PortFactory<Service, Message
/// # }
/// ```
pub fn subscriber(&self) -> PortFactorySubscriber<Service, MessageType> {
PortFactorySubscriber { factory: self }
PortFactorySubscriber::new(self)
}

/// Returns a [`PortFactoryPublisher`] to create a new
Expand Down
39 changes: 36 additions & 3 deletions iceoryx2/src/service/port_factory/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,15 @@ use iceoryx2_bb_log::fail;
use serde::{de::Visitor, Deserialize, Serialize};

use super::publish_subscribe::PortFactory;
use crate::{port::publish::PublisherCreateError, port::publisher::Publisher, service};
use crate::{
port::{
port_identifiers::{UniquePublisherId, UniqueSubscriberId},
publish::PublisherCreateError,
publisher::Publisher,
DegrationAction, DegrationCallback,
},
service,
};

/// Defines the strategy the [`Publisher`] shall pursue in
/// [`crate::payload_mut::PayloadMut::send()`] or
Expand Down Expand Up @@ -93,10 +101,11 @@ impl<'de> Deserialize<'de> for UnableToDeliverStrategy {
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug)]
pub(crate) struct LocalPublisherConfig {
pub(crate) max_loaned_samples: usize,
pub(crate) unable_to_deliver_strategy: UnableToDeliverStrategy,
pub(crate) degration_callback: Option<DegrationCallback<'static>>,
}

/// Factory to create a new [`Publisher`] port/endpoint for
Expand All @@ -114,6 +123,7 @@ impl<'factory, Service: service::Service, MessageType: Debug>
pub(crate) fn new(factory: &'factory PortFactory<Service, MessageType>) -> Self {
Self {
config: LocalPublisherConfig {
degration_callback: None,
max_loaned_samples: factory
.service
.state()
Expand Down Expand Up @@ -147,10 +157,33 @@ impl<'factory, Service: service::Service, MessageType: Debug>
self
}

/// Sets the [`DegrationCallback`] of the [`Publisher`]. Whenever a connection to a
/// [`crate::port::subscriber::Subscriber`] is corrupted or it seems to be dead, this callback
/// is called and depending on the returned [`DegrationAction`] measures will be taken.
pub fn set_degration_callback<
F: Fn(
service::static_config::StaticConfig,
UniquePublisherId,
UniqueSubscriberId,
) -> DegrationAction
+ 'static,
>(
mut self,
callback: Option<F>,
) -> Self {
match callback {
Some(c) => self.config.degration_callback = Some(DegrationCallback::new(c)),
None => self.config.degration_callback = None,
}

self
}

/// Creates a new [`Publisher`] or returns a [`PublisherCreateError`] on failure.
pub fn create(self) -> Result<Publisher<Service, MessageType>, PublisherCreateError> {
let origin = format!("{:?}", self);
Ok(
fail!(from self, when Publisher::new(&self.factory.service, self.factory.service.state().static_config.publish_subscribe(), &self.config),
fail!(from origin, when Publisher::new(&self.factory.service, self.factory.service.state().static_config.publish_subscribe(), self.config),
"Failed to create new Publisher port."),
)
}
Expand Down
48 changes: 45 additions & 3 deletions iceoryx2/src/service/port_factory/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,69 @@ use std::fmt::Debug;
use iceoryx2_bb_log::fail;

use crate::{
port::subscriber::{Subscriber, SubscriberCreateError},
port::{
port_identifiers::{UniquePublisherId, UniqueSubscriberId},
subscriber::{Subscriber, SubscriberCreateError},
DegrationAction, DegrationCallback,
},
service,
};

use super::publish_subscribe::PortFactory;

#[derive(Debug)]
pub(crate) struct SubscriberConfig {
pub(crate) degration_callback: Option<DegrationCallback<'static>>,
}

/// Factory to create a new [`Subscriber`] port/endpoint for
/// [`MessagingPattern::PublishSubscribe`](crate::service::messaging_pattern::MessagingPattern::PublishSubscribe) based
/// communication.
#[derive(Debug)]
pub struct PortFactorySubscriber<'factory, Service: service::Service, MessageType: Debug> {
config: SubscriberConfig,
pub(crate) factory: &'factory PortFactory<Service, MessageType>,
}

impl<'factory, Service: service::Service, MessageType: Debug>
PortFactorySubscriber<'factory, Service, MessageType>
{
pub(crate) fn new(factory: &'factory PortFactory<Service, MessageType>) -> Self {
Self {
config: SubscriberConfig {
degration_callback: None,
},
factory,
}
}

/// Sets the [`DegrationCallback`] of the [`Subscriber`]. Whenever a connection to a
/// [`crate::port::subscriber::Subscriber`] is corrupted or it seems to be dead, this callback
/// is called and depending on the returned [`DegrationAction`] measures will be taken.
pub fn set_degration_callback<
F: Fn(
service::static_config::StaticConfig,
UniquePublisherId,
UniqueSubscriberId,
) -> DegrationAction
+ 'static,
>(
mut self,
callback: Option<F>,
) -> Self {
match callback {
Some(c) => self.config.degration_callback = Some(DegrationCallback::new(c)),
None => self.config.degration_callback = None,
}

self
}

/// Creates a new [`Subscriber`] or returns a [`SubscriberCreateError`] on failure.
pub fn create(&self) -> Result<Subscriber<Service, MessageType>, SubscriberCreateError> {
pub fn create(self) -> Result<Subscriber<Service, MessageType>, SubscriberCreateError> {
let origin = format!("{:?}", self);
Ok(
fail!(from self, when Subscriber::new(&self.factory.service, self.factory.service.state().static_config.publish_subscribe()),
fail!(from origin, when Subscriber::new(&self.factory.service, self.factory.service.state().static_config.publish_subscribe(), self.config),
"Failed to create new Subscriber port."),
)
}
Expand Down

0 comments on commit b26131a

Please sign in to comment.