Skip to content

Commit

Permalink
[fix] [build] Remove test testNoOrphanTopicIfInitFailed (apache#21569)
Browse files Browse the repository at this point in the history
(cherry picked from commit 3875864)
  • Loading branch information
poorbarcode authored and mukesh-ctds committed Apr 15, 2024
1 parent 82f77bc commit 0799c8e
Showing 1 changed file with 0 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -29,14 +28,12 @@
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.TopicPolicyListener;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -107,48 +104,4 @@ public void testNoOrphanTopicAfterCreateTimeout() throws Exception {
admin.topics().delete(tpName, false);
pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds);
}

@Test
public void testNoOrphanTopicIfInitFailed() throws Exception {
String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
admin.topics().createNonPartitionedTopic(tpName);

// Load topic.
Consumer consumer = pulsarClient.newConsumer()
.topic(tpName)
.subscriptionName("my-sub")
.subscribe();

// Make the method `PersitentTopic.initialize` fail.
Field fieldCompactionServiceFactory = PulsarService.class.getDeclaredField("compactionServiceFactory");
fieldCompactionServiceFactory.setAccessible(true);
CompactionServiceFactory compactionServiceFactory =
(CompactionServiceFactory) fieldCompactionServiceFactory.get(pulsar);
fieldCompactionServiceFactory.set(pulsar, null);
admin.topics().unload(tpName);

// Wait for failed to create topic for several times.
Thread.sleep(5 * 1000);

// Remove the injected error, the topic will be created successful.
fieldCompactionServiceFactory.set(pulsar, compactionServiceFactory);
// We do not know the next time of consumer reconnection, so wait for 2 minutes to avoid flaky. It will be
// very fast in normal.
Awaitility.await().ignoreExceptions().atMost(120, TimeUnit.SECONDS).untilAsserted(() -> {
CompletableFuture<Optional<Topic>> future = pulsar.getBrokerService().getTopic(tpName, false);
assertTrue(future.isDone());
Optional<Topic> optional = future.get();
assertTrue(optional.isPresent());
});

// Assert only one PersistentTopic was not closed.
TopicPoliciesService topicPoliciesService = pulsar.getTopicPoliciesService();
Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners =
WhiteboxImpl.getInternalState(topicPoliciesService, "listeners");
assertEquals(listeners.get(TopicName.get(tpName)).size(), 1);

// cleanup.
consumer.close();
admin.topics().delete(tpName, false);
}
}

0 comments on commit 0799c8e

Please sign in to comment.