[Feature Request] Support executing callbacks concurrently #368

Open
BewareMyPower opened this issue Dec 6, 2023 · 0 comments

BewareMyPower commented Dec 6, 2023

#include <pulsar/Client.h>

#include <chrono>
#include <thread>

#include "lib/LogUtils.h"

DECLARE_LOG_OBJECT()
using namespace pulsar;

int main(int argc, char *argv[]) {
    ClientConfiguration conf;
    conf.setIOThreads(8);
    Client client{"pulsar://localhost:6650", conf};
    for (int i = 0; i < 5; i++) {
        auto topic = "my-topic-" + std::to_string(i);
        LOG_INFO(" XYZ Before create producer for " << topic);
        client.createProducerAsync(topic, [topic](Result result, Producer producer) {
            LOG_INFO(" XYZ After create producer for " << topic << ": " << result);
            producer.sendAsync(MessageBuilder().setContent("msg").build(),
                               [](Result result, const MessageId &msgId) {
                                   LOG_INFO("XYZ send: " << result << ", " << msgId);
                               });
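            // Deliberately block inside the callback to show that it stalls the other callbacks.
            std::this_thread::sleep_for(std::chrono::hours(1));
        });
    }
    // Keep the process alive so that the asynchronous callbacks have a chance to run.
    std::this_thread::sleep_for(std::chrono::hours(1));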
}

Output:

2023-12-06 20:01:08.044 INFO  [0x1fb6f1e00] SampleProducer:18 |  XYZ Before create producer for my-topic-0
2023-12-06 20:01:08.130 INFO  [0x1fb6f1e00] ClientConnection:190 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2023-12-06 20:01:08.130 INFO  [0x1fb6f1e00] ConnectionPool:114 | Created connection for pulsar://localhost:6650-0
2023-12-06 20:01:08.143 INFO  [0x1fb6f1e00] SampleProducer:18 |  XYZ Before create producer for my-topic-1
2023-12-06 20:01:08.143 INFO  [0x1fb6f1e00] SampleProducer:18 |  XYZ Before create producer for my-topic-2
2023-12-06 20:01:08.143 INFO  [0x1fb6f1e00] SampleProducer:18 |  XYZ Before create producer for my-topic-3
2023-12-06 20:01:08.144 INFO  [0x1fb6f1e00] SampleProducer:18 |  XYZ Before create producer for my-topic-4
2023-12-06 20:01:08.163 INFO  [0x16ef87000] ClientConnection:404 | [127.0.0.1:54469 -> 127.0.0.1:6650] Connected to broker
2023-12-06 20:01:08.344 INFO  [0x16ef87000] HandlerBase:83 | [persistent://public/default/my-topic-0, ] Getting connection from pool
2023-12-06 20:01:08.347 INFO  [0x16ef87000] HandlerBase:83 | [persistent://public/default/my-topic-2, ] Getting connection from pool
2023-12-06 20:01:08.347 INFO  [0x16ef87000] HandlerBase:83 | [persistent://public/default/my-topic-1, ] Getting connection from pool
2023-12-06 20:01:08.347 INFO  [0x16ef87000] HandlerBase:83 | [persistent://public/default/my-topic-4, ] Getting connection from pool
2023-12-06 20:01:08.347 INFO  [0x16ef87000] HandlerBase:83 | [persistent://public/default/my-topic-3, ] Getting connection from pool
2023-12-06 20:01:08.355 INFO  [0x16ef87000] BinaryProtoLookupService:87 | Lookup response for persistent://public/default/my-topic-0, lookup-broker-url pulsar://127.0.0.1:6650, from [127.0.0.1:54469 -> 127.0.0.1:6650] 
2023-12-06 20:01:08.355 INFO  [0x16ef87000] ClientConnection:190 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2023-12-06 20:01:08.355 INFO  [0x16ef87000] ConnectionPool:114 | Created connection for pulsar://127.0.0.1:6650-0
2023-12-06 20:01:08.357 INFO  [0x16ef87000] BinaryProtoLookupService:87 | Lookup response for persistent://public/default/my-topic-2, lookup-broker-url pulsar://127.0.0.1:6650, from [127.0.0.1:54469 -> 127.0.0.1:6650] 
2023-12-06 20:01:08.357 INFO  [0x16ef87000] BinaryProtoLookupService:87 | Lookup response for persistent://public/default/my-topic-1, lookup-broker-url pulsar://127.0.0.1:6650, from [127.0.0.1:54469 -> 127.0.0.1:6650] 
2023-12-06 20:01:08.357 INFO  [0x16ef87000] BinaryProtoLookupService:87 | Lookup response for persistent://public/default/my-topic-4, lookup-broker-url pulsar://127.0.0.1:6650, from [127.0.0.1:54469 -> 127.0.0.1:6650] 
2023-12-06 20:01:08.357 INFO  [0x16ef87000] BinaryProtoLookupService:87 | Lookup response for persistent://public/default/my-topic-3, lookup-broker-url pulsar://127.0.0.1:6650, from [127.0.0.1:54469 -> 127.0.0.1:6650] 
2023-12-06 20:01:08.363 INFO  [0x16ef87000] ClientConnection:406 | [127.0.0.1:54470 -> 127.0.0.1:6650] Connected to broker through proxy. Logical broker: pulsar://127.0.0.1:6650
2023-12-06 20:01:08.427 INFO  [0x16ef87000] ProducerImpl:212 | [persistent://public/default/my-topic-2, ] Created producer on broker [127.0.0.1:54470 -> 127.0.0.1:6650] 
2023-12-06 20:01:08.428 INFO  [0x16ef87000] SampleProducer:20 |  XYZ After create producer for my-topic-2: Ok

Only one producer (for my-topic-2) was created successfully. The other producers were blocked because all callbacks of createProducerAsync were executed in the same I/O thread, even though the number of I/O threads is 8.

That's because the event loop of each connection to the broker uses a single thread to execute callbacks, so if one callback blocks, the other asynchronous calls on that connection are blocked as well. Increasing connectionsPerBroker lets more I/O threads be used, but users still cannot control which thread a callback is executed on.
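
To illustrate the mechanism without a broker, here is a minimal sketch using only the standard library. `SingleThreadLoop` and `callbacksRunWhileFirstIsBlocked` are hypothetical names made up for this example; the loop is a simplified stand-in for one connection's event loop, not the client's actual implementation.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical stand-in for one connection's event loop: a single thread
// that runs queued callbacks one at a time, in FIFO order.
class SingleThreadLoop {
   public:
    SingleThreadLoop() : worker_([this] { run(); }) {}

    // Drains the remaining callbacks, then joins the loop thread.
    ~SingleThreadLoop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopped_ = true;
        }
        cond_.notify_one();
        worker_.join();
    }

    void post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            assert(!stopped_);
            tasks_.push(std::move(task));
        }
        cond_.notify_one();
    }

   private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cond_.wait(lock, [this] { return stopped_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // stopped and nothing left to run
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // a blocking task stalls every callback queued behind it
        }
    }

    std::mutex mutex_;
    std::condition_variable cond_;
    std::queue<std::function<void()>> tasks_;
    bool stopped_ = false;
    std::thread worker_;  // declared last so it starts after the other members exist
};

// Returns how many of the later callbacks completed while the first one
// was still blocking the loop thread.
int callbacksRunWhileFirstIsBlocked() {
    std::atomic<int> completed{0};
    std::promise<void> started;
    std::promise<void> release;
    std::shared_future<void> releaseFuture = release.get_future().share();
    int observed = 0;
    {
        SingleThreadLoop loop;
        loop.post([&] {
            started.set_value();
            releaseFuture.wait();  // plays the role of the blocking user callback
        });
        for (int i = 0; i < 4; i++) {
            loop.post([&] { completed++; });
        }
        started.get_future().wait();  // the first callback now occupies the loop thread
        observed = completed.load();
        release.set_value();  // unblock the first callback
    }  // the destructor drains the queue and joins the thread
    assert(completed.load() == 4);
    return observed;
}
```

Calling `callbacksRunWhileFirstIsBlocked()` returns 0: the four later callbacks only run once the first one returns, which mirrors the four producers that never finish in the log above.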

In the Java client, the asynchronous API returns a CompletableFuture, whose thenXxxAsync methods can execute the callback in a user-provided executor.
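
A rough C++ analogue of that idea could look like the sketch below. Everything in it is an assumption made for illustration: `CallbackExecutor`, `ResultCallback`, `dispatchTo` and `runBlockingCallbacksConcurrently` are hypothetical names, not part of the Pulsar C++ client, and `ResultCallback` only imitates the shape of a callback such as the one passed to createProducerAsync. The wrapper returned by `dispatchTo` merely enqueues the real work, so the thread that invokes it returns immediately.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical user-owned thread pool (not a Pulsar class).
class CallbackExecutor {
   public:
    explicit CallbackExecutor(int numThreads) {
        assert(numThreads > 0);
        for (int i = 0; i < numThreads; i++) {
            workers_.emplace_back([this] { run(); });
        }
    }

    // Lets the workers drain the queue, then joins them.
    ~CallbackExecutor() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopped_ = true;
        }
        cond_.notify_all();
        for (auto &worker : workers_) worker.join();
    }

    void post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        cond_.notify_one();
    }

   private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cond_.wait(lock, [this] { return stopped_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // stopped and nothing left to run
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();
        }
    }

    std::mutex mutex_;
    std::condition_variable cond_;
    std::queue<std::function<void()>> tasks_;
    bool stopped_ = false;
    std::vector<std::thread> workers_;
};

// Hypothetical callback shape, loosely modelled on an async completion callback.
using ResultCallback = std::function<void(int result, std::string topic)>;

// Wraps `callback` so that invoking the wrapper only enqueues the real work.
ResultCallback dispatchTo(CallbackExecutor &executor, ResultCallback callback) {
    return [&executor, callback](int result, std::string topic) {
        executor.post([callback, result, topic] { callback(result, topic); });
    };
}

// Each user callback blocks until all of them have started. Run directly on
// one thread this would deadlock; on the executor all of them can finish.
int runBlockingCallbacksConcurrently() {
    const int numCallbacks = 5;
    std::mutex mutex;
    std::condition_variable allStarted;
    int started = 0;
    int finished = 0;
    {
        CallbackExecutor executor(numCallbacks);
        ResultCallback wrapped = dispatchTo(executor, [&](int, std::string) {
            std::unique_lock<std::mutex> lock(mutex);
            started++;
            allStarted.notify_all();
            allStarted.wait(lock, [&] { return started == numCallbacks; });
            finished++;
        });
        // The calling thread plays the role of the single I/O thread.
        for (int i = 0; i < numCallbacks; i++) {
            wrapped(0, "my-topic-" + std::to_string(i));
        }
    }  // the destructor waits for the queued work to finish
    return finished;
}
```

`runBlockingCallbacksConcurrently()` returns 5 because every blocking callback gets its own worker thread. A built-in option in the client would let users choose such an executor per call instead of wrapping each callback by hand.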
