Skip to content

Commit

Permalink
[fix] [broker] Messages lost on the remote cluster when using topic l…
Browse files Browse the repository at this point in the history
…evel replication (#22890)
  • Loading branch information
poorbarcode authored Jun 19, 2024
1 parent bacb162 commit feae589
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -439,13 +439,6 @@ public CompletableFuture<Void> initialize() {
this.createPersistentSubscriptions();
}));

for (ManagedCursor cursor : ledger.getCursors()) {
if (cursor.getName().startsWith(replicatorPrefix)) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
futures.add(addReplicationCluster(remoteCluster, cursor, localCluster));
}
}
return FutureUtil.waitForAll(futures).thenCompose(__ ->
brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
Expand Down Expand Up @@ -476,6 +469,7 @@ public CompletableFuture<Void> initialize() {
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
}, getOrderedExecutor())
.thenCompose(ignore -> initTopicPolicy())
.thenCompose(ignore -> removeOrphanReplicationCursors())
.exceptionally(ex -> {
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false",
topic, ex.getMessage());
Expand Down Expand Up @@ -553,6 +547,21 @@ private void createPersistentSubscriptions() {
checkReplicatedSubscriptionControllerState();
}

/**
 * Removes replicators whose cursor is still present on the managed ledger although the remote
 * cluster encoded in the cursor name is not among the topic's current replication clusters.
 *
 * @return the combined future produced by {@code FutureUtil.waitForAll} over every removal that
 *         was triggered (no removals are triggered when no orphan cursor is found)
 */
private CompletableFuture<Void> removeOrphanReplicationCursors() {
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    List<String> replicationClusters = topicPolicies.getReplicationClusters().get();
    for (ManagedCursor cursor : ledger.getCursors()) {
        // Only cursors named with the replicator prefix are considered.
        if (!cursor.getName().startsWith(replicatorPrefix)) {
            continue;
        }
        String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
        if (!replicationClusters.contains(remoteCluster)) {
            // Include the topic so the warning can be attributed, like the other logs of this class.
            log.warn("[{}] Remove the orphan replicator because the cluster '{}' does not exist",
                    topic, remoteCluster);
            futures.add(removeReplicator(remoteCluster));
        }
    }
    return FutureUtil.waitForAll(futures);
}

/**
* Unload a subscriber.
* @throws SubscriptionNotFoundException If the subscription is not found.
Expand Down Expand Up @@ -2055,30 +2064,18 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
return future;
}

/**
 * Tells whether {@code remoteCluster} is configured as a replication cluster, looking first at
 * the namespace policies and then at the topic-level replication clusters.
 *
 * @param remoteCluster name of the cluster to look up
 * @return a future holding {@code true} when either source contains the cluster
 */
private CompletableFuture<Boolean> checkReplicationCluster(String remoteCluster) {
    return brokerService.getPulsar().getPulsarResources().getNamespaceResources()
            .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
            .thenApply(namespacePolicies -> {
                // Missing namespace policies are treated as an empty cluster set.
                boolean configuredOnNamespace = namespacePolicies
                        .map(p -> p.replication_clusters)
                        .orElse(Collections.emptySet())
                        .contains(remoteCluster);
                if (configuredOnNamespace) {
                    return true;
                }
                // Consulted only when the namespace policies do not list the cluster.
                return topicPolicies.getReplicationClusters().get().contains(remoteCluster);
            });
}

protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, ManagedCursor cursor,
String localCluster) {
return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
.thenCompose(__ -> checkReplicationCluster(remoteCluster))
.thenCompose(clusterExists -> {
if (!clusterExists) {
log.warn("Remove the replicator because the cluster '{}' does not exist", remoteCluster);
return removeReplicator(remoteCluster).thenApply(__ -> null);
}
return brokerService.pulsar().getPulsarResources().getClusterResources()
.getClusterAsync(remoteCluster)
.thenApply(clusterData ->
brokerService.getReplicationClient(remoteCluster, clusterData));
})
.thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
.getClusterAsync(remoteCluster)
.thenApply(clusterData ->
brokerService.getReplicationClient(remoteCluster, clusterData)))
.thenAccept(replicationClient -> {
if (replicationClient == null) {
log.error("[{}] Can not create replicator because the remote client can not be created."
+ " remote cluster: {}. State of transferring : {}",
topic, remoteCluster, transferring);
return;
}
lock.readLock().lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.SneakyThrows;
Expand All @@ -58,7 +59,10 @@
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
Expand All @@ -78,6 +82,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
Expand Down Expand Up @@ -809,4 +814,102 @@ public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Except
admin2.topics().deletePartitionedTopic(topicName, false);
});
}

