From 9bcd25a80e3ccc8a70985f583f03f7f027f095dc Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Mon, 30 Dec 2024 13:47:39 +0100 Subject: [PATCH] [#571] Fix completion queue capacity exceeded when history > buffer --- doc/release-notes/iceoryx2-unreleased.md | 3 +- iceoryx2/src/port/publisher.rs | 8 +- .../tests/service_publish_subscribe_tests.rs | 82 +++++++++++++++++++ 3 files changed, 90 insertions(+), 3 deletions(-) diff --git a/doc/release-notes/iceoryx2-unreleased.md b/doc/release-notes/iceoryx2-unreleased.md index 20261165a..98ddf10d1 100644 --- a/doc/release-notes/iceoryx2-unreleased.md +++ b/doc/release-notes/iceoryx2-unreleased.md @@ -20,7 +20,8 @@ conflicts when merging. --> -* Example text [#1](https://github.com/eclipse-iceoryx/iceoryx2/issues/1) +* Completion queue capacity exceeded when history > buffer + [#571](https://github.com/eclipse-iceoryx/iceoryx2/issues/571) ### Refactoring diff --git a/iceoryx2/src/port/publisher.rs b/iceoryx2/src/port/publisher.rs index b0703da32..23edf4893 100644 --- a/iceoryx2/src/port/publisher.rs +++ b/iceoryx2/src/port/publisher.rs @@ -403,7 +403,6 @@ impl PublisherBackend { sample_size: usize, ) -> Result { self.retrieve_returned_samples(); - let deliver_call = match self.config.unable_to_deliver_strategy { UnableToDeliverStrategy::Block => { ::Sender::blocking_send @@ -555,11 +554,16 @@ impl PublisherBackend { let history = unsafe { &mut *history.get() }; for i in 0..history.len() { let old_sample = unsafe { history.get_unchecked(i) }; + self.retrieve_returned_samples(); let offset = PointerOffset::from_value(old_sample.offset); match connection.sender.try_send(offset, old_sample.size) { - Ok(_) => { + Ok(overflow) => { self.borrow_sample(offset); + + if let Some(old) = overflow { + self.release_sample(old); + } } Err(e) => { warn!(from self, "Failed to deliver history to new subscriber via {:?} due to {:?}", connection, e); diff --git a/iceoryx2/tests/service_publish_subscribe_tests.rs b/iceoryx2/tests/service_publish_subscribe_tests.rs index a48ad311d..305c7a3e2 100644 --- a/iceoryx2/tests/service_publish_subscribe_tests.rs +++ b/iceoryx2/tests/service_publish_subscribe_tests.rs @@ -3200,6 +3200,88 @@ mod service_publish_subscribe { deliver_history_with_increasing_samples_works::(AllocationStrategy::PowerOfTwo); } + #[test] + fn does_not_leak_when_subscriber_has_smaller_buffer_size_than_history_size() { + let _watchdog = Watchdog::new(); + const HISTORY_SIZE: usize = 1000; + const REPETITIONS: usize = 10; + let service_name = generate_name(); + let config = generate_isolated_config(); + let node = NodeBuilder::new().config(&config).create::().unwrap(); + + let finish_setup = Barrier::new(2); + let start = Barrier::new(2); + let end = Barrier::new(2); + + std::thread::scope(|s| { + let update_connection_thread = s.spawn(|| { + let service = node + .service_builder(&service_name) + .publish_subscribe::() + .max_publishers(1) + .max_subscribers(1) + .subscriber_max_borrowed_samples(1) + .history_size(HISTORY_SIZE) + .subscriber_max_buffer_size(HISTORY_SIZE) + .create() + .unwrap(); + + let publisher = service + .publisher_builder() + .max_loaned_samples(1) + .create() + .unwrap(); + + for n in 0..HISTORY_SIZE { + publisher.send_copy(n).unwrap(); + } + + finish_setup.wait(); + + for _ in 0..REPETITIONS { + start.wait(); + + publisher.update_connections().unwrap(); + + end.wait(); + } + }); + + let new_subscriber_thread = s.spawn(|| { + finish_setup.wait(); + + let service = node + .service_builder(&service_name) + .publish_subscribe::() + .open() + .unwrap(); + + for _ in 0..REPETITIONS { + let subscriber = service + .subscriber_builder() + .buffer_size(1) + .create() + .unwrap(); + start.wait(); + + let mut previous_value = 0; + for _ in 0..HISTORY_SIZE { + let sample = subscriber.receive().unwrap(); + if let Some(sample) = sample { + assert_that!(*sample, ge previous_value); + previous_value = *sample; + } + } + + end.wait(); + } + }); + + update_connection_thread.join().unwrap(); + new_subscriber_thread.join().unwrap(); + }); + } + #[instantiate_tests()] mod ipc {}