From 131af38d1790ea3e4c6fad1235c8c071db7f692d Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Mon, 15 Apr 2024 18:35:18 +0800 Subject: [PATCH] [fix][broker] Fix the applying of namespace policies Signed-off-by: Zixuan Liu --- .../pulsar/broker/service/AbstractTopic.java | 20 +++- .../service/persistent/PersistentTopic.java | 96 ++++++++----------- .../service/ReplicatorRateLimiterTest.java | 45 ++++++++- .../policies/data/HierarchyTopicPolicies.java | 6 ++ 4 files changed, 105 insertions(+), 62 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 05defa60c050bf..5b66afbb4558ec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -308,6 +308,9 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) { topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled().updateNamespaceValue( namespacePolicies.dispatcherPauseOnAckStatePersistentEnabled); + topicPolicies.getResourceGroup().updateNamespaceValue(namespacePolicies.resource_group_name); + topicPolicies.getEncryptionRequired().updateNamespaceValue(namespacePolicies.encryption_required); + topicPolicies.getAllowAutoUpdateSchema().updateNamespaceValue(namespacePolicies.is_allow_auto_update_schema); updateEntryFilters(); } @@ -1092,7 +1095,7 @@ public PublishRateLimiter getBrokerPublishRateLimiter() { /** * @deprecated Avoid using the deprecated method * #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and we can use - * #{@link AbstractTopic#updateResourceGroupLimiter(Policies)} to instead of it. + * #{@link AbstractTopic#updateResourceGroupLimiter()} to instead of it. */ @Deprecated public void updateResourceGroupLimiter(Optional optPolicies) { @@ -1108,13 +1111,24 @@ public void updateResourceGroupLimiter(Optional optPolicies) { log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage()); policies = new Policies(); } - updateResourceGroupLimiter(policies); + updateResourceGroupLimiter(policies.resource_group_name); } + /** + * @deprecated Use {@link #updateResourceGroupLimiter()} instead. + */ + @Deprecated public void updateResourceGroupLimiter(@Nonnull Policies namespacePolicies) { requireNonNull(namespacePolicies); + updateResourceGroupLimiter(namespacePolicies.resource_group_name); + } + + protected void updateResourceGroupLimiter() { + updateResourceGroupLimiter(this.topicPolicies.getResourceGroup().get()); + } + + private void updateResourceGroupLimiter(String rgName) { // attach the resource-group level rate limiters, if set - String rgName = namespacePolicies.resource_group_name; if (rgName != null) { final ResourceGroup resourceGroup = brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index e4441969101c14..a5de0b452e0ad0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -367,55 +367,34 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS @Override public CompletableFuture initialize() { - List> futures = new ArrayList<>(); - futures.add(brokerService.getPulsar().newTopicCompactionService(topic).thenAccept(service -> { - PersistentTopic.this.topicCompactionService = service; - this.createPersistentSubscriptions(); - })); - - for (ManagedCursor cursor : ledger.getCursors()) { - if (cursor.getName().startsWith(replicatorPrefix)) { - String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); - String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName()); - futures.add(addReplicationCluster(remoteCluster, cursor, localCluster)); - } - } - return FutureUtil.waitForAll(futures).thenCompose(__ -> - brokerService.pulsar().getPulsarResources().getNamespaceResources() + return brokerService.pulsar().getPulsarResources().getNamespaceResources() .getPoliciesAsync(TopicName.get(topic).getNamespaceObject()) - .thenAcceptAsync(optPolicies -> { - if (!optPolicies.isPresent()) { - isEncryptionRequired = false; - updatePublishRateLimiter(); - updateResourceGroupLimiter(new Policies()); - initializeDispatchRateLimiterIfNeeded(); - updateSubscribeRateLimiter(); - return; - } - - Policies policies = optPolicies.get(); - - this.updateTopicPolicyByNamespacePolicy(policies); - - initializeDispatchRateLimiterIfNeeded(); - - updateSubscribeRateLimiter(); - - updatePublishRateLimiter(); - - updateResourceGroupLimiter(policies); - - this.isEncryptionRequired = policies.encryption_required; - - isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema; - }, getOrderedExecutor()) - .thenCompose(ignore -> initTopicPolicy()) + .thenAcceptAsync(optPolicies -> optPolicies.ifPresent(this::updateTopicPolicyByNamespacePolicy), + getOrderedExecutor()) + .thenCompose(ignore -> initTopicPolicyAndApply()) .exceptionally(ex -> { log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, ex.getMessage()); isEncryptionRequired = false; return null; - })); + }) + .thenCompose(ignore -> { + List> futures = new ArrayList<>(); + for (ManagedCursor cursor : ledger.getCursors()) { + if (cursor.getName().startsWith(replicatorPrefix)) { + String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); + String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName()); + futures.add(addReplicationCluster(remoteCluster, cursor, localCluster)); + } + } + return FutureUtil.waitForAll(futures); + }) + .thenCompose(ignore -> brokerService.getPulsar().newTopicCompactionService(topic) + .thenAccept(service -> { + PersistentTopic.this.topicCompactionService = service; + this.createPersistentSubscriptions(); + }) + ); } // for testing purposes @@ -3150,13 +3129,9 @@ public CompletableFuture onPoliciesUpdate(@Nonnull Policies data) { // see more detail: https://github.com/apache/pulsar/pull/19364. updateTopicPolicyByNamespacePolicy(data); checkReplicatedSubscriptionControllerState(); - isEncryptionRequired = data.encryption_required; - isAllowAutoUpdateSchema = data.is_allow_auto_update_schema; // Apply policies for components. - List> applyPolicyTasks = applyUpdatedTopicPolicies(); - applyPolicyTasks.add(applyUpdatedNamespacePolicies(data)); - return FutureUtil.waitForAll(applyPolicyTasks) + return FutureUtil.waitForAll(applyUpdatedTopicPolicies()) .thenAccept(__ -> log.info("[{}] namespace-level policies updated successfully", topic)) .exceptionally(ex -> { log.error("[{}] update namespace polices : {} error", this.getName(), data, ex); @@ -3164,13 +3139,14 @@ public CompletableFuture onPoliciesUpdate(@Nonnull Policies data) { }); } - private CompletableFuture applyUpdatedNamespacePolicies(Policies namespaceLevelPolicies) { - return FutureUtil.runWithCurrentThread(() -> updateResourceGroupLimiter(namespaceLevelPolicies)); - } - private List> applyUpdatedTopicPolicies() { List> applyPoliciesFutureList = new ArrayList<>(); + Boolean encryptionRequired = topicPolicies.getEncryptionRequired().get(); + isEncryptionRequired = encryptionRequired != null ? encryptionRequired : false; + Boolean allowAutoUpdateSchema = topicPolicies.getAllowAutoUpdateSchema().get(); + isAllowAutoUpdateSchema = allowAutoUpdateSchema != null ? allowAutoUpdateSchema : false; + // Client permission check. subscriptions.forEach((subName, sub) -> { sub.getConsumers().forEach(consumer -> applyPoliciesFutureList.add(consumer.checkPermissionsAsync())); @@ -3184,6 +3160,7 @@ private List> applyUpdatedTopicPolicies() { applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateDispatchRateLimiter())); applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateSubscribeRateLimiter())); applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updatePublishRateLimiter())); + applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateResourceGroupLimiter())); applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateSubscriptionsDispatcherRateLimiter())); applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread( @@ -4025,16 +4002,19 @@ private void updateSubscriptionsDispatcherRateLimiter() { }); } - protected CompletableFuture initTopicPolicy() { + protected CompletableFuture initTopicPolicyAndApply() { if (brokerService.pulsar().getConfig().isSystemTopicAndTopicLevelPoliciesEnabled()) { brokerService.getPulsar().getTopicPoliciesService() .registerListener(TopicName.getPartitionedTopicName(topic), this); - return CompletableFuture.completedFuture(null).thenRunAsync(() -> onUpdate( - brokerService.getPulsar().getTopicPoliciesService() - .getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic))), - brokerService.getTopicOrderedExecutor()); + TopicPolicies topicPolicies = brokerService.getPulsar().getTopicPoliciesService() + .getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic)); + if (topicPolicies != null) { + onUpdate(topicPolicies); + return CompletableFuture.completedFuture(null); + } } - return CompletableFuture.completedFuture(null); + + return FutureUtil.waitForAll(applyUpdatedTopicPolicies()); } @VisibleForTesting diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java index 747ef3b7f5ce85..4447234bf40aaa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java @@ -24,16 +24,18 @@ import static org.testng.AssertJUnit.assertFalse; import com.google.common.collect.Sets; import java.lang.reflect.Method; +import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; +import org.apache.pulsar.broker.qos.AsyncTokenBucket; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.policies.data.DispatchRate; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +82,47 @@ public Object[][] dispatchRateProvider() { return new Object[][] { { DispatchRateType.messageRate }, { DispatchRateType.byteRate } }; } + @Test + public void testReLoadReplicator() throws Exception { + final String namespace = "pulsar/" + System.currentTimeMillis(); + final String topicName = "persistent://" + namespace + "/" + System.currentTimeMillis(); + + admin1.namespaces().createNamespace(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); + admin1.namespaces().setReplicatorDispatchRate(namespace, + DispatchRate.builder().relativeToPublishRate(true) + .dispatchThrottlingRateInMsg(10) + .dispatchThrottlingRateInByte(1000) + .build()); + + @Cleanup + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).build(); + + // Activate the replicator. + client1.newProducer().topic(topicName).create().close(); + Optional topicReference = pulsar1.getBrokerService().getTopicReference(topicName); + assertTrue(topicReference.isPresent()); + PersistentTopic persistentTopic = (PersistentTopic) topicReference.get(); + List replicators = persistentTopic.getReplicators().values(); + assertFalse(replicators.isEmpty()); + Replicator replicator = replicators.get(0); + assertTrue(replicator.getRateLimiter().isPresent()); + + // Reload the replicator. + // 1. Clean the topic from the broker cache + // 2. Create the producer, and then the replicator will be reloaded. + pulsar1.getBrokerService().removeTopicFromCache(topicReference.get()); + client1.newProducer().topic(topicName).create().close(); + topicReference = pulsar1.getBrokerService().getTopicReference(topicName); + assertTrue(topicReference.isPresent()); + + persistentTopic = (PersistentTopic) topicReference.get(); + replicators = persistentTopic.getReplicators().values(); + assertFalse(replicators.isEmpty()); + replicator = replicators.get(0); + assertTrue(replicator.getRateLimiter().isPresent()); + } + @Test public void testReplicatorRateLimiterWithOnlyTopicLevel() throws Exception { cleanup(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java index 4edb033498bc00..8c1a03b862b69e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java @@ -61,6 +61,9 @@ public class HierarchyTopicPolicies { final PolicyHierarchyValue schemaValidationEnforced; final PolicyHierarchyValue entryFilters; + final PolicyHierarchyValue resourceGroup; + final PolicyHierarchyValue encryptionRequired; + final PolicyHierarchyValue allowAutoUpdateSchema; public HierarchyTopicPolicies() { replicationClusters = new PolicyHierarchyValue<>(); @@ -94,5 +97,8 @@ public HierarchyTopicPolicies() { dispatchRate = new PolicyHierarchyValue<>(); schemaValidationEnforced = new PolicyHierarchyValue<>(); entryFilters = new PolicyHierarchyValue<>(); + resourceGroup = new PolicyHierarchyValue<>(); + encryptionRequired = new PolicyHierarchyValue<>(); + allowAutoUpdateSchema = new PolicyHierarchyValue<>(); } }