Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support seek operation on a multi-topics consumer #426

Merged
merged 6 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 47 additions & 63 deletions lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,41 +338,23 @@ void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback originalCallback)
}
state_ = Closing;

std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0);
auto self = get_shared_this_ptr();
int numConsumers = 0;
consumers_.forEachValue(
[&numConsumers, &consumerUnsubed, &self, callback](const ConsumerImplPtr& consumer) {
numConsumers++;
consumer->unsubscribeAsync([self, consumerUnsubed, callback](Result result) {
self->handleUnsubscribedAsync(result, consumerUnsubed, callback);
[this, self, callback](const ConsumerImplPtr& consumer, SharedFuture future) {
consumer->unsubscribeAsync([this, self, callback, future](Result result) {
if (result != ResultOk) {
state_ = Failed;
LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
<< result << " subscription - " << subscriptionName_);
}
if (future.tryComplete()) {
LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer. - "
<< consumerStr_);
callback((state_ != Failed) ? ResultOk : ResultUnknownError);
}
});
});
if (numConsumers == 0) {
// No need to unsubscribe, since the list matching the regex was empty
callback(ResultOk);
}
}

void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
std::shared_ptr<std::atomic<int>> consumerUnsubed,
ResultCallback callback) {
(*consumerUnsubed)++;

if (result != ResultOk) {
state_ = Failed;
LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
<< result << " subscription - " << subscriptionName_);
}

if (consumerUnsubed->load() == numberTopicPartitions_->load()) {
LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer. - " << consumerStr_);
Result result1 = (state_ != Failed) ? ResultOk : ResultUnknownError;
// The `callback` is a wrapper of user provided callback, it's not null and will call `shutdown()` if
// unsubscribe succeeds.
callback(result1);
return;
}
},
[callback] { callback(ResultOk); });
}

void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic, ResultCallback callback) {
Expand Down Expand Up @@ -899,50 +881,52 @@ std::shared_ptr<TopicName> MultiTopicsConsumerImpl::topicNamesValid(const std::v
return topicNamePtr;
}

void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
callback(ResultOperationNotSupported);
}

void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
if (state_ != Ready) {
callback(ResultAlreadyClosed);
return;
}

void MultiTopicsConsumerImpl::beforeSeek() {
duringSeek_.store(true, std::memory_order_release);
consumers_.forEachValue([](const ConsumerImplPtr& consumer) { consumer->pauseMessageListener(); });
unAckedMessageTrackerPtr_->clear();
incomingMessages_.clear();
incomingMessagesSize_ = 0L;
}

void MultiTopicsConsumerImpl::afterSeek() {
duringSeek_.store(false, std::memory_order_release);
auto self = get_shared_this_ptr();
listenerExecutor_->postWork([this, self] {
consumers_.forEachValue([](const ConsumerImplPtr& consumer) { consumer->resumeMessageListener(); });
});
}

void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
if (msgId == MessageId::earliest() || msgId == MessageId::latest()) {
return seekAllAsync(msgId, callback);
}

auto optConsumer = consumers_.find(msgId.getTopicName());
if (!optConsumer) {
LOG_ERROR(getName() << "cannot seek a message id whose topic \"" + msgId.getTopicName() +
"\" is not subscribed");
callback(ResultOperationNotSupported);
shibd marked this conversation as resolved.
Show resolved Hide resolved
return;
}

