Skip to content

Commit

Permalink
[#116] Add missing mem sync in shm zero copy connection, remove retri…
Browse files Browse the repository at this point in the history
…eve buffer check from publisher - the publisher has not enough information to perform this check
  • Loading branch information
elfenpiff committed Feb 19, 2024
1 parent f4690bf commit a444ddd
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 62 deletions.
1 change: 0 additions & 1 deletion iceoryx2-cal/src/zero_copy_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ impl std::error::Error for ZeroCopyCreationError {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ZeroCopySendError {
ReceiveBufferFull,
ClearRetrieveChannelBeforeSend,
}

impl std::fmt::Display for ZeroCopySendError {
Expand Down
20 changes: 9 additions & 11 deletions iceoryx2-cal/src/zero_copy_connection/posix_shared_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,13 @@ impl Builder {
fatal_panic!(from self, when unsafe { (*mgmt_ptr).retrieve_channel.init(&allocator) },
"{} since the retrieve channel allocation failed. - This is an implementation bug!", msg);

//////////////////////////////////////////
// SYNC POINT: write SharedManagementData
//////////////////////////////////////////
unsafe {
(*mgmt_ptr)
.init_state
.store(IS_INITIALIZED_STATE_VALUE, Ordering::Relaxed)
.store(IS_INITIALIZED_STATE_VALUE, Ordering::Release)
};
shm.release_ownership();
}
Expand All @@ -251,7 +254,11 @@ impl Builder {
with ZeroCopyCreationError::InternalError, "{} since the adaptive wait could not be created.", msg);

let mgmt_ref = unsafe { &mut *mgmt_ptr };
while mgmt_ref.init_state.load(Ordering::Relaxed) != IS_INITIALIZED_STATE_VALUE {

//////////////////////////////////////////
// SYNC POINT: read SharedManagementData
//////////////////////////////////////////
while mgmt_ref.init_state.load(Ordering::Acquire) != IS_INITIALIZED_STATE_VALUE {
if fail!(from self, when adaptive_wait.wait(), with ZeroCopyCreationError::InternalError,
"{} since a failure while waiting for creation finalization occurred.", msg)
< MAX_CREATION_DURATION
Expand Down Expand Up @@ -426,15 +433,6 @@ impl ZeroCopyPortDetails for Sender {
impl ZeroCopySender for Sender {
fn try_send(&self, ptr: PointerOffset) -> Result<Option<PointerOffset>, ZeroCopySendError> {
let msg = "Unable to send sample";
let space_in_retrieve_channel =
self.mgmt().retrieve_channel.capacity() - self.mgmt().retrieve_channel.len();

if space_in_retrieve_channel
<= self.mgmt().max_borrowed_samples + self.mgmt().receive_channel.len()
{
fail!(from self, with ZeroCopySendError::ClearRetrieveChannelBeforeSend,
"{} since sufficient space for every sample in the retrieve channel cannot be guaranteed. Samples have to be retrieved before a new sample can be send.", msg);
}

if !self.mgmt().enable_safe_overflow && self.mgmt().receive_channel.is_full() {
fail!(from self, with ZeroCopySendError::ReceiveBufferFull,
Expand Down
9 changes: 0 additions & 9 deletions iceoryx2-cal/src/zero_copy_connection/process_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,15 +311,6 @@ impl ZeroCopyPortDetails for Sender {
impl ZeroCopySender for Sender {
fn try_send(&self, ptr: PointerOffset) -> Result<Option<PointerOffset>, ZeroCopySendError> {
let msg = "Unable to send sample";
let space_in_retrieve_channel =
self.mgmt.retrieve_channel.capacity() - self.mgmt.retrieve_channel.len();

if space_in_retrieve_channel
<= self.mgmt.max_borrowed_samples + self.mgmt.receive_channel.len()
{
fail!(from self, with ZeroCopySendError::ClearRetrieveChannelBeforeSend,
"{} since sufficient space for every sample in the retrieve channel cannot be guaranteed. Samples have to be retrieved before a new sample can be send.", msg);
}

if !self.mgmt.enable_safe_overflow && self.mgmt.receive_channel.is_full() {
fail!(from self, with ZeroCopySendError::ReceiveBufferFull,
Expand Down
37 changes: 0 additions & 37 deletions iceoryx2-cal/tests/zero_copy_connection_trait_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,43 +287,6 @@ mod zero_copy_connection {
assert_that!(sample, is_none);
}

#[test]
fn retrieve_channel_must_always_have_enough_space_left<Sut: ZeroCopyConnection>() {
let name = generate_name();
const BUFFER_SIZE: usize = 34;
const MAX_BORROWED_SAMPLES: usize = 34;

let sut_sender = Sut::Builder::new(&name)
.buffer_size(BUFFER_SIZE)
.receiver_max_borrowed_samples(MAX_BORROWED_SAMPLES)
.create_sender()
.unwrap();

let sut_receiver = Sut::Builder::new(&name)
.buffer_size(BUFFER_SIZE)
.receiver_max_borrowed_samples(MAX_BORROWED_SAMPLES)
.create_receiver()
.unwrap();

for i in 0..BUFFER_SIZE {
assert_that!(sut_sender.try_send(PointerOffset::new(i)), is_ok);
}

for _ in 0..MAX_BORROWED_SAMPLES {
let sample = sut_receiver.receive().unwrap().unwrap();
assert_that!(sut_receiver.release(sample), is_ok);
}

assert_that!(sut_sender.try_send(PointerOffset::new(0)), is_ok);

let result = sut_sender.try_send(PointerOffset::new(0));
assert_that!(result, is_err);
assert_that!(
result.err().unwrap(), eq
ZeroCopySendError::ClearRetrieveChannelBeforeSend
);
}

#[test]
fn receiver_cannot_borrow_more_samples_than_set_up<Sut: ZeroCopyConnection>() {
let name = generate_name();
Expand Down
5 changes: 1 addition & 4 deletions iceoryx2/src/port/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,6 @@ impl<'a, Service: service::Service, MessageType: Debug> Publisher<'a, Service, M
* try_send => we tried and expect that the buffer is full
* */
}
Err(ZeroCopySendError::ClearRetrieveChannelBeforeSend) => {
warn!(from self, "Unable to send sample via connection {:?} since the retrieve buffer is full. This can be caused by a corrupted retrieve channel.", connection);
}
Ok(overflow) => {
self.sample_reference_counter[Self::sample_index(address_to_chunk)]
.fetch_add(1, Ordering::Relaxed);
Expand Down Expand Up @@ -535,11 +532,11 @@ impl<'a, Service: service::Service, MessageType: Debug> PublishMgmt
}

fn send_impl(&self, address_to_chunk: usize) -> Result<usize, ConnectionFailure> {
self.retrieve_returned_samples();
fail!(from self, when self.update_connections(),
"Unable to send sample since the connections could not be updated.");

self.add_to_history(address_to_chunk);
self.retrieve_returned_samples();
Ok(self.deliver_sample(address_to_chunk))
}
}
Expand Down

0 comments on commit a444ddd

Please sign in to comment.