From d1fc7323cbf61a6d2955486fc123fdde5253e72c Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 4 Apr 2023 10:31:33 +0800 Subject: [PATCH] [fix][broker] Ignore and remove the replicator cursor when the remote cluster is absent (#19972) --- .../service/persistent/PersistentTopic.java | 30 +++++++-- .../persistent/PersistentTopicTest.java | 63 +++++++++++++++++++ 2 files changed, 87 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index fa08330ff3c35..18a662c4b7a38 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1696,14 +1696,32 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) { return future; } + private CompletableFuture checkReplicationCluster(String remoteCluster) { + return brokerService.getPulsar().getPulsarResources().getNamespaceResources() + .getPoliciesAsync(TopicName.get(topic).getNamespaceObject()) + .thenApply(optPolicies -> optPolicies.map(policies -> policies.replication_clusters) + .orElse(Collections.emptySet()).contains(remoteCluster) + || topicPolicies.getReplicationClusters().get().contains(remoteCluster)); + } + protected CompletableFuture addReplicationCluster(String remoteCluster, ManagedCursor cursor, String localCluster) { return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService) - .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources() - .getClusterAsync(remoteCluster) - .thenApply(clusterData -> - brokerService.getReplicationClient(remoteCluster, clusterData))) + .thenCompose(__ -> checkReplicationCluster(remoteCluster)) + .thenCompose(clusterExists -> { + if (!clusterExists) { + log.warn("Remove the replicator because the cluster '{}' does not exist", remoteCluster); + return removeReplicator(remoteCluster).thenApply(__ -> null); + } + return brokerService.pulsar().getPulsarResources().getClusterResources() + .getClusterAsync(remoteCluster) + .thenApply(clusterData -> + brokerService.getReplicationClient(remoteCluster, clusterData)); + }) .thenAccept(replicationClient -> { + if (replicationClient == null) { + return; + } Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> { try { return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster, @@ -1727,8 +1745,8 @@ CompletableFuture removeReplicator(String remoteCluster) { String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster); - replicators.get(remoteCluster).disconnect().thenRun(() -> { - + Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::disconnect) + .orElse(CompletableFuture.completedFuture(null)).thenRun(() -> { ledger.asyncDeleteCursor(name, new DeleteCursorCallback() { @Override public void deleteCursorComplete(Object ctx) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 80a79e0234de4..c63be7aad01cd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -40,15 +40,20 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import lombok.Cleanup; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerTestBase; @@ -57,6 +62,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; @@ -66,7 +72,9 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TopicStats; import org.awaitility.Awaitility; import org.junit.Assert; @@ -525,4 +533,59 @@ public void testDeleteTopicFail() throws Exception { makeDeletedFailed.set(false); persistentTopic.delete().get(); } + + @DataProvider(name = "topicLevelPolicy") + public static Object[][] topicLevelPolicy() { + return new Object[][] { { true }, { false } }; + } + + @Test(dataProvider = "topicLevelPolicy") + public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) throws Exception { + final String namespace = "prop/ns-abc"; + final String topicName = "persistent://" + namespace + + "/testCreateTopicWithZombieReplicatorCursor" + topicLevelPolicy; + final String remoteCluster = "remote"; + admin.topics().createNonPartitionedTopic(topicName); + admin.topics().createSubscription(topicName, conf.getReplicatorPrefix() + "." + remoteCluster, + MessageId.earliest, true); + + admin.clusters().createCluster(remoteCluster, ClusterData.builder() + .serviceUrl("http://localhost:11112") + .brokerServiceUrl("pulsar://localhost:11111") + .build()); + TenantInfo tenantInfo = admin.tenants().getTenantInfo("prop"); + tenantInfo.getAllowedClusters().add(remoteCluster); + admin.tenants().updateTenant("prop", tenantInfo); + + if (topicLevelPolicy) { + admin.topics().setReplicationClusters(topicName, Collections.singletonList(remoteCluster)); + } else { + admin.namespaces().setNamespaceReplicationClustersAsync( + namespace, Collections.singleton(remoteCluster)).get(); + } + + final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false) + .get(3, TimeUnit.SECONDS).orElse(null); + assertNotNull(topic); + + final Supplier> getCursors = () -> { + final Set cursors = new HashSet<>(); + final Iterable iterable = topic.getManagedLedger().getCursors(); + iterable.forEach(c -> cursors.add(c.getName())); + return cursors; + }; + assertEquals(getCursors.get(), Collections.singleton(conf.getReplicatorPrefix() + "." + remoteCluster)); + + if (topicLevelPolicy) { + admin.topics().setReplicationClusters(topicName, Collections.emptyList()); + } else { + admin.namespaces().setNamespaceReplicationClustersAsync(namespace, Collections.emptySet()).get(); + } + admin.clusters().deleteCluster(remoteCluster); + // Now the cluster and its related policy has been removed but the replicator cursor still exists + + topic.initialize().get(3, TimeUnit.SECONDS); + Awaitility.await().atMost(3, TimeUnit.SECONDS) + .until(() -> !topic.getManagedLedger().getCursors().iterator().hasNext()); + } }