Skip to content

Commit

Permalink
[fix][broker] Apply namespace policy to topic after broker restart
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece committed Apr 15, 2024
1 parent 5d18ff7 commit 7bc6d44
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -367,21 +367,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS

@Override
public CompletableFuture<Void> initialize() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
futures.add(brokerService.getPulsar().newTopicCompactionService(topic).thenAccept(service -> {
PersistentTopic.this.topicCompactionService = service;
this.createPersistentSubscriptions();
}));

for (ManagedCursor cursor : ledger.getCursors()) {
if (cursor.getName().startsWith(replicatorPrefix)) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
futures.add(addReplicationCluster(remoteCluster, cursor, localCluster));
}
}
return FutureUtil.waitForAll(futures).thenCompose(__ ->
brokerService.pulsar().getPulsarResources().getNamespaceResources()
return brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenAcceptAsync(optPolicies -> {
if (!optPolicies.isPresent()) {
Expand Down Expand Up @@ -409,13 +395,30 @@ public CompletableFuture<Void> initialize() {

isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
}, getOrderedExecutor())
.thenCompose(ignore -> initTopicPolicy())
.thenCompose(ignore -> initTopicPolicyAndApply())
.exceptionally(ex -> {
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false",
topic, ex.getMessage());
isEncryptionRequired = false;
return null;
}));
})
.thenCompose(ignore -> {
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (ManagedCursor cursor : ledger.getCursors()) {
if (cursor.getName().startsWith(replicatorPrefix)) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
futures.add(addReplicationCluster(remoteCluster, cursor, localCluster));
}
}
return FutureUtil.waitForAll(futures);
})
.thenCompose(ignore -> brokerService.getPulsar().newTopicCompactionService(topic)
.thenAccept(service -> {
PersistentTopic.this.topicCompactionService = service;
this.createPersistentSubscriptions();
})
);
}

// for testing purposes
Expand Down Expand Up @@ -4025,16 +4028,19 @@ private void updateSubscriptionsDispatcherRateLimiter() {
});
}

protected CompletableFuture<Void> initTopicPolicy() {
protected CompletableFuture<Void> initTopicPolicyAndApply() {
if (brokerService.pulsar().getConfig().isSystemTopicAndTopicLevelPoliciesEnabled()) {
brokerService.getPulsar().getTopicPoliciesService()
.registerListener(TopicName.getPartitionedTopicName(topic), this);
return CompletableFuture.completedFuture(null).thenRunAsync(() -> onUpdate(
brokerService.getPulsar().getTopicPoliciesService()
.getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic))),
brokerService.getTopicOrderedExecutor());
TopicPolicies topicPolicies = brokerService.getPulsar().getTopicPoliciesService()
.getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic));
if (topicPolicies != null) {
onUpdate(topicPolicies);
return CompletableFuture.completedFuture(null);
}
}
return CompletableFuture.completedFuture(null);

return FutureUtil.waitForAll(applyUpdatedTopicPolicies());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,18 @@
import static org.testng.AssertJUnit.assertFalse;
import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -80,6 +82,47 @@ public Object[][] dispatchRateProvider() {
return new Object[][] { { DispatchRateType.messageRate }, { DispatchRateType.byteRate } };
}

@Test
public void testReLoadReplicator() throws Exception {
final String namespace = "pulsar/" + System.currentTimeMillis();
final String topicName = "persistent://" + namespace + "/" + System.currentTimeMillis();

admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
admin1.namespaces().setReplicatorDispatchRate(namespace,
DispatchRate.builder().relativeToPublishRate(true)
.dispatchThrottlingRateInMsg(10)
.dispatchThrottlingRateInByte(1000)
.build());

@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).build();

// Activate the replicator.
client1.newProducer().topic(topicName).create().close();
Optional<Topic> topicReference = pulsar1.getBrokerService().getTopicReference(topicName);
assertTrue(topicReference.isPresent());
PersistentTopic persistentTopic = (PersistentTopic) topicReference.get();
List<Replicator> replicators = persistentTopic.getReplicators().values();
assertFalse(replicators.isEmpty());
Replicator replicator = replicators.get(0);
assertTrue(replicator.getRateLimiter().isPresent());

// Reload the replicator.
// 1. Clean the topic from the broker cache
// 2. Create the producer, and then the replicator will be reloaded.
pulsar1.getBrokerService().removeTopicFromCache(topicReference.get());
client1.newProducer().topic(topicName).create().close();
topicReference = pulsar1.getBrokerService().getTopicReference(topicName);
assertTrue(topicReference.isPresent());

persistentTopic = (PersistentTopic) topicReference.get();
replicators = persistentTopic.getReplicators().values();
assertFalse(replicators.isEmpty());
replicator = replicators.get(0);
assertTrue(replicator.getRateLimiter().isPresent());
}

@Test
public void testReplicatorRateLimiterWithOnlyTopicLevel() throws Exception {
cleanup();
Expand Down

0 comments on commit 7bc6d44

Please sign in to comment.