From 7b89058cb45cba68d68147bd316ced4dbf62fe20 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Mon, 19 Feb 2024 10:52:29 +0100 Subject: [PATCH] [#116] Add missing mem sync in shm zero copy connection, remove retrieve buffer check from publisher - the publisher has not enough information to perform this check --- doc/release-notes/iceoryx2-unreleased.md | 8 ++-- iceoryx2-cal/src/zero_copy_connection/mod.rs | 1 - .../posix_shared_memory.rs | 20 +++++----- .../src/zero_copy_connection/process_local.rs | 9 ----- .../tests/zero_copy_connection_trait_tests.rs | 37 ------------------- iceoryx2/src/port/publisher.rs | 5 +-- 6 files changed, 15 insertions(+), 65 deletions(-) diff --git a/doc/release-notes/iceoryx2-unreleased.md b/doc/release-notes/iceoryx2-unreleased.md index 283c2ffa6..0e26996a6 100644 --- a/doc/release-notes/iceoryx2-unreleased.md +++ b/doc/release-notes/iceoryx2-unreleased.md @@ -15,11 +15,13 @@ - * Fixes for [#116](https://github.com/eclipse-iceoryx/iceoryx2/issues/116) - * retrieve channel overflow caused by big publisher loans - * `CreationMode::OpenOrCreate` in `iceoryx2-bb-posix::SharedMemory` * Fix undefined behavior in `spsc::{queue|index_queue}` [#87](https://github.com/eclipse-iceoryx/iceoryx2/issues/87) * Fix `open_or_create` race [#108](https://github.com/eclipse-iceoryx/iceoryx2/issues/108) + * Fixes for [#116](https://github.com/eclipse-iceoryx/iceoryx2/issues/116) + * Fix retrieve channel overflow caused by big publisher loans + * Fix `CreationMode::OpenOrCreate` in `iceoryx2-bb-posix::SharedMemory` + * Add missing memory synchronization to posix shm zero copy connection + * Remove retrieve buffer full check from zero copy connection - sender had insufficient infos available * Fix insufficient memory reordering protection in `spsc::Queue::push` and `spsc::Queue::pop` [#119](https://github.com/eclipse-iceoryx/iceoryx2/issues/119) ### Refactoring diff --git a/iceoryx2-cal/src/zero_copy_connection/mod.rs b/iceoryx2-cal/src/zero_copy_connection/mod.rs index f61296411..12f4db1a2 100644 --- a/iceoryx2-cal/src/zero_copy_connection/mod.rs +++ b/iceoryx2-cal/src/zero_copy_connection/mod.rs @@ -41,7 +41,6 @@ impl std::error::Error for ZeroCopyCreationError {} #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ZeroCopySendError { ReceiveBufferFull, - ClearRetrieveChannelBeforeSend, } impl std::fmt::Display for ZeroCopySendError { diff --git a/iceoryx2-cal/src/zero_copy_connection/posix_shared_memory.rs b/iceoryx2-cal/src/zero_copy_connection/posix_shared_memory.rs index 9fbda6bfb..ed1c2b83c 100644 --- a/iceoryx2-cal/src/zero_copy_connection/posix_shared_memory.rs +++ b/iceoryx2-cal/src/zero_copy_connection/posix_shared_memory.rs @@ -237,10 +237,13 @@ impl Builder { fatal_panic!(from self, when unsafe { (*mgmt_ptr).retrieve_channel.init(&allocator) }, "{} since the retrieve channel allocation failed. - This is an implementation bug!", msg); + ////////////////////////////////////////// + // SYNC POINT: write SharedManagementData + ////////////////////////////////////////// unsafe { (*mgmt_ptr) .init_state - .store(IS_INITIALIZED_STATE_VALUE, Ordering::Relaxed) + .store(IS_INITIALIZED_STATE_VALUE, Ordering::Release) }; shm.release_ownership(); } @@ -251,7 +254,11 @@ impl Builder { with ZeroCopyCreationError::InternalError, "{} since the adaptive wait could not be created.", msg); let mgmt_ref = unsafe { &mut *mgmt_ptr }; - while mgmt_ref.init_state.load(Ordering::Relaxed) != IS_INITIALIZED_STATE_VALUE { + + ////////////////////////////////////////// + // SYNC POINT: read SharedManagementData + ////////////////////////////////////////// + while mgmt_ref.init_state.load(Ordering::Acquire) != IS_INITIALIZED_STATE_VALUE { if fail!(from self, when adaptive_wait.wait(), with ZeroCopyCreationError::InternalError, "{} since a failure while waiting for creation finalization occurred.", msg) < MAX_CREATION_DURATION @@ -426,15 +433,6 @@ impl ZeroCopyPortDetails for Sender { impl ZeroCopySender for Sender { fn try_send(&self, ptr: PointerOffset) -> Result, ZeroCopySendError> { let msg = "Unable to send sample"; - let space_in_retrieve_channel = - self.mgmt().retrieve_channel.capacity() - self.mgmt().retrieve_channel.len(); - - if space_in_retrieve_channel - <= self.mgmt().max_borrowed_samples + self.mgmt().receive_channel.len() - { - fail!(from self, with ZeroCopySendError::ClearRetrieveChannelBeforeSend, - "{} since sufficient space for every sample in the retrieve channel cannot be guaranteed. Samples have to be retrieved before a new sample can be send.", msg); - } if !self.mgmt().enable_safe_overflow && self.mgmt().receive_channel.is_full() { fail!(from self, with ZeroCopySendError::ReceiveBufferFull, diff --git a/iceoryx2-cal/src/zero_copy_connection/process_local.rs b/iceoryx2-cal/src/zero_copy_connection/process_local.rs index b4a96c7f8..ef96ed2ee 100644 --- a/iceoryx2-cal/src/zero_copy_connection/process_local.rs +++ b/iceoryx2-cal/src/zero_copy_connection/process_local.rs @@ -311,15 +311,6 @@ impl ZeroCopyPortDetails for Sender { impl ZeroCopySender for Sender { fn try_send(&self, ptr: PointerOffset) -> Result, ZeroCopySendError> { let msg = "Unable to send sample"; - let space_in_retrieve_channel = - self.mgmt.retrieve_channel.capacity() - self.mgmt.retrieve_channel.len(); - - if space_in_retrieve_channel - <= self.mgmt.max_borrowed_samples + self.mgmt.receive_channel.len() - { - fail!(from self, with ZeroCopySendError::ClearRetrieveChannelBeforeSend, - "{} since sufficient space for every sample in the retrieve channel cannot be guaranteed. Samples have to be retrieved before a new sample can be send.", msg); - } if !self.mgmt.enable_safe_overflow && self.mgmt.receive_channel.is_full() { fail!(from self, with ZeroCopySendError::ReceiveBufferFull, diff --git a/iceoryx2-cal/tests/zero_copy_connection_trait_tests.rs b/iceoryx2-cal/tests/zero_copy_connection_trait_tests.rs index e41dc0099..cfbed9f95 100644 --- a/iceoryx2-cal/tests/zero_copy_connection_trait_tests.rs +++ b/iceoryx2-cal/tests/zero_copy_connection_trait_tests.rs @@ -287,43 +287,6 @@ mod zero_copy_connection { assert_that!(sample, is_none); } - #[test] - fn retrieve_channel_must_always_have_enough_space_left() { - let name = generate_name(); - const BUFFER_SIZE: usize = 34; - const MAX_BORROWED_SAMPLES: usize = 34; - - let sut_sender = Sut::Builder::new(&name) - .buffer_size(BUFFER_SIZE) - .receiver_max_borrowed_samples(MAX_BORROWED_SAMPLES) - .create_sender() - .unwrap(); - - let sut_receiver = Sut::Builder::new(&name) - .buffer_size(BUFFER_SIZE) - .receiver_max_borrowed_samples(MAX_BORROWED_SAMPLES) - .create_receiver() - .unwrap(); - - for i in 0..BUFFER_SIZE { - assert_that!(sut_sender.try_send(PointerOffset::new(i)), is_ok); - } - - for _ in 0..MAX_BORROWED_SAMPLES { - let sample = sut_receiver.receive().unwrap().unwrap(); - assert_that!(sut_receiver.release(sample), is_ok); - } - - assert_that!(sut_sender.try_send(PointerOffset::new(0)), is_ok); - - let result = sut_sender.try_send(PointerOffset::new(0)); - assert_that!(result, is_err); - assert_that!( - result.err().unwrap(), eq - ZeroCopySendError::ClearRetrieveChannelBeforeSend - ); - } - #[test] fn receiver_cannot_borrow_more_samples_than_set_up() { let name = generate_name(); diff --git a/iceoryx2/src/port/publisher.rs b/iceoryx2/src/port/publisher.rs index 7f3d76629..919899fb4 100644 --- a/iceoryx2/src/port/publisher.rs +++ b/iceoryx2/src/port/publisher.rs @@ -339,9 +339,6 @@ impl<'a, Service: service::Service, MessageType: Debug> Publisher<'a, Service, M * try_send => we tried and expect that the buffer is full * */ } - Err(ZeroCopySendError::ClearRetrieveChannelBeforeSend) => { - warn!(from self, "Unable to send sample via connection {:?} since the retrieve buffer is full. This can be caused by a corrupted retrieve channel.", connection); - } Ok(overflow) => { self.sample_reference_counter[Self::sample_index(address_to_chunk)] .fetch_add(1, Ordering::Relaxed); @@ -535,11 +532,11 @@ impl<'a, Service: service::Service, MessageType: Debug> PublishMgmt } fn send_impl(&self, address_to_chunk: usize) -> Result { - self.retrieve_returned_samples(); fail!(from self, when self.update_connections(), "Unable to send sample since the connections could not be updated."); self.add_to_history(address_to_chunk); + self.retrieve_returned_samples(); Ok(self.deliver_sample(address_to_chunk)) } }