Skip to content

Commit

Permalink
[fix] [broker] Messages lost on the remote cluster when using topic level replication (apache#22890)
Browse files Browse the repository at this point in the history

(cherry picked from commit feae589)
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
poorbarcode authored and nodece committed Sep 5, 2024
1 parent 3e63ef6 commit a630009
Show file tree
Hide file tree
Showing 6 changed files with 482 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractReplicator {
public abstract class AbstractReplicator implements Replicator {

protected final BrokerService brokerService;
protected final String topicName;
Expand All @@ -64,7 +64,7 @@ public abstract class AbstractReplicator {
private volatile State state = State.Stopped;

protected enum State {
Stopped, Starting, Started, Stopping
Stopped, Starting, Started, Stopping, Terminated
}

public AbstractReplicator(Topic localTopic, String replicatorPrefix, String localCluster, String remoteCluster,
Expand Down Expand Up @@ -109,6 +109,9 @@ public String getRemoteCluster() {
// This method needs to be synchronized with disconnects else if there is a disconnect followed by startProducer
// the end result can be disconnect.
public synchronized void startProducer() {
if (STATE_UPDATER.get(this) == State.Terminated) {
return;
}
if (STATE_UPDATER.get(this) == State.Stopping) {
long waitTimeMs = backOff.next();
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -182,14 +185,14 @@ protected CompletableFuture<Boolean> isLocalTopicActive() {
}, brokerService.executor());
}

protected synchronized CompletableFuture<Void> closeProducerAsync() {
protected synchronized CompletableFuture<Void> closeProducerAsync(Runnable onClosed) {
if (producer == null) {
STATE_UPDATER.set(this, State.Stopped);
onClosed.run();
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> future = producer.closeAsync();
return future.thenRun(() -> {
STATE_UPDATER.set(this, State.Stopped);
onClosed.run();
this.producer = null;
// deactivate further read
disableReplicatorRead();
Expand All @@ -200,17 +203,25 @@ protected synchronized CompletableFuture<Void> closeProducerAsync() {
+ " retrying again in {} s",
topicName, localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0);
// BackOff before retrying
brokerService.executor().schedule(this::closeProducerAsync, waitTimeMs, TimeUnit.MILLISECONDS);
brokerService.executor().schedule(()->this.closeProducerAsync(onClosed), waitTimeMs, TimeUnit.MILLISECONDS);
return null;
});
}

/**
 * Closes the producer using the default callback, which sets this replicator's state to
 * {@code State.Stopped}.
 *
 * @return the future returned by {@code closeProducerAsync(Runnable)}
 */
protected synchronized CompletableFuture<Void> closeProducerAsync() {
    final Runnable markStopped = () -> STATE_UPDATER.set(this, State.Stopped);
    return closeProducerAsync(markStopped);
}

public CompletableFuture<Void> disconnect() {
return disconnect(false);
/**
 * Moves this replicator to {@code State.Terminated} and disconnects it.
 *
 * <p>The state is swapped with a single {@code getAndSet}; if the previous state was already
 * {@code Terminated}, nothing else is done and a completed future is returned.
 *
 * @return a completed future when already terminated, otherwise the future returned by
 *         {@code internalDisconnect(false, ...)}
 */
@Override
public CompletableFuture<Void> terminate() {
    if (STATE_UPDATER.getAndSet(this, State.Terminated) != State.Terminated) {
        // The state stays Terminated after the close, so the callback does nothing.
        final Runnable noOp = () -> { };
        return internalDisconnect(false, noOp);
    }
    return CompletableFuture.completedFuture(null);
}

public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog) {
protected CompletableFuture<Void> internalDisconnect(boolean failIfHasBacklog, Runnable onClosed) {
if (failIfHasBacklog && getNumberOfEntriesInBacklog() > 0) {
CompletableFuture<Void> disconnectFuture = new CompletableFuture<>();
disconnectFuture.completeExceptionally(new TopicBusyException("Cannot close a replicator with backlog"));
Expand All @@ -234,7 +245,15 @@ public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog)
remoteCluster, getReplicatorReadPosition(), getNumberOfEntriesInBacklog());
}

return closeProducerAsync();
return closeProducerAsync(onClosed);
}

/**
 * Disconnects this replicator; equivalent to {@code disconnect(false)}.
 *
 * @return the future returned by {@code disconnect(boolean)}
 */
public CompletableFuture<Void> disconnect() {
    final boolean failIfHasBacklog = false;
    return disconnect(failIfHasBacklog);
}

/**
 * Disconnects this replicator, passing a callback that sets the state to {@code State.Stopped}.
 *
 * @param failIfHasBacklog forwarded unchanged to {@code internalDisconnect}
 * @return the future returned by {@code internalDisconnect}
 */
public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog) {
    final Runnable markStopped = () -> STATE_UPDATER.set(this, State.Stopped);
    return internalDisconnect(failIfHasBacklog, markStopped);
}

public CompletableFuture<Void> remove() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public interface Replicator {

ReplicatorStatsImpl getStats();

CompletableFuture<Void> terminate();

CompletableFuture<Void> disconnect();

CompletableFuture<Void> disconnect(boolean b);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,23 +333,13 @@ public CompletableFuture<Void> initialize() {

schemaValidationEnforced = policies.schema_validation_enforced;
}, brokerService.getTopicOrderedExecutor())
.thenCompose(ignore -> initTopicPolicyAndApply())
.thenCompose(ignore -> initTopicPolicy())
.thenCompose(ignore -> removeOrphanReplicationCursors())
.exceptionally(ex -> {
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false",
topic, ex.getMessage());
isEncryptionRequired = false;
return null;
})
.thenCompose(ignore -> {
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (ManagedCursor cursor : ledger.getCursors()) {
if (cursor.getName().startsWith(replicatorPrefix)) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
futures.add(addReplicationCluster(remoteCluster, cursor, localCluster));
}
}
return FutureUtil.waitForAll(futures);
});
}

Expand Down Expand Up @@ -390,6 +380,21 @@ private void initializeDispatchRateLimiterIfNeeded() {
}
}

/**
 * Removes replicators whose cursor exists on the ledger but whose remote cluster is not in the
 * replication clusters read from {@code topicPolicies}.
 *
 * <p>Only cursors whose name starts with {@code replicatorPrefix} are considered.
 *
 * @return a future that completes when every triggered {@code removeReplicator} call has completed
 */
private CompletableFuture<Void> removeOrphanReplicationCursors() {
    final List<CompletableFuture<Void>> removals = new ArrayList<>();
    final List<String> configuredClusters = topicPolicies.getReplicationClusters().get();
    for (ManagedCursor cursor : ledger.getCursors()) {
        if (!cursor.getName().startsWith(replicatorPrefix)) {
            // Not a replication cursor.
            continue;
        }
        final String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
        if (configuredClusters.contains(remoteCluster)) {
            // The cluster is still configured; keep its replicator.
            continue;
        }
        log.warn("Remove the orphan replicator because the cluster '{}' does not exist", remoteCluster);
        removals.add(removeReplicator(remoteCluster));
    }
    return FutureUtil.waitForAll(removals);
}

private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
boolean replicated, Map<String, String> subscriptionProperties) {
checkNotNull(compactedTopic);
Expand Down Expand Up @@ -1555,30 +1560,18 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
return future;
}

/**
 * Checks whether {@code remoteCluster} is listed as a replication cluster, first in the namespace
 * policies and then in the topic-level policies.
 *
 * @param remoteCluster name of the cluster to look up
 * @return a future holding {@code true} when either policy source contains the cluster
 */
private CompletableFuture<Boolean> checkReplicationCluster(String remoteCluster) {
    return brokerService.getPulsar().getPulsarResources().getNamespaceResources()
            .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
            .thenApply(optPolicies -> {
                // Namespace-level policies are consulted first; missing policies count as empty.
                if (optPolicies.map(policies -> policies.replication_clusters)
                        .orElse(Collections.emptySet()).contains(remoteCluster)) {
                    return true;
                }
                // Fall back to the topic-level replication clusters.
                return topicPolicies.getReplicationClusters().get().contains(remoteCluster);
            });
}

protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, ManagedCursor cursor,
String localCluster) {
return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
.thenCompose(__ -> checkReplicationCluster(remoteCluster))
.thenCompose(clusterExists -> {
if (!clusterExists) {
log.warn("Remove the replicator because the cluster '{}' does not exist", remoteCluster);
return removeReplicator(remoteCluster).thenApply(__ -> null);
}
return brokerService.pulsar().getPulsarResources().getClusterResources()
.getClusterAsync(remoteCluster)
.thenApply(clusterData ->
brokerService.getReplicationClient(remoteCluster, clusterData));
})
.thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
.getClusterAsync(remoteCluster)
.thenApply(clusterData ->
brokerService.getReplicationClient(remoteCluster, clusterData)))
.thenAccept(replicationClient -> {
if (replicationClient == null) {
log.error("[{}] Can not create replicator because the remote client can not be created."
+ " remote cluster: {}.",
topic, remoteCluster);
return;
}
Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
Expand Down Expand Up @@ -3170,20 +3163,17 @@ private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies>
}
}

