Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix the applying of namespace policies #22504

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,9 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {

topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled().updateNamespaceValue(
namespacePolicies.dispatcherPauseOnAckStatePersistentEnabled);
topicPolicies.getResourceGroup().updateNamespaceValue(namespacePolicies.resource_group_name);
topicPolicies.getEncryptionRequired().updateNamespaceValue(namespacePolicies.encryption_required);
topicPolicies.getAllowAutoUpdateSchema().updateNamespaceValue(namespacePolicies.is_allow_auto_update_schema);

updateEntryFilters();
}
Expand Down Expand Up @@ -1107,7 +1110,7 @@ public PublishRateLimiter getBrokerPublishRateLimiter() {
/**
* @deprecated Avoid using the deprecated method
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and we can use
* #{@link AbstractTopic#updateResourceGroupLimiter(Policies)} to instead of it.
* #{@link AbstractTopic#updateResourceGroupLimiter()} to instead of it.
*/
@Deprecated
public void updateResourceGroupLimiter(Optional<Policies> optPolicies) {
Expand All @@ -1123,13 +1126,24 @@ public void updateResourceGroupLimiter(Optional<Policies> optPolicies) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
policies = new Policies();
}
updateResourceGroupLimiter(policies);
updateResourceGroupLimiter(policies.resource_group_name);
}

/**
* @deprecated Use {@link #updateResourceGroupLimiter()} instead.
*/
@Deprecated
public void updateResourceGroupLimiter(@Nonnull Policies namespacePolicies) {
requireNonNull(namespacePolicies);
updateResourceGroupLimiter(namespacePolicies.resource_group_name);
}

protected void updateResourceGroupLimiter() {
updateResourceGroupLimiter(this.topicPolicies.getResourceGroup().get());
}

private void updateResourceGroupLimiter(String rgName) {
// attach the resource-group level rate limiters, if set
String rgName = namespacePolicies.resource_group_name;
if (rgName != null) {
final ResourceGroup resourceGroup =
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,55 +421,34 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS

@Override
public CompletableFuture<Void> initialize() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
futures.add(brokerService.getPulsar().newTopicCompactionService(topic).thenAccept(service -> {
PersistentTopic.this.topicCompactionService = service;
this.createPersistentSubscriptions();
}));

for (ManagedCursor cursor : ledger.getCursors()) {
if (cursor.getName().startsWith(replicatorPrefix)) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
futures.add(addReplicationCluster(remoteCluster, cursor, localCluster));
}
}
return FutureUtil.waitForAll(futures).thenCompose(__ ->
brokerService.pulsar().getPulsarResources().getNamespaceResources()
return brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenAcceptAsync(optPolicies -> {
if (!optPolicies.isPresent()) {
isEncryptionRequired = false;
updatePublishRateLimiter();
updateResourceGroupLimiter(new Policies());
initializeDispatchRateLimiterIfNeeded();
updateSubscribeRateLimiter();
return;
}

Policies policies = optPolicies.get();

this.updateTopicPolicyByNamespacePolicy(policies);

initializeDispatchRateLimiterIfNeeded();

updateSubscribeRateLimiter();

updatePublishRateLimiter();

updateResourceGroupLimiter(policies);

this.isEncryptionRequired = policies.encryption_required;

isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
}, getOrderedExecutor())
.thenCompose(ignore -> initTopicPolicy())
.thenAcceptAsync(optPolicies -> optPolicies.ifPresent(this::updateTopicPolicyByNamespacePolicy),
getOrderedExecutor())
.thenCompose(ignore -> initTopicPolicyAndApply())
.exceptionally(ex -> {
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false",
topic, ex.getMessage());
isEncryptionRequired = false;
return null;
}));
})
.thenCompose(ignore -> {
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (ManagedCursor cursor : ledger.getCursors()) {
if (cursor.getName().startsWith(replicatorPrefix)) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
futures.add(addReplicationCluster(remoteCluster, cursor, localCluster));
}
}
return FutureUtil.waitForAll(futures);
})
.thenCompose(ignore -> brokerService.getPulsar().newTopicCompactionService(topic)
.thenAccept(service -> {
PersistentTopic.this.topicCompactionService = service;
this.createPersistentSubscriptions();
})
);
}

