From 5cd5f358a2309ae45866eaa52f9061e82a267be8 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 11 Jun 2024 20:15:33 +0800 Subject: [PATCH 1/7] [fix] [broker] Messages lost on the remote cluster when using topic level replication --- .../service/persistent/PersistentTopic.java | 54 +++++++------- .../broker/service/OneWayReplicatorTest.java | 70 +++++++++++++++++++ 2 files changed, 98 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 711e1d93f742f..18018de67b20b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -439,13 +439,6 @@ public CompletableFuture initialize() { this.createPersistentSubscriptions(); })); - for (ManagedCursor cursor : ledger.getCursors()) { - if (cursor.getName().startsWith(replicatorPrefix)) { - String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); - String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName()); - futures.add(addReplicationCluster(remoteCluster, cursor, localCluster)); - } - } return FutureUtil.waitForAll(futures).thenCompose(__ -> brokerService.pulsar().getPulsarResources().getNamespaceResources() .getPoliciesAsync(TopicName.get(topic).getNamespaceObject()) @@ -476,6 +469,7 @@ public CompletableFuture initialize() { isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema; }, getOrderedExecutor()) .thenCompose(ignore -> initTopicPolicy()) + .thenCompose(ignore -> removeOrphanReplicationCursors()) .exceptionally(ex -> { log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, ex.getMessage()); @@ -553,6 +547,26 @@ private void createPersistentSubscriptions() { checkReplicatedSubscriptionControllerState(); } + private CompletableFuture removeOrphanReplicationCursors() { + List> futures = new ArrayList<>(); + String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); + List replicationClusters = topicPolicies.getReplicationClusters().get(); + for (ManagedCursor cursor : ledger.getCursors()) { + if (cursor.getName().startsWith(replicatorPrefix)) { + String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName()); + if (!replicationClusters.contains(remoteCluster)) { + log.warn("Remove the replicator because the cluster '{}' does not exist", remoteCluster); + futures.add(removeReplicator(remoteCluster)); + continue; + } + if (localCluster.equals(remoteCluster)) { + log.warn("Remove the replicator because the cluster '{}' does not exist", remoteCluster); + } + } + } + return FutureUtil.waitForAll(futures); + } + /** * Unload a subscriber. * @throws SubscriptionNotFoundException If subscription not founded. @@ -2055,30 +2069,18 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) { return future; } - private CompletableFuture checkReplicationCluster(String remoteCluster) { - return brokerService.getPulsar().getPulsarResources().getNamespaceResources() - .getPoliciesAsync(TopicName.get(topic).getNamespaceObject()) - .thenApply(optPolicies -> optPolicies.map(policies -> policies.replication_clusters) - .orElse(Collections.emptySet()).contains(remoteCluster) - || topicPolicies.getReplicationClusters().get().contains(remoteCluster)); - } - protected CompletableFuture addReplicationCluster(String remoteCluster, ManagedCursor cursor, String localCluster) { return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService) - .thenCompose(__ -> checkReplicationCluster(remoteCluster)) - .thenCompose(clusterExists -> { - if (!clusterExists) { - log.warn("Remove the replicator because the cluster '{}' does not exist", remoteCluster); - return removeReplicator(remoteCluster).thenApply(__ -> null); - } - return brokerService.pulsar().getPulsarResources().getClusterResources() - .getClusterAsync(remoteCluster) - .thenApply(clusterData -> - brokerService.getReplicationClient(remoteCluster, clusterData)); - }) + .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources() + .getClusterAsync(remoteCluster) + .thenApply(clusterData -> + brokerService.getReplicationClient(remoteCluster, clusterData))) .thenAccept(replicationClient -> { if (replicationClient == null) { + log.error("[{}] Can not create replicator because the remote client can not be created." + + " remote cluster: {}. State of transferring : {}", + topic, remoteCluster, transferring); return; } lock.readLock().lock(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 3dcd787a0cd5e..aa9da6512aa92 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -58,7 +58,10 @@ import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; @@ -809,4 +812,71 @@ public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Except admin2.topics().deletePartitionedTopic(topicName, false); }); } + + private String getTheLatestMessage(String topic, PulsarClient client, PulsarAdmin admin) throws Exception { + String dummySubscription = "s_" + UUID.randomUUID().toString().replace("-", ""); + admin.topics().createSubscription(topic, dummySubscription, MessageId.earliest); + Consumer c = client.newConsumer(Schema.STRING).topic(topic).subscriptionName(dummySubscription) + .subscribe(); + String lastMsgValue = null; + while (true) { + Message msg = c.receive(2, TimeUnit.SECONDS); + if (msg == null) { + break; + } + lastMsgValue = msg.getValue(); + } + c.unsubscribe(); + return lastMsgValue; + } + + @Test + public void testReloadWithTopicLevelGeoReplication() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); + admin1.topics().createNonPartitionedTopic(topicName); + admin2.topics().createNonPartitionedTopic(topicName); + admin2.topics().createSubscription(topicName, "s1", MessageId.earliest); + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); + verifyReplicationWorks(topicName); + + /** + * Verify: + * 1. Inject an error to make the replicator is not able to work. + * 2. Send one message, since the replicator does not work anymore, this message will not be replicated. + * 3. Unload topic, the replicator will be re-created. + * 4. Verify: the message can be replicated to the remote cluster. + */ + // Step 1: Inject an error to make the replicator is not able to work. + Replicator replicator = broker1.getTopic(topicName, false).join().get().getReplicators().get(cluster2); + replicator.terminate(); + + // Step 2: Send one message, since the replicator does not work anymore, this message will not be replicated. + String msg = UUID.randomUUID().toString(); + Producer p1 = client1.newProducer(Schema.STRING).topic(topicName).create(); + p1.send(msg); + p1.close(); + // The result of "peek message" will be the messages generated, so it is not the same as the message just sent. + Thread.sleep(3000); + assertNotEquals(getTheLatestMessage(topicName, client2, admin2), msg); + assertEquals(admin1.topics().getStats(topicName).getReplication().get(cluster2).getReplicationBacklog(), 1); + + // Step 3: Unload topic, the replicator will be re-created. + admin1.topics().unload(topicName); + + // Step 4. Verify: the message can be replicated to the remote cluster. + Awaitility.await().untilAsserted(() -> { + log.info("replication backlog: {}", + admin1.topics().getStats(topicName).getReplication().get(cluster2).getReplicationBacklog()); + assertEquals(admin1.topics().getStats(topicName).getReplication().get(cluster2).getReplicationBacklog(), 0); + assertEquals(getTheLatestMessage(topicName, client2, admin2), msg); + }); + + // Cleanup. + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); + Awaitility.await().untilAsserted(() -> { + assertEquals(broker1.getTopic(topicName, false).join().get().getReplicators().size(), 0); + }); + admin1.topics().delete(topicName, false); + admin2.topics().delete(topicName, false); + } } From 2b074d3c209dc5fe6fdaa2d010cccda623f1770a Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 11 Jun 2024 23:20:17 +0800 Subject: [PATCH 2/7] remove unnecessary codes --- .../pulsar/broker/service/persistent/PersistentTopic.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 18018de67b20b..f0f6c78ebb784 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -559,9 +559,6 @@ private CompletableFuture removeOrphanReplicationCursors() { futures.add(removeReplicator(remoteCluster)); continue; } - if (localCluster.equals(remoteCluster)) { - log.warn("Remove the replicator because the cluster '{}' does not exist", remoteCluster); - } } } return FutureUtil.waitForAll(futures); From bf59a81bc79e6392e76bf60e0ac5ed5d4ce47db1 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 13 Jun 2024 16:46:24 +0800 Subject: [PATCH 3/7] remove unnecessary code --- .../pulsar/broker/service/persistent/PersistentTopic.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index f0f6c78ebb784..ac3aa23484ddf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -549,7 +549,6 @@ private void createPersistentSubscriptions() { private CompletableFuture removeOrphanReplicationCursors() { List> futures = new ArrayList<>(); - String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); List replicationClusters = topicPolicies.getReplicationClusters().get(); for (ManagedCursor cursor : ledger.getCursors()) { if (cursor.getName().startsWith(replicatorPrefix)) { @@ -557,7 +556,6 @@ private CompletableFuture removeOrphanReplicationCursors() { if (!replicationClusters.contains(remoteCluster)) { log.warn("Remove the replicator because the cluster '{}' does not exist", remoteCluster); futures.add(removeReplicator(remoteCluster)); - continue; } } } From f70ed55f3d74ab37129b7cf11f93dac7fabbecf0 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 14 Jun 2024 18:34:50 +0800 Subject: [PATCH 4/7] change the log --- .../pulsar/broker/service/persistent/PersistentTopic.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index ac3aa23484ddf..630712f536874 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -554,7 +554,7 @@ private CompletableFuture removeOrphanReplicationCursors() { if (cursor.getName().startsWith(replicatorPrefix)) { String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName()); if (!replicationClusters.contains(remoteCluster)) { - log.warn("Remove the replicator because the cluster '{}' does not exist", remoteCluster); + log.warn("Remove the orphan replicator because the cluster '{}' does not exist", remoteCluster); futures.add(removeReplicator(remoteCluster)); } } From dbf74a676c5b191df3f8661844c85f5c29d18de5 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 18 Jun 2024 10:55:39 +0800 Subject: [PATCH 5/7] fix test --- .../broker/service/OneWayReplicatorUsingGlobalZKTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index b4747a8bd0e47..900a2005cfdcd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -104,4 +104,9 @@ public void testNoExpandTopicPartitionsWhenDisableTopicLevelReplication() throws public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Exception { super.testExpandTopicPartitionsOnNamespaceLevelReplication(); } + + @Test(enabled = false) + public void testReloadWithTopicLevelGeoReplication() throws Exception { + super.testReloadWithTopicLevelGeoReplication(); + } } From c72699a465b7e923c2738af147099504973f7c1f Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 18 Jun 2024 15:09:05 +0800 Subject: [PATCH 6/7] improve test --- .../broker/service/OneWayReplicatorTest.java | 55 +++++++++++++++---- .../service/OneWayReplicatorTestBase.java | 5 ++ .../OneWayReplicatorUsingGlobalZKTest.java | 4 +- 3 files changed, 51 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index aa9da6512aa92..c9b23c6437a22 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; +import java.util.function.Supplier; import lombok.AllArgsConstructor; import lombok.Data; import lombok.SneakyThrows; @@ -81,6 +82,7 @@ import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j @@ -830,13 +832,36 @@ private String getTheLatestMessage(String topic, PulsarClient client, PulsarAdmi return lastMsgValue; } - @Test - public void testReloadWithTopicLevelGeoReplication() throws Exception { - final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); + enum ReplicationLevel { + TOPIC_LEVEL, + NAMESPACE_LEVEL; + } + + @DataProvider(name = "replicationLevels") + public Object[][] replicationLevels() { + return new Object[][]{ + {ReplicationLevel.TOPIC_LEVEL}, + {ReplicationLevel.NAMESPACE_LEVEL} + }; + } + + @Test(dataProvider = "replicationLevels") + public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationLevel) throws Exception { + final String topicName = ((Supplier) () -> { + if (replicationLevel.equals(ReplicationLevel.TOPIC_LEVEL)) { + return BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); + } else { + return BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_"); + } + }).get(); admin1.topics().createNonPartitionedTopic(topicName); admin2.topics().createNonPartitionedTopic(topicName); admin2.topics().createSubscription(topicName, "s1", MessageId.earliest); - admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); + if (replicationLevel.equals(ReplicationLevel.TOPIC_LEVEL)) { + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); + } else { + pulsar1.getConfig().setTopicLevelPoliciesEnabled(false); + } verifyReplicationWorks(topicName); /** @@ -864,7 +889,7 @@ public void testReloadWithTopicLevelGeoReplication() throws Exception { admin1.topics().unload(topicName); // Step 4. Verify: the message can be replicated to the remote cluster. - Awaitility.await().untilAsserted(() -> { + Awaitility.await().atMost(Duration.ofSeconds(300)).untilAsserted(() -> { log.info("replication backlog: {}", admin1.topics().getStats(topicName).getReplication().get(cluster2).getReplicationBacklog()); assertEquals(admin1.topics().getStats(topicName).getReplication().get(cluster2).getReplicationBacklog(), 0); @@ -872,11 +897,19 @@ public void testReloadWithTopicLevelGeoReplication() throws Exception { }); // Cleanup. - admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); - Awaitility.await().untilAsserted(() -> { - assertEquals(broker1.getTopic(topicName, false).join().get().getReplicators().size(), 0); - }); - admin1.topics().delete(topicName, false); - admin2.topics().delete(topicName, false); + if (replicationLevel.equals(ReplicationLevel.TOPIC_LEVEL)) { + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); + Awaitility.await().untilAsserted(() -> { + assertEquals(broker1.getTopic(topicName, false).join().get().getReplicators().size(), 0); + }); + admin1.topics().delete(topicName, false); + admin2.topics().delete(topicName, false); + } else { + pulsar1.getConfig().setTopicLevelPoliciesEnabled(true); + cleanupTopics(() -> { + admin1.topics().delete(topicName); + admin2.topics().delete(topicName); + }); + } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index 7372b2e478475..140ec8b7fbc21 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -350,6 +350,11 @@ protected PulsarClient initClient(ClientBuilder clientBuilder) throws Exception } protected void verifyReplicationWorks(String topic) throws Exception { + Awaitility.await().untilAsserted(() -> { + PersistentTopic persistentTopic = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topic, false).join().get(); + assertTrue(persistentTopic.getReplicators().size() > 0); + }); final String subscription = "__subscribe_1"; final String msgValue = "__msg1"; Producer producer1 = client1.newProducer(Schema.STRING).topic(topic).create(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index 900a2005cfdcd..b8f8edce2477e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -106,7 +106,7 @@ public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Except } @Test(enabled = false) - public void testReloadWithTopicLevelGeoReplication() throws Exception { - super.testReloadWithTopicLevelGeoReplication(); + public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationLevel) throws Exception { + super.testReloadWithTopicLevelGeoReplication(replicationLevel); } } From 94d3c9160ef4eb5e88706627c3c43710c484b192 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 19 Jun 2024 11:25:34 +0800 Subject: [PATCH 7/7] fix test --- .../service/OneWayReplicatorTestBase.java | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index 140ec8b7fbc21..ffe6147412e56 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -350,11 +350,28 @@ protected PulsarClient initClient(ClientBuilder clientBuilder) throws Exception } protected void verifyReplicationWorks(String topic) throws Exception { - Awaitility.await().untilAsserted(() -> { - PersistentTopic persistentTopic = - (PersistentTopic) pulsar1.getBrokerService().getTopic(topic, false).join().get(); - assertTrue(persistentTopic.getReplicators().size() > 0); + // Wait for replicator starting. + Awaitility.await().until(() -> { + try { + PersistentTopic persistentTopic = (PersistentTopic) pulsar1.getBrokerService() + .getTopic(topic, false).join().get(); + if (persistentTopic.getReplicators().size() > 0) { + return true; + } + } catch (Exception ex) {} + + try { + String partition0 = TopicName.get(topic).getPartition(0).toString(); + PersistentTopic persistentTopic = (PersistentTopic) pulsar1.getBrokerService() + .getTopic(partition0, false).join().get(); + if (persistentTopic.getReplicators().size() > 0) { + return true; + } + } catch (Exception ex) {} + + return false; }); + // Verify: pub & sub. final String subscription = "__subscribe_1"; final String msgValue = "__msg1"; Producer producer1 = client1.newProducer(Schema.STRING).topic(topic).create();