[Bug] std::system_error thrown during/after client->subscribe(...) when using configuration setUnAckedMessagesTimeoutMs. #358

Open
1 of 2 tasks
jato-c8y opened this issue Nov 30, 2023 · 10 comments · Fixed by #362

Comments

@jato-c8y

jato-c8y commented Nov 30, 2023

Search before asking

  • I searched in the issues and found nothing similar.

Version

Pulsar version 3.3.
OS - Red Hat Enterprise Linux 8.9 and RHEL 9.3 and Linux 5.10.0-26-amd64

Minimal reproduce step

Our test observes the issue when we are unable to connect to the server immediately on startup, but the server becomes reachable after a while. We attempt to subscribe; if the subscribe fails, we try again.
Sometimes when the subscribe eventually succeeds all is well.
Sometimes when the subscribe is successful it still throws the exception.
Sometimes even before the subscribe returns there is the exception.

// Rough setup. clientConfig, client, serviceURL and subscriberName are defined elsewhere.

clientConfig.setReceiverQueueSize(1000);
clientConfig.setUnAckedMessagesTimeoutMs(10000);
clientConfig.setConsumerType(pulsar::ConsumerType::ConsumerExclusive);
client = std::make_shared<pulsar::Client>(serviceURL, clientConfig);
pulsar::Result result = pulsar::ResultRetryable;
pulsar::Consumer consumer;
const std::vector<std::string> topics{"A topic", "Another topic"};
// Retry until the subscription succeeds.
while (result != pulsar::ResultOk) {
    result = client->subscribe(topics, subscriberName, clientConfig, consumer);
    if (result != pulsar::ResultOk) {
        // Wait 5s before the next attempt.
        std::this_thread::sleep_for(std::chrono::seconds(5));
    }
}

What did you expect to see?

No exception.

What did you see instead?

std::system_error thrown and not handled.

Anything else?

std::system_error thrown during/after client->subscribe(...) when using configuration setUnAckedMessagesTimeoutMs.

When we subscribe with setUnAckedMessagesTimeoutMs already set in the Client Configuration, we observe std::system_error being thrown. It is not handled or caught in ExecutorService, and it terminates the application. setUnAckedMessagesTimeoutMs is set to 10000 because this is the minimum; we have not experimented with other values.
Without setUnAckedMessagesTimeoutMs set in the configuration no exception is seen on or after subscribe(..).

We suspect the exception is being thrown by std::recursive_mutex when trying to acquire the lock in UnAckedMessageTrackerEnabled::timeoutHandlerHelper() of pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc

  1. Please can this be investigated for the cause of the exception.
  2. Please can exceptions thrown in your library background threads be caught and not terminate the application.
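
On the second request: an exception that escapes a thread's entry function ends in std::terminate, which matches the abort in the backtrace below. A minimal sketch of a guard, assuming a hypothetical helper named makeGuarded (it is not part of pulsar-client-cpp, and this is not a description of how ExecutorService works):

```cpp
#include <exception>
#include <functional>
#include <string>
#include <system_error>

// Hypothetical callback type used to report a caught exception.
using ErrorHandler = std::function<void(const std::string&)>;

// Hypothetical helper: wraps `task` so that no exception can escape it.
// The returned callable is what a background thread would run.
std::function<void()> makeGuarded(std::function<void()> task, ErrorHandler onError) {
    return [task, onError] {
        try {
            task();
        } catch (const std::exception& e) {
            onError(e.what());
        } catch (...) {
            onError("unknown exception");
        }
    };
}

// Stand-in for a timer callback that throws std::system_error.
void throwingTask() {
    throw std::system_error(std::make_error_code(std::errc::invalid_argument));
}
```

Passing the result of makeGuarded(throwingTask, handler) to a std::thread would log the error instead of aborting. A guard like this only contains the symptom; it does not address whatever makes the lock fail.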

stderr:
terminate called after throwing an instance of 'std::system_error'
what(): Invalid argument

backtrace:
Program terminated with signal SIGABRT, Aborted.
0 0x00007fdef2329acf in raise () from /lib64/libc.so.6
[Current thread is 1 (Thread 0x7fddd1253700 (LWP 1875868))]
0 0x00007fdef2329acf in raise () from /lib64/libc.so.6
1 0x00007fdef22fcea5 in abort () from /lib64/libc.so.6
2 0x00007fdef68e69e3 in ::coreHandler(int, siginfo_t*, void*) () from /libapclient.so.10.15
3 <signal handler called>
4 0x00007fdef2329acf in raise () from /lib64/libc.so.6
5 0x00007fdef22fcea5 in abort () from /lib64/libc.so.6
6 0x00007fdef2eea09b in __gnu_cxx::__verbose_terminate_handler() [clone .cold.1] () from /lib64/libstdc++.so.6
7 0x00007fdef2ef054c in __cxxabiv1::__terminate(void (*)()) () from /lib64/libstdc++.so.6
8 0x00007fdef2ef05a7 in std::terminate() () from /lib64/libstdc++.so.6
9 0x00007fdef2ef0808 in __cxa_throw () from /lib64/libstdc++.so.6
10 0x00007fdef2eec235 in std::__throw_system_error(int) [clone .cold.28] () from /lib64/libstdc++.so.6
11 0x00007fddd380f508 in pulsar::UnAckedMessageTrackerEnabled::timeoutHandlerHelper() () from /libconnectivity-pulsar-client.so
12 0x00007fddd380f5a9 in pulsar::UnAckedMessageTrackerEnabled::timeoutHandler() () from /libconnectivity-pulsar-client.so
13 0x00007fddd38111e2 in boost::asio::detail::wait_handler<pulsar::UnAckedMessageTrackerEnabled::timeoutHandler()::{lambda(boost::system::error_code const&) # 1}, boost::asio::any_io_executor>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) () from */libconnectivity-pulsar-client.so
14 0x00007fddd374ac38 in boost::asio::detail::scheduler::run(boost::system::error_code&) () from */libconnectivity-pulsar-client.so
15 0x00007fddd3743f92 in pulsar::ExecutorService::start()::{lambda() # 1}::operator()() const [clone .isra.334] () from */libconnectivity-pulsar-client.so
16 0x00007fdef2f1cb23 in execute_native_thread_routine () from /lib64/libstdc++.so.6
17 0x00007fdef26a81ca in start_thread () from /lib64/libpthread.so.0
18 0x00007fdef2314e73 in clone () from /lib64/libc.so.6

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@BewareMyPower
Contributor

> We suspect the exception is being thrown by std::recursive_mutex when trying to acquire the lock in:

You can enable the debug level logs to verify your guess. See

LOG_DEBUG("UnAckedMessageTrackerEnabled::timeoutHandlerHelper invoked for consumerPtr_ "

@BewareMyPower
Contributor

BewareMyPower commented Dec 1, 2023

I think it is caused by

timer_->async_wait([&](const boost::system::error_code& ec) {

We should capture shared_from_this() rather than & to ensure the this pointer is valid when the callback is called.
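
To illustrate the idea, here is a minimal sketch that uses only the standard library. CallbackQueue is a hypothetical stand-in for the Boost.Asio timer, and Tracker is a hypothetical stand-in for UnAckedMessageTrackerEnabled; neither is the library's actual code.

```cpp
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for a timer: stores callbacks and runs them later.
struct CallbackQueue {
    std::vector<std::function<void()>> pending;
    void runAll() {
        auto callbacks = std::move(pending);
        pending.clear();
        for (auto& cb : callbacks) cb();
    }
};

class Tracker : public std::enable_shared_from_this<Tracker> {
   public:
    explicit Tracker(int& fired) : fired_(fired) {}

    // Capturing `self` by value keeps this object alive until the
    // queued callback itself has been destroyed.
    void schedule(CallbackQueue& queue) {
        auto self = shared_from_this();
        queue.pending.push_back([self] { self->onTimeout(); });
    }

   private:
    void onTimeout() { ++fired_; }
    int& fired_;
};

int runSharedCaptureDemo() {
    int fired = 0;
    CallbackQueue queue;
    auto tracker = std::make_shared<Tracker>(fired);
    tracker->schedule(queue);
    tracker.reset();  // the owner lets go; the callback still holds a reference
    queue.runAll();   // safe: the Tracker outlives the callback invocation
    return fired;
}
```

The trade-off of a strong capture is that the object cannot be destroyed until the pending callback has run or been discarded.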

> when we are unable to connect to the server immediately on startup

Is there an easy way to simulate the case? I tried starting a consumer and then starting the Pulsar standalone locally but I cannot reproduce it.

@jato-c8y
Author

jato-c8y commented Dec 1, 2023

> We suspect the exception is being thrown by std::recursive_mutex when trying to acquire the lock in:
>
> You can enable the debug level logs to verify your guess. See
>
> LOG_DEBUG("UnAckedMessageTrackerEnabled::timeoutHandlerHelper invoked for consumerPtr_ "

pulsarLogs.txt

Please find example logs attached.
We can see that the line 'LOG_DEBUG("UnAckedMessageTrackerEnabled::timeoutHandlerHelper invoked for consumerPtr_ "' is not logged right before the crash, but the stack trace shows that the exception is thrown from within timeoutHandlerHelper. That points to the only line above this log statement, which is:
https://github.com/apache/pulsar-client-cpp/blob/d2094828f9fe756b315ba194e1f8a69ca24ac6b4/lib/UnAckedMessageTrackerEnabled.cc#L48C1-L48C1

@jato-c8y
Author

jato-c8y commented Dec 1, 2023

> I think it is caused by
>
> timer_->async_wait([&](const boost::system::error_code& ec) {
>
> We should capture shared_from_this() rather than & to ensure the this pointer is valid when the callback is called.
>
> > when we are unable to connect to the server immediately on startup
>
> Is there an easy way to simulate the case? I tried starting a consumer and then starting the Pulsar standalone locally but I cannot reproduce it.

I am not sure I see the link between the stacktrace and the line you think it could be caused by.
Is it not being thrown from within call to line 34?:

Unfortunately currently our code is within our application and I do not have a snippet or a standalone reproducible script to show you.
I should have mentioned that the exception is not 100% reproducible.
Our test is as follows:

  1. We have a Pulsar server running, and a proxy server that we use to connect to it.
  2. We suspend the proxy server to simulate an inability to connect to Pulsar.
  3. We start our pulsar-client-cpp and attempt to connect it to the proxy server.
  4. The client.subscribe call fails.
  5. We resume the proxy server so that pulsar-client-cpp can now communicate with the Pulsar server.

The exception is seen around 20% of the time when I cycle the test.

@BewareMyPower
Contributor

> but the stacktrace indicates it is thrown from within timeoutHandlerHelper pointing to the only line above this log which is

If the UnAckedMessageTrackerEnabled object has already been destroyed, locking the lock_ field that belongs to it is undefined behavior.

BewareMyPower added a commit to BewareMyPower/pulsar-client-cpp that referenced this issue Dec 4, 2023
Fixes apache#358
Fixes apache#359

### Motivation

`async_wait` is not used correctly in some places. A callback that
captures the `this` pointer or a reference to `this` is passed to
`async_wait`. If the object has been destroyed by the time the callback
is called, an invalid memory access will happen.

### Modifications

Use the following pattern in all `async_wait` calls.

```c++
std::weak_ptr<T> weakSelf{shared_from_this()};
timer_->async_wait([weakSelf](/* ... */) {
    if (auto self = weakSelf.lock()) {
        self->foo();
    }
});
```
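
The effect of this pattern can be sketched without Boost. In the example below, Tracker and makeCallback are hypothetical names for illustration, and the returned std::function stands in for the handler passed to `async_wait`:

```cpp
#include <functional>
#include <memory>

class Tracker : public std::enable_shared_from_this<Tracker> {
   public:
    // Returns a callback that holds only a weak reference to this object.
    std::function<bool()> makeCallback() {
        std::weak_ptr<Tracker> weakSelf{shared_from_this()};
        return [weakSelf]() -> bool {
            if (auto self = weakSelf.lock()) {
                self->onTimeout();
                return true;  // object still alive, handler ran
            }
            return false;  // object already destroyed, nothing is touched
        };
    }

    int timeouts() const { return timeouts_; }

   private:
    void onTimeout() { ++timeouts_; }
    int timeouts_ = 0;
};
```

Calling the callback while a `std::shared_ptr<Tracker>` is still held runs the handler; calling it after the last owner is reset returns false without accessing the destroyed object. Unlike a strong capture, the weak capture does not delay destruction.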
@BewareMyPower
Contributor

I opened a PR that will close this issue. When you have time, you can test if that patch works.

@jato-c8y
Author

jato-c8y commented Dec 5, 2023

Thank you for the prompt investigation and PR.
I have run against this patch and I do not see a crash.
There is however a slight change in behaviour.
Previously, if client.subscribe did not succeed, it would return after a while (I think 10s) with Result != ResultOk.
Now client.subscribe seemingly does not return until it succeeds. I have observed it for over 60s: the reconnect attempts continue, but the subscribe call does not return.
I can work with this, but is the change intentional, or was it always intended to behave this way?

@BewareMyPower
Contributor

> Now the client.subscribe seemingly does not return until it succeeds, I have observed over 60s, the reconnects are still attempting, but no return from the subscribe call.

The current behavior looks wrong. I will take a look soon.

@BewareMyPower
Contributor

In my local env, it failed after 30s when I ran SampleConsumer without any Pulsar server running.

2023-12-05 21:02:37.643 INFO  [0x202635e00] Client:86 | Subscribing on Topic :persistent://public/default/my-topic
2023-12-05 21:02:37.643 INFO  [0x202635e00] ClientConnection:189 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2023-12-05 21:02:37.643 INFO  [0x202635e00] ConnectionPool:114 | Created connection for pulsar://localhost:6650-0
2023-12-05 21:02:37.644 WARN  [0x16b78b000] ClientConnection:468 | [<none> -> pulsar://localhost:6650] Failed to establish connection: Connection refused
2023-12-05 21:02:37.644 WARN  [0x16b78b000] ClientConnection:468 | [<none> -> pulsar://localhost:6650] Failed to establish connection: Connection refused
2023-12-05 21:02:37.644 INFO  [0x16b78b000] ClientConnection:1317 | [<none> -> pulsar://localhost:6650] Connection disconnected (refCnt: 2)
2023-12-05 21:02:37.644 INFO  [0x16b78b000] ConnectionPool:129 | Remove connection for pulsar://localhost:6650-0
...
2023-12-05 21:03:07.684 ERROR [0x16b78b000] ClientImpl:489 | Error Checking/Getting Partition Metadata while Subscribing on persistent://public/default/my-topic -- TimeOut
2023-12-05 21:03:07.684 INFO  [0x16b78b000] ClientConnection:266 | [<none> -> pulsar://localhost:6650] Destroyed connection to pulsar://localhost:6650-0
2023-12-05 21:03:07.684 ERROR [0x202635e00] SampleConsumer:35 | Failed to subscribe: TimeOut

The timeout that applies here is the operationTimeout (30s by default). The connection timeout only covers establishing the TCP connection, so it does not apply when the connection is refused (ECONNREFUSED in POSIX).

@jato-sag Could you upload your client logs?

@jato-c8y
Author

jato-c8y commented Dec 5, 2023

I tried reducing the operationTimeout, but I don't see any difference: subscribe still does not return.
Please see logs attached.

Also for clarity, this is the rough code snippet.
void subscribeClient() {
    pulsar::Result result = pulsar::ResultRetryable;
    while (m_running && result != pulsar::ResultOk) {
        result = client->subscribe(...);
        if (result != pulsar::ResultOk) {
            // LOG ERROR
        } else {
            // LOG SUCCESS
        }
    }
}

pulsarLogsPostPatch.txt

BewareMyPower added a commit that referenced this issue Dec 6, 2023
Fixes #358
Fixes #359
BewareMyPower added a commit that referenced this issue Dec 6, 2023
Fixes #358
Fixes #359
(cherry picked from commit 24ab12c)
@BewareMyPower BewareMyPower reopened this Dec 12, 2023