// for testing purposes
Expand Down Expand Up @@ -3316,27 +3295,23 @@ public CompletableFuture<Void> onPoliciesUpdate(@Nonnull Policies data) {
// see more detail: https://github.com/apache/pulsar/pull/19364.
updateTopicPolicyByNamespacePolicy(data);
checkReplicatedSubscriptionControllerState();
isEncryptionRequired = data.encryption_required;
isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;

// Apply policies for components.
List<CompletableFuture<Void>> applyPolicyTasks = applyUpdatedTopicPolicies();
applyPolicyTasks.add(applyUpdatedNamespacePolicies(data));
return FutureUtil.waitForAll(applyPolicyTasks)
return FutureUtil.waitForAll(applyUpdatedTopicPolicies())
.thenAccept(__ -> log.info("[{}] namespace-level policies updated successfully", topic))
.exceptionally(ex -> {
log.error("[{}] update namespace polices : {} error", this.getName(), data, ex);
throw FutureUtil.wrapToCompletionException(ex);
});
}

private CompletableFuture<Void> applyUpdatedNamespacePolicies(Policies namespaceLevelPolicies) {
return FutureUtil.runWithCurrentThread(() -> updateResourceGroupLimiter(namespaceLevelPolicies));
}

private List<CompletableFuture<Void>> applyUpdatedTopicPolicies() {
List<CompletableFuture<Void>> applyPoliciesFutureList = new ArrayList<>();

Boolean encryptionRequired = topicPolicies.getEncryptionRequired().get();
isEncryptionRequired = encryptionRequired != null ? encryptionRequired : false;
isAllowAutoUpdateSchema = topicPolicies.getAllowAutoUpdateSchema().get();

// Client permission check.
subscriptions.forEach((subName, sub) -> {
sub.getConsumers().forEach(consumer -> applyPoliciesFutureList.add(consumer.checkPermissionsAsync()));
Expand All @@ -3350,6 +3325,7 @@ private List<CompletableFuture<Void>> applyUpdatedTopicPolicies() {
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateDispatchRateLimiter()));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateSubscribeRateLimiter()));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updatePublishRateLimiter()));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateResourceGroupLimiter()));

applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateSubscriptionsDispatcherRateLimiter()));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(
Expand Down Expand Up @@ -4225,16 +4201,19 @@ private void updateSubscriptionsDispatcherRateLimiter() {
});
}

protected CompletableFuture<Void> initTopicPolicy() {
protected CompletableFuture<Void> initTopicPolicyAndApply() {
Copy link
Member Author

@nodece nodece Apr 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call thread is getOrderedExecutor().

if (brokerService.pulsar().getConfig().isSystemTopicAndTopicLevelPoliciesEnabled()) {
brokerService.getPulsar().getTopicPoliciesService()
.registerListener(TopicName.getPartitionedTopicName(topic), this);
return CompletableFuture.completedFuture(null).thenRunAsync(() -> onUpdate(
brokerService.getPulsar().getTopicPoliciesService()
.getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic))),
brokerService.getTopicOrderedExecutor());
TopicPolicies topicPolicies = brokerService.getPulsar().getTopicPoliciesService()
.getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic));
if (topicPolicies != null) {
onUpdate(topicPolicies);
return CompletableFuture.completedFuture(null);
}
}
return CompletableFuture.completedFuture(null);

return FutureUtil.waitForAll(applyUpdatedTopicPolicies());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1742,6 +1742,7 @@ public CompletableFuture<org.apache.pulsar.client.api.Producer> createAsync() {
brokerService.pulsar().getPulsarResources().getClusterResources()
.getCluster(remoteCluster))));
replicatorMap.put(remoteReplicatorName, replicator);
String cursorName = PersistentReplicator.getReplicatorName(topic.getReplicatorPrefix(), remoteReplicatorName);

