Skip to content

Commit

Permalink
[fix] [broker] fix flaky test PatternTopicsConsumerImplTest (apache#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode authored Sep 23, 2023
1 parent be4ab66 commit be4bcac
Showing 1 changed file with 55 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
Expand All @@ -67,6 +68,7 @@ public void setup() throws Exception {
isTcpLookup = true;
// enabled transaction, to test pattern consumers not subscribe to transaction system topic.
conf.setTransactionCoordinatorEnabled(true);
conf.setSubscriptionPatternMaxLength(10000);
super.internalSetup();
super.producerBaseSetup();
}
Expand Down Expand Up @@ -210,6 +212,12 @@ public void testBinaryProtoToGetTopicsOfNamespacePersistent() throws Exception {
.subscribe();
assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));

// Wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
Expand Down Expand Up @@ -287,6 +295,12 @@ public void testBinaryProtoSubscribeAllTopicOfNamespace() throws Exception {
.subscribe();
assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX));

// Wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
Expand Down Expand Up @@ -364,6 +378,12 @@ public void testBinaryProtoToGetTopicsOfNamespaceNonPersistent() throws Exceptio
.subscriptionTopicsMode(RegexSubscriptionMode.NonPersistentOnly)
.subscribe();

// Wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
Expand Down Expand Up @@ -455,6 +475,12 @@ public void testBinaryProtoToGetTopicsOfNamespaceAll() throws Exception {
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.subscribe();

// Wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

// 4. verify consumer get methods, to get right number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions();
Expand Down Expand Up @@ -525,6 +551,11 @@ public void testStartEmptyPatternConsumer() throws Exception {
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();
// Wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

// 3. verify consumer get methods, to get 5 number of partitions and topics.
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
Expand Down Expand Up @@ -605,6 +636,12 @@ public void testAutoSubscribePatterConsumerFromBrokerWatcher() throws Exception
.receiverQueueSize(4)
.subscribe();

// Wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

// 1. create partition
String topicName = "persistent://my-property/my-ns/pattern-topic-1-" + key;
TenantInfoImpl tenantInfo = createDefaultTenantInfo();
Expand Down Expand Up @@ -665,6 +702,12 @@ public void testAutoSubscribePatternConsumer() throws Exception {
.receiverQueueSize(4)
.subscribe();

// Wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);

// 4. verify consumer get methods, to get 6 number of partitions and topics: 6=1+2+3
Expand Down Expand Up @@ -775,6 +818,12 @@ public void testAutoUnsubscribePatternConsumer() throws Exception {
.receiverQueueSize(4)
.subscribe();

// Wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);

// 4. verify consumer get methods, to get 0 number of partitions and topics: 6=1+2+3
Expand Down Expand Up @@ -861,6 +910,12 @@ public void testTopicDeletion() throws Exception {
.subscriptionName("sub")
.subscribe();

// Wait topic list watcher creation.
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});

assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
PatternMultiTopicsConsumerImpl<String> consumerImpl = (PatternMultiTopicsConsumerImpl<String>) consumer;

Expand Down

0 comments on commit be4bcac

Please sign in to comment.