Skip to content

Commit

Permalink
[fix] [client] Messages lost due to TopicListWatcher reconnect (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode authored Jan 8, 2024
1 parent aae0e9d commit 042e769
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -708,5 +708,14 @@ public static class ServiceProducer {
private PersistentTopic persistentTopic;
}

protected void sleepSeconds(int seconds){
try {
Thread.sleep(1000 * seconds);
} catch (InterruptedException e) {
log.warn("This thread has been interrupted", e);
Thread.currentThread().interrupt();
}
}

private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,18 @@
import io.netty.util.Timeout;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
Expand All @@ -53,6 +57,7 @@
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker-impl")
Expand Down Expand Up @@ -620,13 +625,28 @@ public void testStartEmptyPatternConsumer() throws Exception {
producer3.close();
}

@Test(timeOut = testTimeout)
public void testAutoSubscribePatterConsumerFromBrokerWatcher() throws Exception {
String key = "AutoSubscribePatternConsumer";
String subscriptionName = "my-ex-subscription-" + key;
@DataProvider(name= "delayTypesOfWatchingTopics")
public Object[][] delayTypesOfWatchingTopics(){
return new Object[][]{
{true},
{false}
};
}

Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");
Consumer<byte[]> consumer = pulsarClient.newConsumer()
@Test(timeOut = testTimeout, dataProvider = "delayTypesOfWatchingTopics")
public void testAutoSubscribePatterConsumerFromBrokerWatcher(boolean delayWatchingTopics) throws Exception {
final String key = "AutoSubscribePatternConsumer";
final String subscriptionName = "my-ex-subscription-" + key;
final Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");

PulsarClient client = null;
if (delayWatchingTopics) {
client = createDelayWatchTopicsClient();
} else {
client = pulsarClient;
}

Consumer<byte[]> consumer = client.newConsumer()
.topicsPattern(pattern)
// Disable automatic discovery.
.patternAutoDiscoveryPeriod(1000)
Expand All @@ -636,12 +656,6 @@ public void testAutoSubscribePatterConsumerFromBrokerWatcher() throws Exception
.receiverQueueSize(4)
.subscribe();

// Wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

// 1. create partition
String topicName = "persistent://my-property/my-ns/pattern-topic-1-" + key;
TenantInfoImpl tenantInfo = createDefaultTenantInfo();
Expand All @@ -657,7 +671,35 @@ public void testAutoSubscribePatterConsumerFromBrokerWatcher() throws Exception
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 1);
});

// cleanup.
consumer.close();
admin.topics().deletePartitionedTopic(topicName);
if (delayWatchingTopics) {
client.close();
}
}

private PulsarClient createDelayWatchTopicsClient() throws Exception {
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
return InjectedClientCnxClientBuilder.create(clientBuilder,
(conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) {
public CompletableFuture<CommandWatchTopicListSuccess> newWatchTopicList(
BaseCommand command, long requestId) {
// Inject 2 seconds delay when sending command New Watch Topics.
CompletableFuture<CommandWatchTopicListSuccess> res = new CompletableFuture<>();
new Thread(() -> {
sleepSeconds(2);
super.newWatchTopicList(command, requestId).whenComplete((v, ex) -> {
if (ex != null) {
res.completeExceptionally(ex);
} else {
res.complete(v);
}
});
}).start();
return res;
}
});
}

// simulate subscribe a pattern which has 3 topics, but then matched topic added in.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Consumer;
Expand All @@ -50,8 +51,19 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
private final Pattern topicsPattern;
private final TopicsChangedListener topicsChangeListener;
private final Mode subscriptionMode;
private final CompletableFuture<TopicListWatcher> watcherFuture;
private final CompletableFuture<TopicListWatcher> watcherFuture = new CompletableFuture<>();
protected NamespaceName namespaceName;

