Skip to content

Commit

Permalink
[improve] [broker] PIP-356 Support Geo-Replication starts at earliest…
Browse files Browse the repository at this point in the history
… position (apache#22856)

(cherry picked from commit 5fc0eaf)
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
poorbarcode authored and nodece committed Dec 3, 2024
1 parent 8ed1455 commit a1e5016
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Max number of snapshot to be cached per subscription.")
private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 10;

@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "The position that replication task start at, it can be set to earliest or latest (default).")
private String replicationStartAt = "latest";

@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1537,7 +1537,14 @@ CompletableFuture<Void> startReplicator(String remoteCluster) {
final CompletableFuture<Void> future = new CompletableFuture<>();

String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
ledger.asyncOpenCursor(name, new OpenCursorCallback() {
final InitialPosition initialPosition;
if (MessageId.earliest.toString()
.equalsIgnoreCase(getBrokerService().getPulsar().getConfiguration().getReplicationStartAt())) {
initialPosition = InitialPosition.Earliest;
} else {
initialPosition = InitialPosition.Latest;
}
ledger.asyncOpenCursor(name, initialPosition, new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
Expand All @@ -43,6 +46,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -287,4 +291,100 @@ public void testCreateRemoteAdminFailed() throws Exception {
admin1.topics().deletePartitionedTopic(topic);
admin1.tenants().updateTenant(defaultTenant, tenantInfo);
}

protected void enableReplication(String topic) throws Exception {
admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, cluster2));
}

protected void disableReplication(String topic) throws Exception {
admin1.topics().setReplicationClusters(topic, Arrays.asList(cluster1, cluster2));
}

@Test
public void testConfigReplicationStartAt() throws Exception {
// Initialize.
String ns1 = defaultTenant + "/ns_" + UUID.randomUUID().toString().replace("-", "");
String subscription1 = "s1";
admin1.namespaces().createNamespace(ns1);
if (!usingGlobalZK) {
admin2.namespaces().createNamespace(ns1);
}

RetentionPolicies retentionPolicies = new RetentionPolicies(60 * 24, 1024);
admin1.namespaces().setRetention(ns1, retentionPolicies);
admin2.namespaces().setRetention(ns1, retentionPolicies);

// 1. default config.
// Enable replication for topic1.
final String topic1 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_");
admin1.topics().createNonPartitionedTopicAsync(topic1);
admin1.topics().createSubscription(topic1, subscription1, MessageId.earliest);
Producer<String> p1 = client1.newProducer(Schema.STRING).topic(topic1).create();
p1.send("msg-1");
p1.close();
enableReplication(topic1);
// Verify: since the replication was started at latest, there is no message to consume.
Consumer<String> c1 = client2.newConsumer(Schema.STRING).topic(topic1).subscriptionName(subscription1)
.subscribe();
Message<String> msg1 = c1.receive(2, TimeUnit.SECONDS);
assertNull(msg1);
c1.close();
disableReplication(topic1);

// 2.Update config: start at "earliest".
admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.earliest.toString());
Awaitility.await().untilAsserted(() -> {
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("earliest");
});

final String topic2 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_");
admin1.topics().createNonPartitionedTopicAsync(topic2);
admin1.topics().createSubscription(topic2, subscription1, MessageId.earliest);
Producer<String> p2 = client1.newProducer(Schema.STRING).topic(topic2).create();
p2.send("msg-1");
p2.close();
enableReplication(topic2);
// Verify: since the replication was started at earliest, there is one message to consume.
Consumer<String> c2 = client2.newConsumer(Schema.STRING).topic(topic2).subscriptionName(subscription1)
.subscribe();
Message<String> msg2 = c2.receive(2, TimeUnit.SECONDS);
assertNotNull(msg2);
assertEquals(msg2.getValue(), "msg-1");
c2.close();
disableReplication(topic2);

// 2.Update config: start at "latest".
admin1.brokers().updateDynamicConfiguration("replicationStartAt", MessageId.latest.toString());
Awaitility.await().untilAsserted(() -> {
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("latest");
});

final String topic3 = BrokerTestUtil.newUniqueName("persistent://" + ns1 + "/tp_");
admin1.topics().createNonPartitionedTopicAsync(topic3);
admin1.topics().createSubscription(topic3, subscription1, MessageId.earliest);
Producer<String> p3 = client1.newProducer(Schema.STRING).topic(topic3).create();
p3.send("msg-1");
p3.close();
enableReplication(topic3);
// Verify: since the replication was started at latest, there is no message to consume.
Consumer<String> c3 = client2.newConsumer(Schema.STRING).topic(topic3).subscriptionName(subscription1)
.subscribe();
Message<String> msg3 = c3.receive(2, TimeUnit.SECONDS);
assertNull(msg3);
c3.close();
disableReplication(topic3);

// cleanup.
// There is no good way to delete topics when using global ZK, skip cleanup.
admin1.namespaces().setNamespaceReplicationClusters(ns1, Collections.singleton(cluster1));
admin1.namespaces().unload(ns1);
admin2.namespaces().setNamespaceReplicationClusters(ns1, Collections.singleton(cluster2));
admin2.namespaces().unload(ns1);
admin1.topics().delete(topic1, false);
admin2.topics().delete(topic1, false);
admin1.topics().delete(topic2, false);
admin2.topics().delete(topic2, false);
admin1.topics().delete(topic3, false);
admin2.topics().delete(topic3, false);
}
}

0 comments on commit a1e5016

Please sign in to comment.