Skip to content

Commit

Permalink
[fix][broker] Fix the applying of namespace policies
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Apr 15, 2024
1 parent 5d18ff7 commit 131af38
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,9 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {

topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled().updateNamespaceValue(
namespacePolicies.dispatcherPauseOnAckStatePersistentEnabled);
topicPolicies.getResourceGroup().updateNamespaceValue(namespacePolicies.resource_group_name);
topicPolicies.getEncryptionRequired().updateNamespaceValue(namespacePolicies.encryption_required);
topicPolicies.getAllowAutoUpdateSchema().updateNamespaceValue(namespacePolicies.is_allow_auto_update_schema);

updateEntryFilters();
}
Expand Down Expand Up @@ -1092,7 +1095,7 @@ public PublishRateLimiter getBrokerPublishRateLimiter() {
/**
* @deprecated Avoid using the deprecated method
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and we can use
* #{@link AbstractTopic#updateResourceGroupLimiter(Policies)} to instead of it.
* #{@link AbstractTopic#updateResourceGroupLimiter()} to instead of it.
*/
@Deprecated
public void updateResourceGroupLimiter(Optional<Policies> optPolicies) {
Expand All @@ -1108,13 +1111,24 @@ public void updateResourceGroupLimiter(Optional<Policies> optPolicies) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
policies = new Policies();
}
updateResourceGroupLimiter(policies);
updateResourceGroupLimiter(policies.resource_group_name);
}

/**
* @deprecated Use {@link #updateResourceGroupLimiter()} instead.
*/
@Deprecated
public void updateResourceGroupLimiter(@Nonnull Policies namespacePolicies) {
requireNonNull(namespacePolicies);
updateResourceGroupLimiter(namespacePolicies.resource_group_name);
}

protected void updateResourceGroupLimiter() {
updateResourceGroupLimiter(this.topicPolicies.getResourceGroup().get());
}

private void updateResourceGroupLimiter(String rgName) {
// attach the resource-group level rate limiters, if set
String rgName = namespacePolicies.resource_group_name;
if (rgName != null) {
final ResourceGroup resourceGroup =
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,55 +367,34 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS

@Override
public CompletableFuture<Void> initialize() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
futures.add(brokerService.getPulsar().newTopicCompactionService(topic).thenAccept(service -> {
PersistentTopic.this.topicCompactionService = service;
this.createPersistentSubscriptions();
}));

for (ManagedCursor cursor : ledger.getCursors()) {
if (cursor.getName().startsWith(replicatorPrefix)) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
futures.add(addReplicationCluster(remoteCluster, cursor, localCluster));
}
}
return FutureUtil.waitForAll(futures).thenCompose(__ ->
brokerService.pulsar().getPulsarResources().getNamespaceResources()
return brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenAcceptAsync(optPolicies -> {
if (!optPolicies.isPresent()) {
isEncryptionRequired = false;
updatePublishRateLimiter();
updateResourceGroupLimiter(new Policies());
initializeDispatchRateLimiterIfNeeded();
updateSubscribeRateLimiter();
return;
}

Policies policies = optPolicies.get();

this.updateTopicPolicyByNamespacePolicy(policies);

initializeDispatchRateLimiterIfNeeded();

updateSubscribeRateLimiter();

updatePublishRateLimiter();

updateResourceGroupLimiter(policies);

this.isEncryptionRequired = policies.encryption_required;

isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
}, getOrderedExecutor())
.thenCompose(ignore -> initTopicPolicy())
.thenAcceptAsync(optPolicies -> optPolicies.ifPresent(this::updateTopicPolicyByNamespacePolicy),
getOrderedExecutor())
.thenCompose(ignore -> initTopicPolicyAndApply())
.exceptionally(ex -> {
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false",
topic, ex.getMessage());
isEncryptionRequired = false;
return null;
}));
})
.thenCompose(ignore -> {
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (ManagedCursor cursor : ledger.getCursors()) {
if (cursor.getName().startsWith(replicatorPrefix)) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
futures.add(addReplicationCluster(remoteCluster, cursor, localCluster));
}
}
return FutureUtil.waitForAll(futures);
})
.thenCompose(ignore -> brokerService.getPulsar().newTopicCompactionService(topic)
.thenAccept(service -> {
PersistentTopic.this.topicCompactionService = service;
this.createPersistentSubscriptions();
})
);
}

// for testing purposes
Expand Down Expand Up @@ -3150,27 +3129,24 @@ public CompletableFuture<Void> onPoliciesUpdate(@Nonnull Policies data) {
// see more detail: https://github.com/apache/pulsar/pull/19364.
updateTopicPolicyByNamespacePolicy(data);
checkReplicatedSubscriptionControllerState();
isEncryptionRequired = data.encryption_required;
isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;

// Apply policies for components.
List<CompletableFuture<Void>> applyPolicyTasks = applyUpdatedTopicPolicies();
applyPolicyTasks.add(applyUpdatedNamespacePolicies(data));
return FutureUtil.waitForAll(applyPolicyTasks)
return FutureUtil.waitForAll(applyUpdatedTopicPolicies())
.thenAccept(__ -> log.info("[{}] namespace-level policies updated successfully", topic))
.exceptionally(ex -> {
log.error("[{}] update namespace polices : {} error", this.getName(), data, ex);
throw FutureUtil.wrapToCompletionException(ex);
});
}

