Skip to content

Commit

Permalink
[fix] [broker] Topic can never be loaded up due to broker maintains a…
Browse files Browse the repository at this point in the history
… failed topic creation future (apache#23184)

(cherry picked from commit 9edaa85)
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
poorbarcode authored and nodece committed Sep 4, 2024
1 parent 9ed3e28 commit 3e63ef6
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1556,6 +1556,7 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean
topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
}
}).exceptionally(ex -> {
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(ex);
return null;
});
Expand Down Expand Up @@ -1650,10 +1651,16 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
- topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
if (topicFuture.isCompletedExceptionally()) {
if (!topicFuture.complete(Optional.of(persistentTopic))) {
// Check create persistent topic timeout.
log.warn("{} future is already completed with failure {}, closing the"
+ " topic", topic, FutureUtil.getException(topicFuture));
if (topicFuture.isCompletedExceptionally()) {
log.warn("{} future is already completed with failure {}, closing"
+ " the topic", topic, FutureUtil.getException(topicFuture));
} else {
// It should not happen.
log.error("{} future is already completed by another thread, "
+ "which is not expected. Closing the current one", topic);
}
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, ex) -> {
topics.remove(topic, topicFuture);
Expand All @@ -1665,7 +1672,6 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
});
} else {
addTopicToStatsMaps(topicName, persistentTopic);
topicFuture.complete(Optional.of(persistentTopic));
}
})
.exceptionally((ex) -> {
Expand Down Expand Up @@ -1694,6 +1700,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
if (!createIfMissing && exception instanceof ManagedLedgerNotFoundException) {
// We were just trying to load a topic and the topic doesn't exist
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
loadFuture.completeExceptionally(exception);
topicFuture.complete(Optional.empty());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,23 @@
package org.apache.pulsar.client.api;

import static org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
Expand All @@ -40,10 +45,12 @@
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
Expand Down Expand Up @@ -182,4 +189,93 @@ public CompletableFuture<Void> closeAsync() {
pulsar.getConfig().setBrokerDeduplicationEnabled(false);
pulsar.getConfig().setTransactionCoordinatorEnabled(false);
}

@DataProvider(name = "whetherTimeoutOrNot")
public Object[][] whetherTimeoutOrNot() {
return new Object[][] {
{true},
{false}
};
}

@Test(timeOut = 60 * 1000, dataProvider = "whetherTimeoutOrNot")
public void testCheckOwnerShipFails(boolean injectTimeout) throws Exception {
if (injectTimeout) {
pulsar.getConfig().setTopicLoadTimeoutSeconds(5);
}
String ns = "public" + "/" + UUID.randomUUID().toString().replaceAll("-", "");
String tpName = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp");
admin.namespaces().createNamespace(ns);
admin.topics().createNonPartitionedTopic(tpName);
admin.namespaces().unload(ns);

// Inject an error when calling "NamespaceService.isServiceUnitActiveAsync".
AtomicInteger failedTimes = new AtomicInteger();
NamespaceService namespaceService = pulsar.getNamespaceService();
doAnswer(invocation -> {
TopicName paramTp = (TopicName) invocation.getArguments()[0];
if (paramTp.toString().equalsIgnoreCase(tpName) && failedTimes.incrementAndGet() <= 2) {
if (injectTimeout) {
Thread.sleep(10 * 1000);
}
log.info("Failed {} times", failedTimes.get());
return FutureUtil.failedFuture(new RuntimeException("mocked error"));
}
return invocation.callRealMethod();
}).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class));

// Verify: the consumer can create successfully eventually.
Consumer consumer = pulsarClient.newConsumer().topic(tpName).subscriptionName("s1").subscribe();

// cleanup.
if (injectTimeout) {
pulsar.getConfig().setTopicLoadTimeoutSeconds(60);
}
consumer.close();
admin.topics().delete(tpName);
}

@Test(timeOut = 60 * 1000, dataProvider = "whetherTimeoutOrNot")
public void testTopicLoadAndDeleteAtTheSameTime(boolean injectTimeout) throws Exception {
if (injectTimeout) {
pulsar.getConfig().setTopicLoadTimeoutSeconds(5);
}
String ns = "public" + "/" + UUID.randomUUID().toString().replaceAll("-", "");
String tpName = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp");
admin.namespaces().createNamespace(ns);
admin.topics().createNonPartitionedTopic(tpName);
admin.namespaces().unload(ns);

// Inject a race condition: load topic and delete topic execute at the same time.
AtomicInteger mockRaceConditionCounter = new AtomicInteger();
NamespaceService namespaceService = pulsar.getNamespaceService();
doAnswer(invocation -> {
TopicName paramTp = (TopicName) invocation.getArguments()[0];
if (paramTp.toString().equalsIgnoreCase(tpName) && mockRaceConditionCounter.incrementAndGet() <= 1) {
if (injectTimeout) {
Thread.sleep(10 * 1000);
}
log.info("Race condition occurs {} times", mockRaceConditionCounter.get());
pulsar.getManagedLedgerFactory().delete(TopicName.get(tpName).getPersistenceNamingEncoding());
}
return invocation.callRealMethod();
}).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class));

// Verify: the consumer create failed due to pulsar does not allow to create topic automatically.
try {
pulsar.getBrokerService().getTopic(tpName, false, Collections.emptyMap()).join();
} catch (Exception ex) {
log.warn("Expected error", ex);
}

// Verify: the consumer create successfully after allowing to create topic automatically.
Consumer consumer = pulsarClient.newConsumer().topic(tpName).subscriptionName("s1").subscribe();

// cleanup.
if (injectTimeout) {
pulsar.getConfig().setTopicLoadTimeoutSeconds(60);
}
consumer.close();
admin.topics().delete(tpName);
}
}

0 comments on commit 3e63ef6

Please sign in to comment.