Skip to content

Commit

Permalink
[fix][broker] Ignore and remove the replicator cursor when the remote…
Browse files Browse the repository at this point in the history
… cluster is absent (#19972)
  • Loading branch information
BewareMyPower authored Apr 4, 2023
1 parent 5ef3a21 commit d1fc732
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1696,14 +1696,32 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
return future;
}

private CompletableFuture<Boolean> checkReplicationCluster(String remoteCluster) {
return brokerService.getPulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenApply(optPolicies -> optPolicies.map(policies -> policies.replication_clusters)
.orElse(Collections.emptySet()).contains(remoteCluster)
|| topicPolicies.getReplicationClusters().get().contains(remoteCluster));
}

protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, ManagedCursor cursor,
String localCluster) {
return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
.thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
.getClusterAsync(remoteCluster)
.thenApply(clusterData ->
brokerService.getReplicationClient(remoteCluster, clusterData)))
.thenCompose(__ -> checkReplicationCluster(remoteCluster))
.thenCompose(clusterExists -> {
if (!clusterExists) {
log.warn("Remove the replicator because the cluster '{}' does not exist", remoteCluster);
return removeReplicator(remoteCluster).thenApply(__ -> null);
}
return brokerService.pulsar().getPulsarResources().getClusterResources()
.getClusterAsync(remoteCluster)
.thenApply(clusterData ->
brokerService.getReplicationClient(remoteCluster, clusterData));
})
.thenAccept(replicationClient -> {
if (replicationClient == null) {
return;
}
Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
Expand All @@ -1727,8 +1745,8 @@ CompletableFuture<Void> removeReplicator(String remoteCluster) {

String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);

replicators.get(remoteCluster).disconnect().thenRun(() -> {

Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::disconnect)
.orElse(CompletableFuture.completedFuture(null)).thenRun(() -> {
ledger.asyncDeleteCursor(name, new DeleteCursorCallback() {
@Override
public void deleteCursorComplete(Object ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,20 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import lombok.Cleanup;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
Expand All @@ -57,6 +62,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
Expand All @@ -66,7 +72,9 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.junit.Assert;
Expand Down Expand Up @@ -525,4 +533,59 @@ public void testDeleteTopicFail() throws Exception {
makeDeletedFailed.set(false);
persistentTopic.delete().get();
}

@DataProvider(name = "topicLevelPolicy")
public static Object[][] topicLevelPolicy() {
return new Object[][] { { true }, { false } };
}

@Test(dataProvider = "topicLevelPolicy")
public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) throws Exception {
final String namespace = "prop/ns-abc";
final String topicName = "persistent://" + namespace
+ "/testCreateTopicWithZombieReplicatorCursor" + topicLevelPolicy;
final String remoteCluster = "remote";
admin.topics().createNonPartitionedTopic(topicName);
admin.topics().createSubscription(topicName, conf.getReplicatorPrefix() + "." + remoteCluster,
MessageId.earliest, true);

admin.clusters().createCluster(remoteCluster, ClusterData.builder()
.serviceUrl("http://localhost:11112")
.brokerServiceUrl("pulsar://localhost:11111")
.build());
TenantInfo tenantInfo = admin.tenants().getTenantInfo("prop");
tenantInfo.getAllowedClusters().add(remoteCluster);
admin.tenants().updateTenant("prop", tenantInfo);

if (topicLevelPolicy) {
admin.topics().setReplicationClusters(topicName, Collections.singletonList(remoteCluster));
} else {
admin.namespaces().setNamespaceReplicationClustersAsync(
namespace, Collections.singleton(remoteCluster)).get();
}

final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false)
.get(3, TimeUnit.SECONDS).orElse(null);
assertNotNull(topic);

final Supplier<Set<String>> getCursors = () -> {
final Set<String> cursors = new HashSet<>();
final Iterable<ManagedCursor> iterable = topic.getManagedLedger().getCursors();
iterable.forEach(c -> cursors.add(c.getName()));
return cursors;
};
assertEquals(getCursors.get(), Collections.singleton(conf.getReplicatorPrefix() + "." + remoteCluster));

if (topicLevelPolicy) {
admin.topics().setReplicationClusters(topicName, Collections.emptyList());
} else {
admin.namespaces().setNamespaceReplicationClustersAsync(namespace, Collections.emptySet()).get();
}
admin.clusters().deleteCluster(remoteCluster);
// Now the cluster and its related policy has been removed but the replicator cursor still exists

topic.initialize().get(3, TimeUnit.SECONDS);
Awaitility.await().atMost(3, TimeUnit.SECONDS)
.until(() -> !topic.getManagedLedger().getCursors().iterator().hasNext());
}
}

0 comments on commit d1fc732

Please sign in to comment.