/**
 * Reads the topic from the earliest position through a throw-away subscription and returns the
 * value of the last message received.
 *
 * <p>Reading stops at the first {@code receive} call that yields no message within 2 seconds.
 * The subscription is unsubscribed even when reading fails, so a failed attempt does not leave
 * a consumer attached to the topic.
 *
 * @param topic  topic to read
 * @param client client used to create the consumer
 * @param admin  admin client used to create the subscription at the earliest position
 * @return the value of the last message received, or {@code null} if none was received
 * @throws Exception if creating the subscription, subscribing, receiving or unsubscribing fails
 */
private String getTheLatestMessage(String topic, PulsarClient client, PulsarAdmin admin) throws Exception {
    String dummySubscription = "s_" + UUID.randomUUID().toString().replace("-", "");
    admin.topics().createSubscription(topic, dummySubscription, MessageId.earliest);
    Consumer<String> c = client.newConsumer(Schema.STRING).topic(topic).subscriptionName(dummySubscription)
            .subscribe();
    try {
        String lastMsgValue = null;
        while (true) {
            Message<String> msg = c.receive(2, TimeUnit.SECONDS);
            if (msg == null) {
                // Nothing arrived within the timeout: treat the topic as drained.
                break;
            }
            lastMsgValue = msg.getValue();
        }
        return lastMsgValue;
    } finally {
        // Always drop the temporary subscription, including when receive() throws.
        c.unsubscribe();
    }
}

/** Level at which geo-replication is configured for a parameterized test. */
enum ReplicationLevel {
    /** Replication is configured on the topic. */
    TOPIC_LEVEL,
    /** Replication is configured on the namespace. */
    NAMESPACE_LEVEL
}

/**
 * Data provider yielding one single-argument row per {@link ReplicationLevel}:
 * first {@code TOPIC_LEVEL}, then {@code NAMESPACE_LEVEL}.
 */
@DataProvider(name = "replicationLevels")
public Object[][] replicationLevels() {
    final Object[][] rows = new Object[2][];
    rows[0] = new Object[]{ReplicationLevel.TOPIC_LEVEL};
    rows[1] = new Object[]{ReplicationLevel.NAMESPACE_LEVEL};
    return rows;
}

/**
 * Verifies that a message which was not replicated because the replicator stopped working is
 * delivered to the remote cluster after the topic is unloaded and loaded again.
 *
 * <p>Steps:
 * <ol>
 *   <li>Terminate the replicator so that it is no longer able to work.</li>
 *   <li>Send one message; it must stay in the replication backlog.</li>
 *   <li>Unload the topic, so the replicator is re-created on the next load.</li>
 *   <li>Verify the message reaches the remote cluster and the backlog drops to 0.</li>
 * </ol>
 *
 * @param replicationLevel whether replication is set on the topic or comes from the namespace
 */
