diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index d152842b31c52..88130c3c2010c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -199,7 +199,12 @@ public boolean setReplicated(boolean replicated) { if (this.cursor != null) { if (replicated) { - return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); + if (!config.isEnableReplicatedSubscriptions()) { + log.warn("[{}][{}] Failed set replicated subscription status to {}, please enable the " + + "configuration enableReplicatedSubscriptions", topicName, subName, replicated); + } else { + return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); + } } else { return this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java index 2816a973c92da..f816aa2dd244a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java @@ -41,6 +41,7 @@ import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -50,11 +51,13 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -621,6 +624,27 @@ public void testReplicatedSubscriptionRestApi2() throws Exception { String.format("numReceivedMessages2 (%d) should be less than %d", numReceivedMessages2, numMessages)); } + @Test(timeOut = 30000) + public void testReplicatedSubscriptionRestApi3() throws Exception { + final String namespace = BrokerTestUtil.newUniqueName("geo/replicatedsubscription"); + final String topicName = "persistent://" + namespace + "/topic-rest-api3"; + final String subName = "sub"; + admin4.tenants().createTenant("geo", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid4"), Sets.newHashSet(cluster1, cluster4))); + admin4.namespaces().createNamespace(namespace); + admin4.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster4)); + admin4.topics().createPartitionedTopic(topicName, 2); + + @Cleanup + final PulsarClient client4 = PulsarClient.builder().serviceUrl(url4.toString()) + .statsInterval(0, TimeUnit.SECONDS).build(); + + Consumer consumer4 = client4.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + Assert.expectThrows(PulsarAdminException.class, () -> + admin4.topics().setReplicatedSubscriptionStatus(topicName, subName, true)); + consumer4.close(); + } + /** * Tests replicated subscriptions when replicator producer is closed */ diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index 47c0f4e35e886..11d663ff9f4f4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -82,6 +82,13 @@ public abstract class ReplicatorTestBase extends TestRetrySupport { PulsarAdmin admin3; LocalBookkeeperEnsemble bkEnsemble3; + URL url4; + URL urlTls4; + ServiceConfiguration config4 = new ServiceConfiguration(); + PulsarService pulsar4; + PulsarAdmin admin4; + LocalBookkeeperEnsemble bkEnsemble4; + ZookeeperServerTest globalZkS; ExecutorService executor; @@ -111,6 +118,7 @@ public abstract class ReplicatorTestBase extends TestRetrySupport { protected final String cluster1 = "r1"; protected final String cluster2 = "r2"; protected final String cluster3 = "r3"; + protected final String cluster4 = "r4"; // Default frequency public int getBrokerServicePurgeInactiveFrequency() { @@ -178,6 +186,21 @@ protected void setup() throws Exception { urlTls3 = new URL(pulsar3.getWebServiceAddressTls()); admin3 = PulsarAdmin.builder().serviceHttpUrl(url3.toString()).build(); + // Start region 4 + + // Start zk & bks + bkEnsemble4 = new LocalBookkeeperEnsemble(3, 0, () -> 0); + bkEnsemble4.start(); + + setConfig4DefaultValue(); + pulsar4 = new PulsarService(config4); + pulsar4.start(); + + url4 = new URL(pulsar4.getWebServiceAddress()); + urlTls4 = new URL(pulsar4.getWebServiceAddressTls()); + admin4 = PulsarAdmin.builder().serviceHttpUrl(url4.toString()).build(); + + // Provision the global namespace admin1.clusters().createCluster(cluster1, ClusterData.builder() .serviceUrl(url1.toString()) @@ -230,6 +253,23 @@ protected void setup() throws Exception { .brokerClientTlsTrustStorePassword(keyStorePassword) .brokerClientTlsTrustStoreType(keyStoreType) .build()); + admin4.clusters().createCluster(cluster4, ClusterData.builder() + .serviceUrl(url4.toString()) + .serviceUrlTls(urlTls4.toString()) + .brokerServiceUrl(pulsar4.getBrokerServiceUrl()) + .brokerServiceUrlTls(pulsar4.getBrokerServiceUrlTls()) + .brokerClientTlsEnabled(true) + .brokerClientCertificateFilePath(clientCertFilePath) + .brokerClientKeyFilePath(clientKeyFilePath) + .brokerClientTrustCertsFilePath(caCertFilePath) + .brokerClientTlsEnabledWithKeyStore(tlsWithKeyStore) + .brokerClientTlsKeyStore(clientKeyStorePath) + .brokerClientTlsKeyStorePassword(keyStorePassword) + .brokerClientTlsKeyStoreType(keyStoreType) + .brokerClientTlsTrustStore(clientTrustStorePath) + .brokerClientTlsTrustStorePassword(keyStorePassword) + .brokerClientTlsTrustStoreType(keyStoreType) + .build()); admin1.tenants().createTenant("pulsar", new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r2", "r3"))); @@ -257,7 +297,7 @@ protected void setup() throws Exception { } public void setConfig3DefaultValue() { - setConfigDefaults(config3, "r3", bkEnsemble3); + setConfigDefaults(config3, cluster3, bkEnsemble3); config3.setTlsEnabled(true); } @@ -269,6 +309,11 @@ public void setConfig2DefaultValue() { setConfigDefaults(config2, cluster2, bkEnsemble2); } + public void setConfig4DefaultValue() { + setConfigDefaults(config4, cluster4, bkEnsemble4); + config4.setEnableReplicatedSubscriptions(false); + } + private void setConfigDefaults(ServiceConfiguration config, String clusterName, LocalBookkeeperEnsemble bookkeeperEnsemble) { config.setClusterName(clusterName); @@ -316,6 +361,11 @@ public void resetConfig3() { setConfig3DefaultValue(); } + public void resetConfig4() { + config4 = new ServiceConfiguration(); + setConfig4DefaultValue(); + } + private int inSec(int time, TimeUnit unit) { return (int) TimeUnit.SECONDS.convert(time, unit); } @@ -332,7 +382,11 @@ protected void cleanup() throws Exception { admin1.close(); admin2.close(); admin3.close(); + admin4.close(); + if (pulsar4 != null) { + pulsar4.close(); + } if (pulsar3 != null) { pulsar3.close(); } @@ -346,11 +400,13 @@ protected void cleanup() throws Exception { bkEnsemble1.stop(); bkEnsemble2.stop(); bkEnsemble3.stop(); + bkEnsemble4.stop(); globalZkS.stop(); resetConfig1(); resetConfig2(); resetConfig3(); + resetConfig4(); } static class MessageProducer implements AutoCloseable {