diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index e4441969101c14..6a5d3cf7ab5836 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -367,21 +367,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS @Override public CompletableFuture initialize() { - List> futures = new ArrayList<>(); - futures.add(brokerService.getPulsar().newTopicCompactionService(topic).thenAccept(service -> { - PersistentTopic.this.topicCompactionService = service; - this.createPersistentSubscriptions(); - })); - - for (ManagedCursor cursor : ledger.getCursors()) { - if (cursor.getName().startsWith(replicatorPrefix)) { - String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); - String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName()); - futures.add(addReplicationCluster(remoteCluster, cursor, localCluster)); - } - } - return FutureUtil.waitForAll(futures).thenCompose(__ -> - brokerService.pulsar().getPulsarResources().getNamespaceResources() + return brokerService.pulsar().getPulsarResources().getNamespaceResources() .getPoliciesAsync(TopicName.get(topic).getNamespaceObject()) .thenAcceptAsync(optPolicies -> { if (!optPolicies.isPresent()) { @@ -409,13 +395,30 @@ public CompletableFuture initialize() { isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema; }, getOrderedExecutor()) - .thenCompose(ignore -> initTopicPolicy()) + .thenCompose(ignore -> initTopicPolicyAndApply()) .exceptionally(ex -> { log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, ex.getMessage()); isEncryptionRequired = false; return null; - })); + }) + .thenCompose(ignore -> { + List> futures = new ArrayList<>(); + for (ManagedCursor cursor : ledger.getCursors()) { + if (cursor.getName().startsWith(replicatorPrefix)) { + String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); + String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName()); + futures.add(addReplicationCluster(remoteCluster, cursor, localCluster)); + } + } + return FutureUtil.waitForAll(futures); + }) + .thenCompose(ignore -> brokerService.getPulsar().newTopicCompactionService(topic) + .thenAccept(service -> { + PersistentTopic.this.topicCompactionService = service; + this.createPersistentSubscriptions(); + }) + ); } // for testing purposes @@ -4025,16 +4028,19 @@ private void updateSubscriptionsDispatcherRateLimiter() { }); } - protected CompletableFuture initTopicPolicy() { + protected CompletableFuture initTopicPolicyAndApply() { if (brokerService.pulsar().getConfig().isSystemTopicAndTopicLevelPoliciesEnabled()) { brokerService.getPulsar().getTopicPoliciesService() .registerListener(TopicName.getPartitionedTopicName(topic), this); - return CompletableFuture.completedFuture(null).thenRunAsync(() -> onUpdate( - brokerService.getPulsar().getTopicPoliciesService() - .getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic))), - brokerService.getTopicOrderedExecutor()); + TopicPolicies topicPolicies = brokerService.getPulsar().getTopicPoliciesService() + .getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic)); + if (topicPolicies != null) { + onUpdate(topicPolicies); + return CompletableFuture.completedFuture(null); + } } - return CompletableFuture.completedFuture(null); + + return FutureUtil.waitForAll(applyUpdatedTopicPolicies()); } @VisibleForTesting diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java index 747ef3b7f5ce85..4447234bf40aaa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java @@ -24,16 +24,18 @@ import static org.testng.AssertJUnit.assertFalse; import com.google.common.collect.Sets; import java.lang.reflect.Method; +import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; +import org.apache.pulsar.broker.qos.AsyncTokenBucket; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.policies.data.DispatchRate; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +82,47 @@ public Object[][] dispatchRateProvider() { return new Object[][] { { DispatchRateType.messageRate }, { DispatchRateType.byteRate } }; } + @Test + public void testReLoadReplicator() throws Exception { + final String namespace = "pulsar/" + System.currentTimeMillis(); + final String topicName = "persistent://" + namespace + "/" + System.currentTimeMillis(); + + admin1.namespaces().createNamespace(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); + admin1.namespaces().setReplicatorDispatchRate(namespace, + DispatchRate.builder().relativeToPublishRate(true) + .dispatchThrottlingRateInMsg(10) + .dispatchThrottlingRateInByte(1000) + .build()); + + @Cleanup + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).build(); + + // Activate the replicator. + client1.newProducer().topic(topicName).create().close(); + Optional topicReference = pulsar1.getBrokerService().getTopicReference(topicName); + assertTrue(topicReference.isPresent()); + PersistentTopic persistentTopic = (PersistentTopic) topicReference.get(); + List replicators = persistentTopic.getReplicators().values(); + assertFalse(replicators.isEmpty()); + Replicator replicator = replicators.get(0); + assertTrue(replicator.getRateLimiter().isPresent()); + + // Reload the replicator. + // 1. Clean the topic from the broker cache + // 2. Create the producer, and then the replicator will be reloaded. + pulsar1.getBrokerService().removeTopicFromCache(topicReference.get()); + client1.newProducer().topic(topicName).create().close(); + topicReference = pulsar1.getBrokerService().getTopicReference(topicName); + assertTrue(topicReference.isPresent()); + + persistentTopic = (PersistentTopic) topicReference.get(); + replicators = persistentTopic.getReplicators().values(); + assertFalse(replicators.isEmpty()); + replicator = replicators.get(0); + assertTrue(replicator.getRateLimiter().isPresent()); + } + @Test public void testReplicatorRateLimiterWithOnlyTopicLevel() throws Exception { cleanup();