/**
* There is two task to re-check topic changes, the both tasks will not be take affects at the same time.
* 1. {@link #recheckTopicsChangeAfterReconnect}: it will be called after the {@link TopicListWatcher} reconnected
* if you enabled {@link TopicListWatcher}. This backoff used to do a retry if
* {@link #recheckTopicsChangeAfterReconnect} is failed.
* 2. {@link #run} A scheduled task to trigger re-check topic changes, it will be used if you disabled
* {@link TopicListWatcher}.
*/
private final Backoff recheckPatternTaskBackoff;
private final AtomicInteger recheckPatternEpoch = new AtomicInteger();
private volatile Timeout recheckPatternTimeout = null;
private volatile String topicsHash;

Expand All @@ -69,6 +81,11 @@ public PatternMultiTopicsConsumerImpl(Pattern topicsPattern,
this.topicsPattern = topicsPattern;
this.topicsHash = topicsHash;
this.subscriptionMode = subscriptionMode;
this.recheckPatternTaskBackoff = new BackoffBuilder()
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
.setMandatoryStop(0, TimeUnit.SECONDS)
.create();

if (this.namespaceName == null) {
this.namespaceName = getNameSpaceFromPattern(topicsPattern);
Expand All @@ -78,11 +95,10 @@ public PatternMultiTopicsConsumerImpl(Pattern topicsPattern,
this.topicsChangeListener = new PatternTopicsChangedListener();
this.recheckPatternTimeout = client.timer()
.newTimeout(this, Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
this.watcherFuture = new CompletableFuture<>();
if (subscriptionMode == Mode.PERSISTENT) {
long watcherId = client.newTopicListWatcherId();
new TopicListWatcher(topicsChangeListener, client, topicsPattern, watcherId,
namespaceName, topicsHash, watcherFuture);
namespaceName, topicsHash, watcherFuture, () -> recheckTopicsChangeAfterReconnect());
watcherFuture
.thenAccept(__ -> recheckPatternTimeout.cancel())
.exceptionally(ex -> {
Expand All @@ -99,40 +115,75 @@ public static NamespaceName getNameSpaceFromPattern(Pattern pattern) {
return TopicName.get(pattern.pattern()).getNamespaceObject();
}

/**
* This method will be called after the {@link TopicListWatcher} reconnected after enabled {@link TopicListWatcher}.
*/
private void recheckTopicsChangeAfterReconnect() {
// Skip if closed or the task has been cancelled.
if (getState() == State.Closing || getState() == State.Closed) {
return;
}
// Do check.
recheckTopicsChange().whenComplete((ignore, ex) -> {
if (ex != null) {
log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage());
long delayMs = recheckPatternTaskBackoff.next();
client.timer().newTimeout(timeout -> {
recheckTopicsChangeAfterReconnect();
}, delayMs, TimeUnit.MILLISECONDS);
} else {
recheckPatternTaskBackoff.reset();
}
});
}

// TimerTask to recheck topics change, and trigger subscribe/unsubscribe based on the change.
@Override
public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled()) {
return;
}
client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode, topicsPattern.pattern(), topicsHash)
.thenCompose(getTopicsResult -> {
recheckTopicsChange().exceptionally(ex -> {
log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage());
return null;
}).thenAccept(__ -> {
// schedule the next re-check task
this.recheckPatternTimeout = client.timer()
.newTimeout(PatternMultiTopicsConsumerImpl.this,
Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
});
}

if (log.isDebugEnabled()) {
log.debug("Get topics under namespace {}, topics.size: {}, topicsHash: {}, filtered: {}",
namespaceName, getTopicsResult.getTopics().size(), getTopicsResult.getTopicsHash(),
getTopicsResult.isFiltered());
getTopicsResult.getTopics().forEach(topicName ->
log.debug("Get topics under namespace {}, topic: {}", namespaceName, topicName));
}
private CompletableFuture<Void> recheckTopicsChange() {
String pattern = topicsPattern.pattern();
final int epoch = recheckPatternEpoch.incrementAndGet();
return client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode, pattern, topicsHash)
.thenCompose(getTopicsResult -> {
// If "recheckTopicsChange" has been called more than one times, only make the last one take affects.
// Use "synchronized (recheckPatternTaskBackoff)" instead of
// `synchronized(PatternMultiTopicsConsumerImpl.this)` to avoid locking in a wider range.
synchronized (recheckPatternTaskBackoff) {
if (recheckPatternEpoch.get() > epoch) {
return CompletableFuture.completedFuture(null);
}
if (log.isDebugEnabled()) {
log.debug("Get topics under namespace {}, topics.size: {}, topicsHash: {}, filtered: {}",
namespaceName, getTopicsResult.getTopics().size(), getTopicsResult.getTopicsHash(),
getTopicsResult.isFiltered());
getTopicsResult.getTopics().forEach(topicName ->
log.debug("Get topics under namespace {}, topic: {}", namespaceName, topicName));
}

final List<String> oldTopics = new ArrayList<>(getPartitionedTopics());
for (String partition : getPartitions()) {
TopicName topicName = TopicName.get(partition);
if (!topicName.isPartitioned() || !oldTopics.contains(topicName.getPartitionedTopicName())) {
oldTopics.add(partition);
final List<String> oldTopics = new ArrayList<>(getPartitionedTopics());
for (String partition : getPartitions()) {
TopicName topicName = TopicName.get(partition);
if (!topicName.isPartitioned() || !oldTopics.contains(topicName.getPartitionedTopicName())) {
oldTopics.add(partition);
}
}
return updateSubscriptions(topicsPattern, this::setTopicsHash, getTopicsResult,
topicsChangeListener, oldTopics);
}
return updateSubscriptions(topicsPattern, this::setTopicsHash, getTopicsResult,
topicsChangeListener, oldTopics);
}).exceptionally(ex -> {
log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage());
return null;
}).thenAccept(__ -> {
// schedule the next re-check task
this.recheckPatternTimeout = client.timer()
.newTimeout(PatternMultiTopicsConsumerImpl.this,
Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,14 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<>();
private final AtomicReference<ClientCnx> clientCnxUsedForWatcherRegistration = new AtomicReference<>();

private final Runnable recheckTopicsChangeAfterReconnect;


public TopicListWatcher(PatternMultiTopicsConsumerImpl.TopicsChangedListener topicsChangeListener,
PulsarClientImpl client, Pattern topicsPattern, long watcherId,
NamespaceName namespace, String topicsHash,
CompletableFuture<TopicListWatcher> watcherFuture) {
CompletableFuture<TopicListWatcher> watcherFuture,
Runnable recheckTopicsChangeAfterReconnect) {
super(client, topicsPattern.pattern());
this.topicsChangeListener = topicsChangeListener;
this.name = "Watcher(" + topicsPattern + ")";
Expand All @@ -77,6 +80,7 @@ public TopicListWatcher(PatternMultiTopicsConsumerImpl.TopicsChangedListener top
this.namespace = namespace;
this.topicsHash = topicsHash;
this.watcherFuture = watcherFuture;
this.recheckTopicsChangeAfterReconnect = recheckTopicsChangeAfterReconnect;

connectionHandler.grabCnx();
}
Expand Down Expand Up @@ -141,6 +145,7 @@ public CompletableFuture<Void> connectionOpened(ClientCnx cnx) {

this.connectionHandler.resetBackoff();

recheckTopicsChangeAfterReconnect.run();
watcherFuture.complete(this);
future.complete(null);
}).exceptionally((e) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void setup() {
watcherFuture = new CompletableFuture<>();
watcher = new TopicListWatcher(listener, client,
Pattern.compile(topic), 7,
NamespaceName.get("tenant/ns"), null, watcherFuture);
NamespaceName.get("tenant/ns"), null, watcherFuture, () -> {});
}

@Test
Expand Down

0 comments on commit 042e769

Please sign in to comment.