@Test(dataProvider = "replicationLevels")
public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationLevel) throws Exception {
    final boolean topicLevel = replicationLevel == ReplicationLevel.TOPIC_LEVEL;
    final String topicPrefix = topicLevel
            ? "persistent://" + nonReplicatedNamespace + "/tp_"
            : "persistent://" + replicatedNamespace + "/tp_";
    final String topicName = BrokerTestUtil.newUniqueName(topicPrefix);
    admin1.topics().createNonPartitionedTopic(topicName);
    admin2.topics().createNonPartitionedTopic(topicName);
    admin2.topics().createSubscription(topicName, "s1", MessageId.earliest);

    try {
        if (topicLevel) {
            admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
        } else {
            pulsar1.getConfig().setTopicLevelPoliciesEnabled(false);
        }
        verifyReplicationWorks(topicName);

        // Step 1: Inject an error so that the replicator is no longer able to work.
        Replicator replicator = broker1.getTopic(topicName, false).join().get().getReplicators().get(cluster2);
        replicator.terminate();

        // Step 2: Send one message. Since the replicator does not work anymore, it will not be replicated.
        String msg = UUID.randomUUID().toString();
        Producer<String> p1 = client1.newProducer(Schema.STRING).topic(topicName).create();
        try {
            p1.send(msg);
        } finally {
            // Close the producer even when send() fails.
            p1.close();
        }
        // Leave some time for a (wrongly) working replicator to deliver, so the negative check is meaningful.
        Thread.sleep(3000);
        // The latest message readable on the remote cluster must be an earlier one, not the one just sent.
        assertNotEquals(getTheLatestMessage(topicName, client2, admin2), msg);
        assertEquals(admin1.topics().getStats(topicName).getReplication().get(cluster2).getReplicationBacklog(), 1);

        // Step 3: Unload the topic, the replicator will be re-created.
        admin1.topics().unload(topicName);

        // Step 4: Verify that the message is replicated to the remote cluster.
        Awaitility.await().atMost(Duration.ofSeconds(300)).untilAsserted(() -> {
            log.info("replication backlog: {}",
                    admin1.topics().getStats(topicName).getReplication().get(cluster2).getReplicationBacklog());
            assertEquals(admin1.topics().getStats(topicName).getReplication().get(cluster2).getReplicationBacklog(), 0);
            assertEquals(getTheLatestMessage(topicName, client2, admin2), msg);
        });
    } finally {
        // Restore the broker config even when the test fails, so that a failure here does not
        // leave topic-level policies disabled for whatever runs next on the same broker.
        if (!topicLevel) {
            pulsar1.getConfig().setTopicLevelPoliciesEnabled(true);
        }
    }

    // Cleanup.
    if (topicLevel) {
        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
        Awaitility.await().untilAsserted(() -> {
            assertEquals(broker1.getTopic(topicName, false).join().get().getReplicators().size(), 0);
        });
        admin1.topics().delete(topicName, false);
        admin2.topics().delete(topicName, false);
    } else {
        cleanupTopics(() -> {
            admin1.topics().delete(topicName);
            admin2.topics().delete(topicName);
        });
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,28 @@ protected PulsarClient initClient(ClientBuilder clientBuilder) throws Exception
}

protected void verifyReplicationWorks(String topic) throws Exception {
// Wait for replicator starting.
Awaitility.await().until(() -> {
try {
PersistentTopic persistentTopic = (PersistentTopic) pulsar1.getBrokerService()
.getTopic(topic, false).join().get();
if (persistentTopic.getReplicators().size() > 0) {
return true;
}
} catch (Exception ex) {}

try {
String partition0 = TopicName.get(topic).getPartition(0).toString();
PersistentTopic persistentTopic = (PersistentTopic) pulsar1.getBrokerService()
.getTopic(partition0, false).join().get();
if (persistentTopic.getReplicators().size() > 0) {
return true;
}
} catch (Exception ex) {}

return false;
});
// Verify: pub & sub.
final String subscription = "__subscribe_1";
final String msgValue = "__msg1";
Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topic).create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,9 @@ public void testNoExpandTopicPartitionsWhenDisableTopicLevelReplication() throws
public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Exception {
super.testExpandTopicPartitionsOnNamespaceLevelReplication();
}

/**
 * Overrides the inherited test so that it is disabled ({@code enabled = false}) in this class;
 * the body only delegates to the parent implementation.
 * NOTE(review): the reason for disabling it here is not visible in this file section - confirm.
 */
@Override
@Test(enabled = false)
public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationLevel) throws Exception {
    super.testReloadWithTopicLevelGeoReplication(replicationLevel);
}
}

0 comments on commit feae589

Please sign in to comment.