diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 0f8b2c918640f..4bfb9ee567920 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1556,6 +1556,7 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); } }).exceptionally(ex -> { + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(ex); return null; }); @@ -1650,10 +1651,16 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs; pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs); - if (topicFuture.isCompletedExceptionally()) { + if (!topicFuture.complete(Optional.of(persistentTopic))) { // Check create persistent topic timeout. - log.warn("{} future is already completed with failure {}, closing the" - + " topic", topic, FutureUtil.getException(topicFuture)); + if (topicFuture.isCompletedExceptionally()) { + log.warn("{} future is already completed with failure {}, closing" + + " the topic", topic, FutureUtil.getException(topicFuture)); + } else { + // It should not happen. + log.error("{} future is already completed by another thread, " + + "which is not expected. Closing the current one", topic); + } executor().submit(() -> { persistentTopic.close().whenComplete((ignore, ex) -> { topics.remove(topic, topicFuture); @@ -1665,7 +1672,6 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { }); } else { addTopicToStatsMaps(topicName, persistentTopic); - topicFuture.complete(Optional.of(persistentTopic)); } }) .exceptionally((ex) -> { @@ -1694,6 +1700,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { if (!createIfMissing && exception instanceof ManagedLedgerNotFoundException) { // We were just trying to load a topic and the topic doesn't exist + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); loadFuture.completeExceptionally(exception); topicFuture.complete(Optional.empty()); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java index 7de9d07d328f4..51b65726995ee 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -19,18 +19,23 @@ package org.apache.pulsar.client.api; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TopicPoliciesService; @@ -40,10 +45,12 @@ import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicPolicies; +import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j @@ -182,4 +189,93 @@ public CompletableFuture closeAsync() { pulsar.getConfig().setBrokerDeduplicationEnabled(false); pulsar.getConfig().setTransactionCoordinatorEnabled(false); } + + @DataProvider(name = "whetherTimeoutOrNot") + public Object[][] whetherTimeoutOrNot() { + return new Object[][] { + {true}, + {false} + }; + } + + @Test(timeOut = 60 * 1000, dataProvider = "whetherTimeoutOrNot") + public void testCheckOwnerShipFails(boolean injectTimeout) throws Exception { + if (injectTimeout) { + pulsar.getConfig().setTopicLoadTimeoutSeconds(5); + } + String ns = "public" + "/" + UUID.randomUUID().toString().replaceAll("-", ""); + String tpName = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp"); + admin.namespaces().createNamespace(ns); + admin.topics().createNonPartitionedTopic(tpName); + admin.namespaces().unload(ns); + + // Inject an error when calling "NamespaceService.isServiceUnitActiveAsync". + AtomicInteger failedTimes = new AtomicInteger(); + NamespaceService namespaceService = pulsar.getNamespaceService(); + doAnswer(invocation -> { + TopicName paramTp = (TopicName) invocation.getArguments()[0]; + if (paramTp.toString().equalsIgnoreCase(tpName) && failedTimes.incrementAndGet() <= 2) { + if (injectTimeout) { + Thread.sleep(10 * 1000); + } + log.info("Failed {} times", failedTimes.get()); + return FutureUtil.failedFuture(new RuntimeException("mocked error")); + } + return invocation.callRealMethod(); + }).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class)); + + // Verify: the consumer can create successfully eventually. + Consumer consumer = pulsarClient.newConsumer().topic(tpName).subscriptionName("s1").subscribe(); + + // cleanup. + if (injectTimeout) { + pulsar.getConfig().setTopicLoadTimeoutSeconds(60); + } + consumer.close(); + admin.topics().delete(tpName); + } + + @Test(timeOut = 60 * 1000, dataProvider = "whetherTimeoutOrNot") + public void testTopicLoadAndDeleteAtTheSameTime(boolean injectTimeout) throws Exception { + if (injectTimeout) { + pulsar.getConfig().setTopicLoadTimeoutSeconds(5); + } + String ns = "public" + "/" + UUID.randomUUID().toString().replaceAll("-", ""); + String tpName = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp"); + admin.namespaces().createNamespace(ns); + admin.topics().createNonPartitionedTopic(tpName); + admin.namespaces().unload(ns); + + // Inject a race condition: load topic and delete topic execute at the same time. + AtomicInteger mockRaceConditionCounter = new AtomicInteger(); + NamespaceService namespaceService = pulsar.getNamespaceService(); + doAnswer(invocation -> { + TopicName paramTp = (TopicName) invocation.getArguments()[0]; + if (paramTp.toString().equalsIgnoreCase(tpName) && mockRaceConditionCounter.incrementAndGet() <= 1) { + if (injectTimeout) { + Thread.sleep(10 * 1000); + } + log.info("Race condition occurs {} times", mockRaceConditionCounter.get()); + pulsar.getManagedLedgerFactory().delete(TopicName.get(tpName).getPersistenceNamingEncoding()); + } + return invocation.callRealMethod(); + }).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class)); + + // Verify: the consumer create failed due to pulsar does not allow to create topic automatically. + try { + pulsar.getBrokerService().getTopic(tpName, false, Collections.emptyMap()).join(); + } catch (Exception ex) { + log.warn("Expected error", ex); + } + + // Verify: the consumer create successfully after allowing to create topic automatically. + Consumer consumer = pulsarClient.newConsumer().topic(tpName).subscriptionName("s1").subscribe(); + + // cleanup. + if (injectTimeout) { + pulsar.getConfig().setTopicLoadTimeoutSeconds(60); + } + consumer.close(); + admin.topics().delete(tpName); + } }