diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java index 7cd9da7574dbb..7a6189702dd8c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -20,7 +20,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; -import java.lang.reflect.Field; import java.util.List; import java.util.Map; import java.util.Optional; @@ -29,14 +28,12 @@ import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; -import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.service.TopicPolicyListener; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicPolicies; -import org.apache.pulsar.compaction.CompactionServiceFactory; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; import org.testng.annotations.AfterClass; @@ -107,48 +104,4 @@ public void testNoOrphanTopicAfterCreateTimeout() throws Exception { admin.topics().delete(tpName, false); pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds); } - - @Test - public void testNoOrphanTopicIfInitFailed() throws Exception { - String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); - admin.topics().createNonPartitionedTopic(tpName); - - // Load topic. - Consumer consumer = pulsarClient.newConsumer() - .topic(tpName) - .subscriptionName("my-sub") - .subscribe(); - - // Make the method `PersitentTopic.initialize` fail. - Field fieldCompactionServiceFactory = PulsarService.class.getDeclaredField("compactionServiceFactory"); - fieldCompactionServiceFactory.setAccessible(true); - CompactionServiceFactory compactionServiceFactory = - (CompactionServiceFactory) fieldCompactionServiceFactory.get(pulsar); - fieldCompactionServiceFactory.set(pulsar, null); - admin.topics().unload(tpName); - - // Wait for failed to create topic for several times. - Thread.sleep(5 * 1000); - - // Remove the injected error, the topic will be created successful. - fieldCompactionServiceFactory.set(pulsar, compactionServiceFactory); - // We do not know the next time of consumer reconnection, so wait for 2 minutes to avoid flaky. It will be - // very fast in normal. - Awaitility.await().ignoreExceptions().atMost(120, TimeUnit.SECONDS).untilAsserted(() -> { - CompletableFuture> future = pulsar.getBrokerService().getTopic(tpName, false); - assertTrue(future.isDone()); - Optional optional = future.get(); - assertTrue(optional.isPresent()); - }); - - // Assert only one PersistentTopic was not closed. - TopicPoliciesService topicPoliciesService = pulsar.getTopicPoliciesService(); - Map>> listeners = - WhiteboxImpl.getInternalState(topicPoliciesService, "listeners"); - assertEquals(listeners.get(TopicName.get(tpName)).size(), 1); - - // cleanup. - consumer.close(); - admin.topics().delete(tpName, false); - } }