Skip to content

Commit

Permalink
[eclipse-iceoryx#571] Fix completion queue capacity exceeded when his…
Browse files Browse the repository at this point in the history
…tory > buffer
  • Loading branch information
elfenpiff committed Dec 30, 2024
1 parent f4931fd commit 9bcd25a
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 3 deletions.
3 changes: 2 additions & 1 deletion doc/release-notes/iceoryx2-unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
conflicts when merging.
-->

* Example text [#1](https://github.com/eclipse-iceoryx/iceoryx2/issues/1)
* Completion queue capacity exceeded when history > buffer
[#571](https://github.com/eclipse-iceoryx/iceoryx2/issues/571)

### Refactoring

Expand Down
8 changes: 6 additions & 2 deletions iceoryx2/src/port/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,6 @@ impl<Service: service::Service> PublisherBackend<Service> {
sample_size: usize,
) -> Result<usize, PublisherSendError> {
self.retrieve_returned_samples();

let deliver_call = match self.config.unable_to_deliver_strategy {
UnableToDeliverStrategy::Block => {
<Service::Connection as ZeroCopyConnection>::Sender::blocking_send
Expand Down Expand Up @@ -555,11 +554,16 @@ impl<Service: service::Service> PublisherBackend<Service> {
let history = unsafe { &mut *history.get() };
for i in 0..history.len() {
let old_sample = unsafe { history.get_unchecked(i) };
self.retrieve_returned_samples();

let offset = PointerOffset::from_value(old_sample.offset);
match connection.sender.try_send(offset, old_sample.size) {
Ok(_) => {
Ok(overflow) => {
self.borrow_sample(offset);

if let Some(old) = overflow {
self.release_sample(old);
}
}
Err(e) => {
warn!(from self, "Failed to deliver history to new subscriber via {:?} due to {:?}", connection, e);
Expand Down
82 changes: 82 additions & 0 deletions iceoryx2/tests/service_publish_subscribe_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3200,6 +3200,88 @@ mod service_publish_subscribe {
deliver_history_with_increasing_samples_works::<Sut>(AllocationStrategy::PowerOfTwo);
}

#[test]
fn does_not_leak_when_subscriber_has_smaller_buffer_size_than_history_size<Sut: Service>() {
let _watchdog = Watchdog::new();
const HISTORY_SIZE: usize = 1000;
const REPETITIONS: usize = 10;
let service_name = generate_name();
let config = generate_isolated_config();
let node = NodeBuilder::new().config(&config).create::<Sut>().unwrap();

let finish_setup = Barrier::new(2);
let start = Barrier::new(2);
let end = Barrier::new(2);

std::thread::scope(|s| {
let update_connection_thread = s.spawn(|| {
let service = node
.service_builder(&service_name)
.publish_subscribe::<usize>()
.max_publishers(1)
.max_subscribers(1)
.subscriber_max_borrowed_samples(1)
.history_size(HISTORY_SIZE)
.subscriber_max_buffer_size(HISTORY_SIZE)
.create()
.unwrap();

let publisher = service
.publisher_builder()
.max_loaned_samples(1)
.create()
.unwrap();

for n in 0..HISTORY_SIZE {
publisher.send_copy(n).unwrap();
}

finish_setup.wait();

for _ in 0..REPETITIONS {
start.wait();

publisher.update_connections().unwrap();

end.wait();
}
});

let new_subscriber_thread = s.spawn(|| {
finish_setup.wait();

let service = node
.service_builder(&service_name)
.publish_subscribe::<usize>()
.open()
.unwrap();

for _ in 0..REPETITIONS {
let subscriber = service
.subscriber_builder()
.buffer_size(1)
.create()
.unwrap();
start.wait();

let mut previous_value = 0;
for _ in 0..HISTORY_SIZE {
let sample = subscriber.receive().unwrap();
if let Some(sample) = sample {
assert_that!(*sample, ge previous_value);
previous_value = *sample;
}
}

end.wait();
}
});

update_connection_thread.join().unwrap();
new_subscriber_thread.join().unwrap();
});
}

#[instantiate_tests(<iceoryx2::service::ipc::Service>)]
mod ipc {}

Expand Down

0 comments on commit 9bcd25a

Please sign in to comment.