
The publishers retrieve channel is full and the sample cannot be returned #571

Open
smileghp opened this issue Dec 29, 2024 · 15 comments
Labels: bug (Something isn't working)

@smileghp

smileghp commented Dec 29, 2024

Required information

Operating system:
Linux (arm64)
Rust version:
Output of: rustc --version

Cargo version:
Output of: cargo --version

iceoryx2 version:
0.4.1

Detailed log output:

[F] Sample<[u8], iceoryx2::service::builder::publish_subscribe::CustomHeaderMarker, iceoryx2::service::ipc::Service> {
    details: SampleDetails {
        publisher_connection: Connection {
            receiver: Receiver {
                storage: Storage { shm: SharedMemory { name: FileName { value: FixedSizeByteString<255> { len: 120, data: "iox2_b9fc73e5c1f646968758453273c6c65cb372831b_1119046342792956048229758164885_1348123209380142857096792137656.connection" } }, size: 358, base_address: 0xffffa179e000, has_ownership: false, file_descriptor: FileDescriptor { value: 93, is_owned: true }, memory_lock: None }, name: FileName { value: FixedSizeByteString<255> { len: 63, data: "1119046342792956048229758164885_1348123209380142857096792137656" } }, _phantom_data: PhantomData },
                borrow_counter: UnsafeCell { .. },
                name: FileName { value: FixedSizeByteString<255> { len: 63, data: "1119046342792956048229758164885_1348123209380142857096792137656" } } },
            data_segment: Memory {
                storage: Storage { shm: SharedMemory { name: FileName { value: FixedSizeByteString<255> { len: 98, data: "iox2_0354a209029e7d094a819e2d4030ea331e6caaf0_24469_1119046342792956048229758164885.publisher_data" } }, size: 14750638, base_address: 0xfffef21ec000, has_ownership: false, file_descriptor: FileDescriptor { value: 94, is_owned: true }, memory_lock: None }, name: FileName { value: FixedSizeByteString<255> { len: 37, data: "24469_1119046342792956048229758164885" } }, _phantom_data: PhantomData<iceoryx2_cal::shared_memory::common::details::AllocatorDetails<iceoryx2_cal::shm_allocator::pool_allocator::PoolAllocator>> },
                name: FileName { value: FixedSizeByteString<255> { len: 37, data: "24469_1119046342792956048229758164885" } },
                payload_start_address: 281470448877696,
                _phantom: PhantomData<iceoryx2_cal::shm_allocator::pool_allocator::PoolAllocator> },
            publisher_id: UniquePublisherId(UniqueSystemId { value: 1119046342792956048229758164885, pid: 24469, creation_time: Time { clock_type: Realtime, seconds: 1735479263, nanoseconds: 534081654 } }) },
        offset: PointerOffset(204864),
        origin: UniquePublisherId(UniqueSystemId { value: 1119046342792956048229758164885, pid: 24469, creation_time: Time { clock_type: Realtime, seconds: 1735479263, nanoseconds: 534081654 } }) } }
    | This should never happen! The publishers retrieve channel is full and the sample cannot be returned.

I am using the C++ API, and when receiving a certain topic, this phenomenon inevitably occurs after running for a period of time. I would like to know the cause of this issue and whether you have any suggestions or ideas. Thank you!

@smileghp smileghp added the bug Something isn't working label Dec 29, 2024
@elfenpiff
Contributor

@smileghp As the error message says, this should never happen and is a bug on our side. It occurs when the publisher sends multiple samples and then stops; the subscriber then receives all of the samples and lets them go out of scope, which leads to a full queue.

I think I will be able to reproduce this cleanly on our side if you provide me with some more details:

  1. How many publishers are there in the setup?
  2. Do you use the default configuration or do you specify certain QoS like: publisher history size, subscriber max buffer size, subscriber max borrowed samples, publisher max loaned samples, safe overflow. If you set one of the parameters, please provide me the values.
  3. How many subscribers are there in the setup?

Can you provide more details about the circumstances that led to the issue?

@elfenpiff
Contributor

elfenpiff commented Dec 30, 2024

@smileghp Oh, and two more questions.

  1. In your setup, when the bug occurred, was the publisher or the subscriber created first?
  2. Do you use multi-threading? Are samples accessed and released in a different thread than the one in which the subscriber is handled?

@smileghp
Author

The configuration I used is as follows. I can’t clearly specify the method for reproduction at this moment, but I will try to create a scenario that can be reliably reproduced.

[global]
root-path-unix = '/tmp/iceoryx2/'
root-path-windows = 'c:\Temp\iceoryx2'
prefix = 'iox2_'

[global.node]
directory = 'nodes'
monitor-suffix = '.node_monitor'
static-config-suffix = '.details'
service-tag-suffix = '.service_tag'
cleanup-dead-nodes-on-creation = true
cleanup-dead-nodes-on-destruction = true

[global.service]
directory = 'services'
publisher-data-segment-suffix = '.publisher_data'
static-config-storage-suffix = '.service'
dynamic-config-storage-suffix = '.dynamic'
event-connection-suffix = '.event'
connection-suffix = '.connection'
creation-timeout.secs = 0
creation-timeout.nanos = 500000000

[defaults.publish-subscribe]
max-subscribers = 8
max-publishers = 4
max-nodes = 20
publisher-history-size = 1
subscriber-max-buffer-size = 10
subscriber-max-borrowed-samples = 10
publisher-max-loaned-samples = 10
enable-safe-overflow = true
unable-to-deliver-strategy = 'Block' # or 'DiscardSample'
subscriber-expired-connection-buffer = 128

[defaults.event]
max-listeners = 4
max-notifiers = 16
max-nodes = 36
event-id-max-value = 32

@smileghp
Author

smileghp commented Dec 30, 2024

  1. In your setup, when the bug occurred, was the publisher or the subscriber created first?

The subscriber was created first.

  2. Do you use multi-threading? Are samples accessed and released in a different thread than the one in which the subscriber is handled?

Yes, it's a multi-threaded process, but samples are accessed and released in the same thread in which the subscriber is handled.

@elfenpiff
Contributor

@smileghp Is the publisher accessed from multiple threads?

@elfenpiff
Contributor

elfenpiff commented Dec 30, 2024

@smileghp The underlying mechanism is a construct with a submission and a completion queue. Your problem is that the completion queue capacity is exceeded. The capacity is calculated as the sum of the subscriber buffer size N and the subscriber max borrowed samples B, plus 1 (required when the processes interrupt each other).

Whenever the publisher delivers a sample, it calls Publisher::retrieve_returned_samples first to acquire all returned samples from the completion queue. The thing is, I cannot think of a scenario where the completion queue needs a capacity greater than N + B + 1.

Here are some abstract scenarios I have drafted; maybe you have an idea of how the capacity of the completion queue can be exceeded, in case you cannot create a minimal reproducible example. A small sketch replaying Case 1 follows after the two cases.

  • subscriber buffer size = N, subscriber max borrowed samples = B
  • subscriber current borrowed samples = S
  • submission queue size = Q (capacity = N)
  • completion queue size = C (capacity = N + B + 1)

Case 1:

  • Start state: Q = N, C = 0, S = B
  1. Publisher sends sample
    • Publisher::retrieve_returned_samples()
    • Publisher interrupted
  2. Subscriber returns B samples
    • C = B
    • S = 0
    • Q = N
  3. Subscriber consumes N and returns all N
    • C = B + N
    • S = 0
    • Q = 0
  4. Publisher continues and delivers sample
    • C = B + N
    • S = 0
    • Q = 1
  5. Subscriber consumes the sample and returns it
    • C = B + N + 1
    • S = 0
    • Q = 0

Case 2:

  • Start state: Q = N, C = 0, S = B
  1. Subscriber returns sample
    • Interrupted before ZeroCopyReceiver::release() call
  2. Publisher sends sample
    • Publisher::retrieve_returned_samples()
    • Publisher interrupted
  3. Subscriber continues and releases sample
    • C = 1
    • S = B - 1
    • Q = N
  4. Subscriber consumes another sample
    • C = 1
    • S = B
    • Q = N - 1
  5. Publisher continues and delivers sample
    • C = 1
    • S = B
    • Q = N
  6. Subscriber consumes and releases all samples
    • C = B + N + 1
    • S = 0
    • Q = 0
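To make the cases concrete, here is a minimal sketch (plain counters, not iceoryx2 code) that replays Case 1; N and B use the values from the configuration above, and the step comments mirror the list:

    #include <cassert>
    #include <cstddef>

    int main() {
        constexpr std::size_t N = 10;               // subscriber-max-buffer-size
        constexpr std::size_t B = 10;               // subscriber-max-borrowed-samples
        constexpr std::size_t CAPACITY = N + B + 1; // completion queue capacity

        std::size_t Q = N; // submission queue fill level
        std::size_t C = 0; // completion queue fill level
        std::size_t S = B; // samples currently borrowed by the subscriber

        // 1. Publisher calls retrieve_returned_samples() (empties C), then is interrupted.
        C = 0;

        // 2. Subscriber returns its B borrowed samples.
        C += S; S = 0;     // C = B

        // 3. Subscriber consumes all N buffered samples and returns them.
        C += Q; Q = 0;     // C = B + N

        // 4. Publisher continues and delivers the sample it had prepared.
        Q += 1;

        // 5. Subscriber consumes that sample and returns it.
        C += 1; Q -= 1;    // C = B + N + 1

        assert(C == CAPACITY); // the completion queue is exactly full, but not exceeded
        return 0;
    }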

@smileghp
Author

@smileghp Is the publisher accessed from multiple threads?

yes

@smileghp
Author

Based on your description, I would initially think that this error should occur in the publisher's process, but I observed that the phenomenon seems to happen in the subscriber's thread.

@smileghp
Author

smileghp commented Dec 30, 2024

And sometimes the subscriber encounters the error “ExceedsMaxBorrowedSamples”

In C++, does it return the sample immediately, or could you explain the timing of when the sample is returned? Is there an interface to actively return the sample?

The subscriber handling code is as follows:

if (!subscriber_->has_samples().expect("has samples")) {
    return false;
}
auto expect = subscriber_->receive();
if (expect.has_error()) {
    std::cout << "iox2 read error: " << channel_name() << " err: " << ErrToString(expect.error())
              << std::endl;
}
auto sample = std::move(expect.value());
if (!sample.has_value()) {
    return false;
}
while (sample.has_value()) {
    if (pub_unique_id_ == nullptr || !(sample->origin() == *pub_unique_id_)) {
        const DataType* data_raw = &sample->payload();
        if (data_raw == nullptr) {
            return false;
        }
        callback(data_raw->data, data_raw->length); // do something
    } else {
        const auto data_raw = sample->payload();
    }
    auto expect = subscriber_->receive();
    if (expect.has_error()) {
        std::cout << "iox2 read error: " << channel_name() << " err: " << ErrToString(expect.error())
                  << std::endl;
    }
    sample = std::move(expect.value());
}

@elfenpiff
Contributor

elfenpiff commented Dec 30, 2024

@smileghp

Based on your description, I would initially think that this error should occur in the publisher's process, but I observed that the phenomenon seems to happen in the subscriber's thread.

It is a bug on the publisher side that will affect the subscriber side.

Is the publisher accessed from multiple threads?

yes

This can also be the root cause, since no construct of iceoryx2 is thread-safe. If you want to access the publisher from multiple threads, you can create a separate publisher for each thread.
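A rough sketch of that pattern, assuming you wrap your own iceoryx2 builder code in a factory (make_publisher below is a hypothetical placeholder, not an iceoryx2 API):

    #include <cstddef>
    #include <thread>
    #include <vector>

    template <typename MakePublisher>
    void run_workers(std::size_t num_threads, MakePublisher make_publisher) {
        std::vector<std::thread> workers;
        for (std::size_t i = 0; i < num_threads; ++i) {
            workers.emplace_back([make_publisher] {
                // Each thread constructs and owns its own publisher, so no lock is
                // needed and the sample/publisher coupling stays within this thread.
                auto publisher = make_publisher();
                // ... loan a sample, write the payload, send - all from this thread ...
            });
        }
        for (auto& worker : workers) {
            worker.join();
        }
    }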

I created a pull request #572 that addresses one issue that can lead to the error you encountered. But it happens when the subscriber has a smaller buffer size than the history size, and the history is consumed on the subscriber side while the publisher is delivering it.
Could you please try out this version? Maybe it has already fixed your issue.

@smileghp
Author

smileghp commented Dec 30, 2024

Okay, I will pick your changes and test them in my environment.

Although multiple threads can access the publisher, there will be no race because I use a lock to guard it.

In C++, does it return the sample immediately, or could you explain the timing of when the sample is returned? Is there an interface to actively return the sample?

Can you explain this?

@elfenpiff
Contributor

Okay, I will pick your changes and test them in my environment.
Although multiple threads can access the publisher, there will be no race because I use a lock to guard it.

Actually, this is not enough, since the SampleMut is coupled with the Publisher, and as soon as you call send, you have a concurrency issue again, which could lead to the exact scenario you are observing. This can only be handled with a low-level mutex deep inside the publisher on the Rust side. We will tackle this problem soon and provide publisher/subscriber and sample versions on both the Rust and the C++ side that are thread-safe out of the box.

@elfenpiff
Contributor

In C++, does it return the sample immediately, or could you explain the timing of when the sample is returned? Is there an interface to actively return the sample?

As soon as the Sample goes out of scope on the Subscriber side it will be returned to the Publisher.
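For illustration, mirroring the receive loop posted earlier (subscriber_, DataType and callback are taken from that code): there is no explicit "return" call; limiting the sample's lifetime is what hands it back.

    {
        auto expect = subscriber_->receive();
        auto sample = std::move(expect.value()); // same pattern as in the loop above
        if (sample.has_value()) {
            const DataType* data_raw = &sample->payload();
            callback(data_raw->data, data_raw->length);
        }
    } // 'sample' is destroyed here and the slot is handed back to the publisher

In the loop itself, the assignment sample = std::move(expect.value()); has the same effect: the sample held so far is dropped before the next one is stored.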

@smileghp
Author

smileghp commented Dec 30, 2024

In C++, does it return the sample immediately, or could you explain the timing of when the sample is returned? Is there an interface to actively return the sample?

As soon as the Sample goes out of scope on the Subscriber side it will be returned to the Publisher.

@elfenpiff Okay, but I still don't understand why the “ExceedsMaxBorrowedSamples” error occurs on the 'Subscriber' side. My implementation is the code mentioned above.


@elfenpiff
Contributor

@smileghp The completion queue is filled by the subscriber whenever a sample is released, and the publisher has the responsibility of recycling the samples from the completion queue.

It follows a specific contract, and when the publisher violates that contract, it causes a failure on the subscriber side since the subscriber just adds samples to the completion queue.

It would make sense to handle this differently on the subscriber side so that it is not harshly affected - a better way would be just to cut the connection to the publisher since it is obviously malfunctioning.