private CompletableFuture<Void> applyUpdatedNamespacePolicies(Policies namespaceLevelPolicies) {
return FutureUtil.runWithCurrentThread(() -> updateResourceGroupLimiter(namespaceLevelPolicies));
}

private List<CompletableFuture<Void>> applyUpdatedTopicPolicies() {
List<CompletableFuture<Void>> applyPoliciesFutureList = new ArrayList<>();

Boolean encryptionRequired = topicPolicies.getEncryptionRequired().get();
isEncryptionRequired = encryptionRequired != null ? encryptionRequired : false;
Boolean allowAutoUpdateSchema = topicPolicies.getAllowAutoUpdateSchema().get();
isAllowAutoUpdateSchema = allowAutoUpdateSchema != null ? allowAutoUpdateSchema : false;

// Client permission check.
subscriptions.forEach((subName, sub) -> {
sub.getConsumers().forEach(consumer -> applyPoliciesFutureList.add(consumer.checkPermissionsAsync()));
Expand All @@ -3184,6 +3160,7 @@ private List<CompletableFuture<Void>> applyUpdatedTopicPolicies() {
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateDispatchRateLimiter()));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateSubscribeRateLimiter()));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updatePublishRateLimiter()));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateResourceGroupLimiter()));

applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateSubscriptionsDispatcherRateLimiter()));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(
Expand Down Expand Up @@ -4025,16 +4002,19 @@ private void updateSubscriptionsDispatcherRateLimiter() {
});
}

protected CompletableFuture<Void> initTopicPolicy() {
protected CompletableFuture<Void> initTopicPolicyAndApply() {
if (brokerService.pulsar().getConfig().isSystemTopicAndTopicLevelPoliciesEnabled()) {
brokerService.getPulsar().getTopicPoliciesService()
.registerListener(TopicName.getPartitionedTopicName(topic), this);
return CompletableFuture.completedFuture(null).thenRunAsync(() -> onUpdate(
brokerService.getPulsar().getTopicPoliciesService()
.getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic))),
brokerService.getTopicOrderedExecutor());
TopicPolicies topicPolicies = brokerService.getPulsar().getTopicPoliciesService()
.getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic));
if (topicPolicies != null) {
onUpdate(topicPolicies);
return CompletableFuture.completedFuture(null);
}
}
return CompletableFuture.completedFuture(null);

return FutureUtil.waitForAll(applyUpdatedTopicPolicies());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,18 @@
import static org.testng.AssertJUnit.assertFalse;
import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -80,6 +82,47 @@ public Object[][] dispatchRateProvider() {
return new Object[][] { { DispatchRateType.messageRate }, { DispatchRateType.byteRate } };
}

@Test
public void testReLoadReplicator() throws Exception {
final String namespace = "pulsar/" + System.currentTimeMillis();
final String topicName = "persistent://" + namespace + "/" + System.currentTimeMillis();

admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
admin1.namespaces().setReplicatorDispatchRate(namespace,
DispatchRate.builder().relativeToPublishRate(true)
.dispatchThrottlingRateInMsg(10)
.dispatchThrottlingRateInByte(1000)
.build());

@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).build();

// Activate the replicator.
client1.newProducer().topic(topicName).create().close();
Optional<Topic> topicReference = pulsar1.getBrokerService().getTopicReference(topicName);
assertTrue(topicReference.isPresent());
PersistentTopic persistentTopic = (PersistentTopic) topicReference.get();
List<Replicator> replicators = persistentTopic.getReplicators().values();
assertFalse(replicators.isEmpty());
Replicator replicator = replicators.get(0);
assertTrue(replicator.getRateLimiter().isPresent());

// Reload the replicator.
// 1. Clean the topic from the broker cache
// 2. Create the producer, and then the replicator will be reloaded.
pulsar1.getBrokerService().removeTopicFromCache(topicReference.get());
client1.newProducer().topic(topicName).create().close();
topicReference = pulsar1.getBrokerService().getTopicReference(topicName);
assertTrue(topicReference.isPresent());

persistentTopic = (PersistentTopic) topicReference.get();
replicators = persistentTopic.getReplicators().values();
assertFalse(replicators.isEmpty());
replicator = replicators.get(0);
assertTrue(replicator.getRateLimiter().isPresent());
}

@Test
public void testReplicatorRateLimiterWithOnlyTopicLevel() throws Exception {
cleanup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public class HierarchyTopicPolicies {

final PolicyHierarchyValue<Boolean> schemaValidationEnforced;
final PolicyHierarchyValue<EntryFilters> entryFilters;
final PolicyHierarchyValue<String> resourceGroup;
final PolicyHierarchyValue<Boolean> encryptionRequired;
final PolicyHierarchyValue<Boolean> allowAutoUpdateSchema;

public HierarchyTopicPolicies() {
replicationClusters = new PolicyHierarchyValue<>();
Expand Down Expand Up @@ -94,5 +97,8 @@ public HierarchyTopicPolicies() {
dispatchRate = new PolicyHierarchyValue<>();
schemaValidationEnforced = new PolicyHierarchyValue<>();
entryFilters = new PolicyHierarchyValue<>();
resourceGroup = new PolicyHierarchyValue<>();
encryptionRequired = new PolicyHierarchyValue<>();
allowAutoUpdateSchema = new PolicyHierarchyValue<>();
}
}

0 comments on commit 131af38

Please sign in to comment.