beforeSeek();
auto weakSelf = weak_from_this();
auto numConsumersLeft = std::make_shared<std::atomic<int64_t>>(consumers_.size());
auto wrappedCallback = [this, weakSelf, callback, numConsumersLeft](Result result) {
optConsumer.get()->seekAsync(msgId, [this, weakSelf, callback](Result result) {
auto self = weakSelf.lock();
if (PULSAR_UNLIKELY(!self)) {
callback(result);
return;
}
if (result != ResultOk) {
*numConsumersLeft = 0; // skip the following callbacks
if (self) {
afterSeek();
callback(result);
return;
}
if (--*numConsumersLeft > 0) {
return;
} else {
callback(ResultAlreadyClosed);
}
duringSeek_.store(false, std::memory_order_release);
listenerExecutor_->postWork([this, self] {
consumers_.forEachValue(
[](const ConsumerImplPtr& consumer) { consumer->resumeMessageListener(); });
});
callback(ResultOk);
};
consumers_.forEachValue([timestamp, &wrappedCallback](const ConsumerImplPtr& consumer) {
consumer->seekAsync(timestamp, wrappedCallback);
});
}

void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
seekAllAsync(timestamp, callback);
}

void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) {
consumers_.forEachValue([enabled](const ConsumerImplPtr& consumer) {
consumer->setNegativeAcknowledgeEnabledForTesting(enabled);
Expand Down
52 changes: 48 additions & 4 deletions lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include <vector>

#include "Commands.h"
#include "ConsumerImplBase.h"
#include "ConsumerImpl.h"
#include "ConsumerInterceptors.h"
#include "Future.h"
#include "Latch.h"
Expand All @@ -38,7 +38,6 @@
namespace pulsar {
typedef std::shared_ptr<Promise<Result, Consumer>> ConsumerSubResultPromisePtr;

class ConsumerImpl;
using ConsumerImplPtr = std::shared_ptr<ConsumerImpl>;
class ClientImpl;
using ClientImplPtr = std::shared_ptr<ClientImpl>;
Expand Down Expand Up @@ -152,8 +151,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
void handleSingleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
std::shared_ptr<std::atomic<int>> partitionsNeedCreate,
ConsumerSubResultPromisePtr topicSubResultPromise);
void handleUnsubscribedAsync(Result result, std::shared_ptr<std::atomic<int>> consumerUnsubed,
ResultCallback callback);
void handleOneTopicUnsubscribedAsync(Result result, std::shared_ptr<std::atomic<int>> consumerUnsubed,
int numberPartitions, TopicNamePtr topicNamePtr,
std::string& topicPartitionName, ResultCallback callback);
Expand All @@ -179,6 +176,16 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
return std::static_pointer_cast<MultiTopicsConsumerImpl>(shared_from_this());
}

template <typename SeekArg>
#if __cplusplus >= 202002L
requires std::convertible_to<SeekArg, uint64_t> ||
std::same_as<std::remove_cv_t<std::remove_reference_t<SeekArg>>, MessageId>
#endif
void seekAllAsync(const SeekArg& seekArg, ResultCallback callback);

void beforeSeek();
void afterSeek();

FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
Expand All @@ -187,5 +194,42 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
};

typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr;

template <typename SeekArg>
#if __cplusplus >= 202002L
shibd marked this conversation as resolved.
Show resolved Hide resolved
requires std::convertible_to<SeekArg, uint64_t> ||
std::same_as<std::remove_cv_t<std::remove_reference_t<SeekArg>>, MessageId>
#endif
inline void MultiTopicsConsumerImpl::seekAllAsync(const SeekArg& seekArg, ResultCallback callback) {
if (state_ != Ready) {
callback(ResultAlreadyClosed);
return;
}
beforeSeek();
auto weakSelf = weak_from_this();
auto failed = std::make_shared<std::atomic_bool>(false);
consumers_.forEachValue(
[this, weakSelf, &seekArg, callback, failed](const ConsumerImplPtr& consumer, SharedFuture future) {
consumer->seekAsync(seekArg, [this, weakSelf, callback, failed, future](Result result) {
auto self = weakSelf.lock();
if (!self || failed->load(std::memory_order_acquire)) {
callback(result);
return;
}
if (result != ResultOk) {
failed->store(true, std::memory_order_release); // skip the following callbacks
afterSeek();
callback(result);
return;
}
if (future.tryComplete()) {
afterSeek();
callback(ResultOk);
}
});
},
[callback] { callback(ResultOk); });
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are no consumers, it means no cursor will be reset. Maybe we cannot return a ResultOk.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I agree with you but here it just keeps the same behavior with the Java client. See the original implementation from apache/pulsar#7518

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can start a discussion, or we should add a WARN log on here.

Otherwise, when something goes wrong, it's hard to troubleshoot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Java client does not have a warn log here as well. You can start a discussion in dev ML. But it should not block this PR because this PR keeps the same behavior with the Java client.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shibd I make the seek call fail with ResultOperationNotSupported with a error log now. PTAL again.

The discussion mail: https://lists.apache.org/thread/qrwvl1zshmdohphjtdyp9v98hdngxb30

}

} // namespace pulsar
#endif // PULSAR_MULTI_TOPICS_CONSUMER_HEADER
67 changes: 63 additions & 4 deletions lib/SynchronizedHashMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,27 @@
*/
#pragma once

#include <atomic>
#include <boost/optional.hpp>
#include <functional>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>
#include <vector>

namespace pulsar {

class SharedFuture {
public:
SharedFuture(size_t size) : count_(std::make_shared<std::atomic_size_t>(size)) {}

bool tryComplete() const { return --*count_ == 0; }

private:
std::shared_ptr<std::atomic_size_t> count_;
};

// V must be default constructible and copyable
template <typename K, typename V>
class SynchronizedHashMap {
Expand Down Expand Up @@ -60,10 +72,57 @@ class SynchronizedHashMap {
}
}

void forEachValue(std::function<void(const V&)> f) const {
Lock lock(mutex_);
for (const auto& kv : data_) {
f(kv.second);
template <typename ValueFunc>
#if __cplusplus >= 202002L
requires requires(ValueFunc&& each, const V& value) {
each(value);
}
#endif
void forEachValue(ValueFunc&& each) {
Lock lock{mutex_};
for (auto&& kv : data_) {
each(kv.second);
}
}

// This override provides a convenient approach to execute tasks on each consumer concurrently and
// supports checking if all tasks are done in the `each` callback.
//
// All map values will be passed as the 1st argument to the `each` function. The 2nd argument is a shared
// future whose `tryComplete` method marks this task as completed. If users want to check if all task are
// completed in the `each` function, this method must be called.
//
// For example, given a `SynchronizedHashMap<int, std::string>` object `m` and the following call:
//
// ```c++
// m.forEachValue([](const std::string& s, SharedFuture future) {
// std::cout << s << std::endl;
// if (future.tryComplete()) {
// std::cout << "done" << std::endl;
// }
// }, [] { std::cout << "empty map" << std::endl; });
// ```
//
// If the map is empty, only "empty map" will be printed. Otherwise, all values will be printed
// and "done" will be printed after that.
template <typename ValueFunc, typename EmptyFunc>
#if __cplusplus >= 202002L
requires requires(ValueFunc&& each, const V& value, SharedFuture count, EmptyFunc emptyFunc) {
each(value, count);
emptyFunc();
}
#endif
void forEachValue(ValueFunc&& each, EmptyFunc&& emptyFunc) {
shibd marked this conversation as resolved.
Show resolved Hide resolved
std::unique_lock<MutexType> lock{mutex_};
if (data_.empty()) {
lock.unlock();
emptyFunc();
return;
}
SharedFuture future{data_.size()};
for (auto&& kv : data_) {
const auto& value = kv.second;
each(value, future);
}
}

Expand Down
Loading
Loading