Skip to content

Commit

Permalink
[fix][test] Fix ExtensibleLoadManagerImplTest flaky test (apache#21479)
Browse files Browse the repository at this point in the history
  • Loading branch information
Demogorgon314 authored Nov 7, 2023
1 parent 44abba9 commit 469ce7e
Showing 1 changed file with 45 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand Down Expand Up @@ -105,6 +106,7 @@
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
Expand Down Expand Up @@ -194,7 +196,7 @@ public void setup() throws Exception {
admin.namespaces().setNamespaceReplicationClusters("public/default",
Sets.newHashSet(this.conf.getClusterName()));

admin.namespaces().createNamespace(defaultTestNamespace);
admin.namespaces().createNamespace(defaultTestNamespace, 128);
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
Sets.newHashSet(this.conf.getClusterName()));
lookupService = (LookupService) FieldUtils.readDeclaredField(pulsarClient, "lookup", true);
Expand Down Expand Up @@ -237,8 +239,9 @@ public void testAssignInternalTopic() throws Exception {

@Test
public void testAssign() throws Exception {
TopicName topicName = TopicName.get(defaultTestNamespace + "/test-assign");
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
Pair<TopicName, NamespaceBundle> topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-assign");
TopicName topicName = topicAndBundle.getLeft();
NamespaceBundle bundle = topicAndBundle.getRight();
Optional<BrokerLookupData> brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle).get();
assertTrue(brokerLookupData.isPresent());
log.info("Assign the bundle {} to {}", bundle, brokerLookupData);
Expand All @@ -262,8 +265,8 @@ public void testAssign() throws Exception {

@Test
public void testCheckOwnershipAsync() throws Exception {
TopicName topicName = TopicName.get(defaultTestNamespace + "/test-check-ownership");
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
Pair<TopicName, NamespaceBundle> topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-check-ownership");
NamespaceBundle bundle = topicAndBundle.getRight();
// 1. The bundle is never assigned.
retryStrategically((test) -> {
try {
Expand Down Expand Up @@ -291,8 +294,8 @@ public void testCheckOwnershipAsync() throws Exception {

@Test
public void testFilter() throws Exception {
TopicName topicName = TopicName.get(defaultTestNamespace + "/test-filter");
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
Pair<TopicName, NamespaceBundle> topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-filter");
NamespaceBundle bundle = topicAndBundle.getRight();

doReturn(List.of(new BrokerFilter() {
@Override
Expand All @@ -317,8 +320,8 @@ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String,

@Test
public void testFilterHasException() throws Exception {
TopicName topicName = TopicName.get(defaultTestNamespace + "/test-filter-has-exception");
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
Pair<TopicName, NamespaceBundle> topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-filter-has-exception");
NamespaceBundle bundle = topicAndBundle.getRight();

doReturn(List.of(new MockBrokerFilter() {
@Override
Expand All @@ -336,8 +339,9 @@ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String,

@Test(timeOut = 30 * 1000)
public void testUnloadAdminAPI() throws Exception {
TopicName topicName = TopicName.get(defaultTestNamespace + "/test-unload");
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
Pair<TopicName, NamespaceBundle> topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-unload");
TopicName topicName = topicAndBundle.getLeft();
NamespaceBundle bundle = topicAndBundle.getRight();

AtomicInteger onloadCount = new AtomicInteger(0);
AtomicInteger unloadCount = new AtomicInteger(0);
Expand Down Expand Up @@ -534,8 +538,9 @@ private void checkOwnershipState(String broker, NamespaceBundle bundle)
@Test(timeOut = 30 * 1000)
public void testSplitBundleAdminAPI() throws Exception {
String namespace = defaultTestNamespace;
String topic = "persistent://" + namespace + "/test-split";
admin.topics().createPartitionedTopic(topic, 10);
Pair<TopicName, NamespaceBundle> topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-split");
TopicName topicName = topicAndBundle.getLeft();
admin.topics().createPartitionedTopic(topicName.toString(), 10);
BundlesData bundles = admin.namespaces().getBundles(namespace);
int numBundles = bundles.getNumBundles();
var bundleRanges = bundles.getBoundaries().stream().map(Long::decode).sorted().toList();
Expand Down Expand Up @@ -586,7 +591,7 @@ public boolean test(NamespaceBundle namespaceBundle) {
public void testSplitBundleWithSpecificPositionAdminAPI() throws Exception {
String namespace = defaultTestNamespace;
String topic = "persistent://" + namespace + "/test-split-with-specific-position";
admin.topics().createPartitionedTopic(topic, 10);
admin.topics().createPartitionedTopic(topic, 1024);
BundlesData bundles = admin.namespaces().getBundles(namespace);
int numBundles = bundles.getNumBundles();

Expand Down Expand Up @@ -664,9 +669,11 @@ public void testCheckOwnershipPresentWithSystemNamespace() throws Exception {
public void testMoreThenOneFilter() throws Exception {
// Use a different namespace to avoid flaky test failures
// from unloading the default namespace and the following topic policy lookups at the init state step
String namespace = "public/my-namespace";
TopicName topicName = TopicName.get(namespace + "/test-filter-has-exception");
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
Pair<TopicName, NamespaceBundle> topicAndBundle =
getBundleIsNotOwnByChangeEventTopic("test-filter-has-exception");
TopicName topicName = topicAndBundle.getLeft();
NamespaceBundle bundle = topicAndBundle.getRight();

String lookupServiceAddress1 = pulsar1.getLookupServiceAddress();
doReturn(List.of(new MockBrokerFilter() {
@Override
Expand All @@ -684,7 +691,6 @@ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String,
return FutureUtil.failedFuture(new BrokerFilterException("Test"));
}
})).when(primaryLoadManager).getBrokerFilterPipeline();
admin.namespaces().createNamespace(namespace);
Optional<BrokerLookupData> brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle).get();
Awaitility.waitAtMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
assertTrue(brokerLookupData.isPresent());
Expand All @@ -694,8 +700,6 @@ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String,
assertEquals(brokerLookupData.get().getPulsarServiceUrl(),
pulsar2.getAdminClient().lookups().lookupTopic(topicName.toString()));
});

admin.namespaces().deleteNamespace(namespace, true);
}

@Test
Expand All @@ -718,7 +722,11 @@ public void testDeployAndRollbackLoadManager() throws Exception {
try (var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf)) {
// start pulsar3 with old load manager
var pulsar3 = additionalPulsarTestContext.getPulsarService();
String topic = "persistent://" + defaultTestNamespace + "/test";
Pair<TopicName, NamespaceBundle> topicAndBundle =
getBundleIsNotOwnByChangeEventTopic("testDeployAndRollbackLoadManager");
TopicName topicName = topicAndBundle.getLeft();
NamespaceBundle bundle = topicAndBundle.getRight();
String topic = topicName.toString();

String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic);
assertEquals(lookupResult1, pulsar3.getBrokerServiceUrl());
Expand All @@ -728,7 +736,6 @@ public void testDeployAndRollbackLoadManager() throws Exception {
assertEquals(lookupResult1, lookupResult2);
assertEquals(lookupResult1, lookupResult3);

NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get();
LookupOptions options = LookupOptions.builder()
.authoritative(false)
.requestHttps(false)
Expand Down Expand Up @@ -1400,4 +1407,20 @@ private void setSecondaryLoadManager() throws IllegalAccessException {
private CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService pulsar, TopicName topic) {
return pulsar.getNamespaceService().getBundleAsync(topic);
}

private Pair<TopicName, NamespaceBundle> getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix)
throws Exception {
TopicName changeEventsTopicName =
TopicName.get(defaultTestNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1, changeEventsTopicName).get();
int i = 0;
while(true) {
TopicName topicName = TopicName.get(defaultTestNamespace + "/" + topicNamePrefix + "-" + i);
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
if (!bundle.equals(changeEventsBundle)) {
return Pair.of(topicName, bundle);
}
i++;
}
}
}

0 comments on commit 469ce7e

Please sign in to comment.