protected CompletableFuture<Void> initTopicPolicyAndApply() {
protected CompletableFuture<Void> initTopicPolicy() {
if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
&& brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
brokerService.getPulsar().getTopicPoliciesService()
.registerListener(TopicName.getPartitionedTopicName(topic), this);
TopicPolicies topicPolicies = brokerService.getPulsar().getTopicPoliciesService()
.getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic));
if (topicPolicies != null) {
onUpdate(topicPolicies);
return CompletableFuture.completedFuture(null);
}
return CompletableFuture.completedFuture(null).thenRunAsync(() -> onUpdate(
brokerService.getPulsar().getTopicPoliciesService()
.getTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic))),
brokerService.getTopicOrderedExecutor());
}

return FutureUtil.waitForAll(applyUpdatedTopicPolicies());
return CompletableFuture.completedFuture(null);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
Expand Down Expand Up @@ -146,5 +147,20 @@ protected long getNumberOfEntriesInBacklog() {
// No-op: the body is empty, so this implementation does nothing when called.
protected void disableReplicatorRead() {

}

// Stub: this implementation provides no stats object and always returns null.
@Override
public ReplicatorStatsImpl getStats() {
return null;
}

// Stub: the body is empty, so calling this has no effect.
@Override
public void updateRates() {

}

// Stub: this implementation always reports "not connected".
@Override
public boolean isConnected() {
return false;
}
}
}
Loading

0 comments on commit a630009

Please sign in to comment.