Skip to content

Commit

Permalink
[fix][test] Fix test
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Sep 9, 2024
1 parent 68f470d commit 05fe5c8
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2643,6 +2643,7 @@ public void testFailedUpdatePartitionedTopic() throws Exception {
URL pulsarUrl = new URL(pulsar.getWebServiceAddress());

admin.topics().createPartitionedTopic(partitionedTopicName, startPartitions);
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build();
Consumer<byte[]> consumer1 = client.newConsumer().topic(partitionedTopicName).subscriptionName(subName1)
.subscriptionType(SubscriptionType.Shared).subscribe();
Expand All @@ -2652,18 +2653,16 @@ public void testFailedUpdatePartitionedTopic() throws Exception {
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, startPartitions);

// create a subscription for few new partition which can fail
admin.topics().createSubscription(partitionedTopicName + "-partition-" + startPartitions, subName1,
MessageId.earliest);

try {
admin.topics().updatePartitionedTopic(partitionedTopicName, newPartitions, false, false);
} catch (PulsarAdminException.PreconditionFailedException e) {
// Ok
admin.topics().createSubscription(partitionedTopicName + "-partition-" + startPartitions, subName1,
MessageId.earliest);
fail("Unexpected behaviour");
} catch (PulsarAdminException.ConflictException ex) {
// OK
}
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, startPartitions);

admin.topics().updatePartitionedTopic(partitionedTopicName, newPartitions, false, true);
// validate subscription is created for new partition.
assertNotNull(admin.topics().getStats(partitionedTopicName + "-partition-" + 6).getSubscriptions().get(subName1));
for (int i = startPartitions; i < newPartitions; i++) {
assertNotNull(
admin.topics().getStats(partitionedTopicName + "-partition-" + i).getSubscriptions().get(subName1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,25 +150,26 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
.sendTimeout(1, TimeUnit.SECONDS)
.topic(topic)
.create();) {
} catch (PulsarClientException.LookupException expected) {
String msg = "Namespace bundle for topic (%s) not served by this instance";
} catch (PulsarClientException.TopicDoesNotExistException expected) {
// Since the "policies.deleted" is "true", the value of "isAllowAutoTopicCreationAsync" will be false,
// so the "TopicDoesNotExistException" is expected.
log.info("Expected error", expected);
assertTrue(expected.getMessage().contains(String.format(msg, topic))
assertTrue(expected.getMessage().contains(topic)
|| expected.getMessage().contains(topicPoliciesServiceInitException));
}

try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("test")
.subscribe();) {
} catch (PulsarClientException.LookupException expected) {
String msg = "Namespace bundle for topic (%s) not served by this instance";
} catch (PulsarClientException.TopicDoesNotExistException expected) {
// Since the "policies.deleted" is "true", the value of "isAllowAutoTopicCreationAsync" will be false,
// so the "TopicDoesNotExistException" is expected.
log.info("Expected error", expected);
assertTrue(expected.getMessage().contains(String.format(msg, topic))
assertTrue(expected.getMessage().contains(topic)
|| expected.getMessage().contains(topicPoliciesServiceInitException));
}


// verify that the topic does not exist
pulsar.getPulsarResources().getNamespaceResources()
.setPolicies(NamespaceName.get(namespaceName), old -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Policy.Expiration;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.PerBrokerUsageStats;
Expand All @@ -35,12 +33,9 @@
import org.apache.pulsar.broker.service.resource.usage.NetworkUsage;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ public void testPersistentPartitionedTopicUnload() throws Exception {

assertFalse(pulsar.getBrokerService().getTopics().containsKey(topicName));
pulsar.getBrokerService().getTopicIfExists(topicName).get();
assertTrue(pulsar.getBrokerService().getTopics().containsKey(topicName));
assertFalse(pulsar.getBrokerService().getTopics().containsKey(topicName));

// ref of partitioned-topic name should be empty
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
Expand Down

0 comments on commit 05fe5c8

Please sign in to comment.