// step-1 remove replicator : it will disconnect the producer but it will wait for callback to be completed
Method removeMethod = PersistentTopic.class.getDeclaredMethod("removeReplicator", String.class);
Expand All @@ -1757,7 +1758,7 @@ public CompletableFuture<org.apache.pulsar.client.api.Producer> createAsync() {

// step-3 : complete the callback to remove replicator from the list
ArgumentCaptor<DeleteCursorCallback> captor = ArgumentCaptor.forClass(DeleteCursorCallback.class);
Mockito.verify(ledgerMock).asyncDeleteCursor(any(), captor.capture(), any());
Mockito.verify(ledgerMock).asyncDeleteCursor(eq(cursorName), captor.capture(), any());
DeleteCursorCallback callback = captor.getValue();
callback.deleteCursorComplete(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,18 @@
import static org.testng.AssertJUnit.assertFalse;
import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -80,6 +82,47 @@ public Object[][] dispatchRateProvider() {
return new Object[][] { { DispatchRateType.messageRate }, { DispatchRateType.byteRate } };
}

@Test
public void testReLoadReplicator() throws Exception {
final String namespace = "pulsar/" + System.currentTimeMillis();
final String topicName = "persistent://" + namespace + "/" + System.currentTimeMillis();

admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
admin1.namespaces().setReplicatorDispatchRate(namespace,
DispatchRate.builder().relativeToPublishRate(true)
.dispatchThrottlingRateInMsg(10)
.dispatchThrottlingRateInByte(1000)
.build());

@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).build();

// Activate the replicator.
client1.newProducer().topic(topicName).create().close();
Optional<Topic> topicReference = pulsar1.getBrokerService().getTopicReference(topicName);
assertTrue(topicReference.isPresent());
PersistentTopic persistentTopic = (PersistentTopic) topicReference.get();
List<Replicator> replicators = persistentTopic.getReplicators().values();
assertFalse(replicators.isEmpty());
Replicator replicator = replicators.get(0);
assertTrue(replicator.getRateLimiter().isPresent());

// Reload the replicator.
// 1. Clean the topic from the broker cache
// 2. Create the producer, and then the replicator will be reloaded.
pulsar1.getBrokerService().removeTopicFromCache(topicReference.get());
client1.newProducer().topic(topicName).create().close();
topicReference = pulsar1.getBrokerService().getTopicReference(topicName);
assertTrue(topicReference.isPresent());

persistentTopic = (PersistentTopic) topicReference.get();
replicators = persistentTopic.getReplicators().values();
assertFalse(replicators.isEmpty());
replicator = replicators.get(0);
assertTrue(replicator.getRateLimiter().isPresent());
}

@Test
public void testReplicatorRateLimiterWithOnlyTopicLevel() throws Exception {
cleanup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public class HierarchyTopicPolicies {

final PolicyHierarchyValue<Boolean> schemaValidationEnforced;
final PolicyHierarchyValue<EntryFilters> entryFilters;
final PolicyHierarchyValue<String> resourceGroup;
final PolicyHierarchyValue<Boolean> encryptionRequired;
final PolicyHierarchyValue<Boolean> allowAutoUpdateSchema;

public HierarchyTopicPolicies() {
replicationClusters = new PolicyHierarchyValue<>();
Expand Down Expand Up @@ -94,5 +97,8 @@ public HierarchyTopicPolicies() {
dispatchRate = new PolicyHierarchyValue<>();
schemaValidationEnforced = new PolicyHierarchyValue<>();
entryFilters = new PolicyHierarchyValue<>();
resourceGroup = new PolicyHierarchyValue<>();
encryptionRequired = new PolicyHierarchyValue<>();
allowAutoUpdateSchema = new PolicyHierarchyValue<>();
}
}
Loading